Added support for IP-style URL parsing; minor cosmetic changes
This commit is contained in:
Родитель
cc3c47fc0e
Коммит
d3f8ce5ed3
|
@ -5,12 +5,14 @@ import "sync/atomic"
|
|||
// AtomicMorpherInt32 identifies a method passed to and invoked by the AtomicMorphInt32 function.
|
||||
// The AtomicMorpher callback is passed a startValue and based on this value it returns
|
||||
// what the new value should be and the result that AtomicMorph should return to its caller.
|
||||
type AtomicMorpherInt32 func(startVal int32) (val int32, morphResult interface{})
|
||||
type atomicMorpherInt32 func(startVal int32) (val int32, morphResult interface{})
|
||||
|
||||
const targetAndMorpherMustNotBeNil = "target and morpher must not be nil"
|
||||
|
||||
// AtomicMorph atomically morphs target in to new value (and result) as indicated bythe AtomicMorpher callback function.
|
||||
func AtomicMorphInt32(target *int32, morpher AtomicMorpherInt32) interface{} {
|
||||
func atomicMorphInt32(target *int32, morpher atomicMorpherInt32) interface{} {
|
||||
if target == nil || morpher == nil {
|
||||
panic("target and morpher mut not be nil")
|
||||
panic(targetAndMorpherMustNotBeNil)
|
||||
}
|
||||
for {
|
||||
currentVal := atomic.LoadInt32(target)
|
||||
|
@ -24,12 +26,12 @@ func AtomicMorphInt32(target *int32, morpher AtomicMorpherInt32) interface{} {
|
|||
// AtomicMorpherUint32 identifies a method passed to and invoked by the AtomicMorph function.
|
||||
// The AtomicMorpher callback is passed a startValue and based on this value it returns
|
||||
// what the new value should be and the result that AtomicMorph should return to its caller.
|
||||
type AtomicMorpherUint32 func(startVal uint32) (val uint32, morphResult interface{})
|
||||
type atomicMorpherUint32 func(startVal uint32) (val uint32, morphResult interface{})
|
||||
|
||||
// AtomicMorph atomically morphs target in to new value (and result) as indicated bythe AtomicMorpher callback function.
|
||||
func AtomicMorphUint32(target *uint32, morpher AtomicMorpherUint32) interface{} {
|
||||
func atomicMorphUint32(target *uint32, morpher atomicMorpherUint32) interface{} {
|
||||
if target == nil || morpher == nil {
|
||||
panic("target and morpher mut not be nil")
|
||||
panic(targetAndMorpherMustNotBeNil)
|
||||
}
|
||||
for {
|
||||
currentVal := atomic.LoadUint32(target)
|
||||
|
@ -43,12 +45,12 @@ func AtomicMorphUint32(target *uint32, morpher AtomicMorpherUint32) interface{}
|
|||
// AtomicMorpherUint64 identifies a method passed to and invoked by the AtomicMorphUint64 function.
|
||||
// The AtomicMorpher callback is passed a startValue and based on this value it returns
|
||||
// what the new value should be and the result that AtomicMorph should return to its caller.
|
||||
type AtomicMorpherInt64 func(startVal int64) (val int64, morphResult interface{})
|
||||
type atomicMorpherInt64 func(startVal int64) (val int64, morphResult interface{})
|
||||
|
||||
// AtomicMorph atomically morphs target in to new value (and result) as indicated bythe AtomicMorpher callback function.
|
||||
func AtomicMorphInt64(target *int64, morpher AtomicMorpherInt64) interface{} {
|
||||
func atomicMorphInt64(target *int64, morpher atomicMorpherInt64) interface{} {
|
||||
if target == nil || morpher == nil {
|
||||
panic("target and morpher mut not be nil")
|
||||
panic(targetAndMorpherMustNotBeNil)
|
||||
}
|
||||
for {
|
||||
currentVal := atomic.LoadInt64(target)
|
||||
|
@ -62,12 +64,12 @@ func AtomicMorphInt64(target *int64, morpher AtomicMorpherInt64) interface{} {
|
|||
// AtomicMorpherUint64 identifies a method passed to and invoked by the AtomicMorphUint64 function.
|
||||
// The AtomicMorpher callback is passed a startValue and based on this value it returns
|
||||
// what the new value should be and the result that AtomicMorph should return to its caller.
|
||||
type AtomicMorpherUint64 func(startVal uint64) (val uint64, morphResult interface{})
|
||||
type atomicMorpherUint64 func(startVal uint64) (val uint64, morphResult interface{})
|
||||
|
||||
// AtomicMorph atomically morphs target in to new value (and result) as indicated bythe AtomicMorpher callback function.
|
||||
func AtomicMorphUint64(target *uint64, morpher AtomicMorpherUint64) interface{} {
|
||||
func atomicMorphUint64(target *uint64, morpher atomicMorpherUint64) interface{} {
|
||||
if target == nil || morpher == nil {
|
||||
panic("target and morpher mut not be nil")
|
||||
panic(targetAndMorpherMustNotBeNil)
|
||||
}
|
||||
for {
|
||||
currentVal := atomic.LoadUint64(target)
|
||||
|
|
|
@ -412,7 +412,7 @@ func (t *uploadStreamToBlockBlobOptions) chunk(ctx context.Context, num uint32,
|
|||
return nil
|
||||
}
|
||||
// Else, upload a staged block...
|
||||
AtomicMorphUint32(&t.maxBlockNum, func(startVal uint32) (val uint32, morphResult interface{}) {
|
||||
atomicMorphUint32(&t.maxBlockNum, func(startVal uint32) (val uint32, morphResult interface{}) {
|
||||
// Atomically remember (in t.numBlocks) the maximum block num we've ever seen
|
||||
if startVal < num {
|
||||
return num, nil
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package azblob
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
@ -14,21 +15,47 @@ const (
|
|||
// existing URL into its parts by calling NewBlobURLParts(). You construct a URL from parts by calling URL().
|
||||
// NOTE: Changing any SAS-related field requires computing a new SAS signature.
|
||||
type BlobURLParts struct {
|
||||
Scheme string // Ex: "https://"
|
||||
Host string // Ex: "account.blob.core.windows.net"
|
||||
ContainerName string // "" if no container
|
||||
BlobName string // "" if no blob
|
||||
Snapshot string // "" if not a snapshot
|
||||
SAS SASQueryParameters
|
||||
UnparsedParams string
|
||||
Scheme string // Ex: "https://"
|
||||
Host string // Ex: "account.blob.core.windows.net", "10.132.141.33", "10.132.141.33:80"
|
||||
IPEndpointStyleInfo IPEndpointStyleInfo
|
||||
ContainerName string // "" if no container
|
||||
BlobName string // "" if no blob
|
||||
Snapshot string // "" if not a snapshot
|
||||
SAS SASQueryParameters
|
||||
UnparsedParams string
|
||||
}
|
||||
|
||||
// IPEndpointStyleInfo is used for IP endpoint style URL when working with Azure storage emulator.
|
||||
// Ex: "https://10.132.141.33/accountname/containername"
|
||||
type IPEndpointStyleInfo struct {
|
||||
AccountName string // "" if not using IP endpoint style
|
||||
}
|
||||
|
||||
// isIPEndpointStyle checkes if URL's host is IP, in this case the storage account endpoint will be composed as:
|
||||
// http(s)://IP(:port)/storageaccount/container/...
|
||||
// As url's Host property, host could be both host or host:port
|
||||
func isIPEndpointStyle(host string) bool {
|
||||
if host == "" {
|
||||
return false
|
||||
}
|
||||
if h, _, err := net.SplitHostPort(host); err == nil {
|
||||
host = h
|
||||
}
|
||||
// For IPv6, there could be case where SplitHostPort fails for cannot finding port.
|
||||
// In this case, eliminate the '[' and ']' in the URL.
|
||||
// For details about IPv6 URL, please refer to https://tools.ietf.org/html/rfc2732
|
||||
if host[0] == '[' && host[len(host)-1] == ']' {
|
||||
host = host[1 : len(host)-1]
|
||||
}
|
||||
return net.ParseIP(host) != nil
|
||||
}
|
||||
|
||||
// NewBlobURLParts parses a URL initializing BlobURLParts' fields including any SAS-related & snapshot query parameters. Any other
|
||||
// query parameters remain in the UnparsedParams field. This method overwrites all fields in the BlobURLParts object.
|
||||
func NewBlobURLParts(u url.URL) BlobURLParts {
|
||||
up := BlobURLParts{
|
||||
Scheme: u.Scheme,
|
||||
Host: u.Host,
|
||||
Scheme: u.Scheme,
|
||||
Host: u.Host,
|
||||
}
|
||||
|
||||
// Find the container & blob names (if any)
|
||||
|
@ -37,10 +64,17 @@ func NewBlobURLParts(u url.URL) BlobURLParts {
|
|||
if path[0] == '/' {
|
||||
path = path[1:] // If path starts with a slash, remove it
|
||||
}
|
||||
if isIPEndpointStyle(up.Host) {
|
||||
if accountEndIndex := strings.Index(path, "/"); accountEndIndex == -1 { // Slash not found; path has account name & no container name or blob
|
||||
up.IPEndpointStyleInfo.AccountName = path
|
||||
} else {
|
||||
up.IPEndpointStyleInfo.AccountName = path[:accountEndIndex] // The account name is the part between the slashes
|
||||
path = path[accountEndIndex+1:] // path refers to portion after the account name now (container & blob names)
|
||||
}
|
||||
}
|
||||
|
||||
// Find the next slash (if it exists)
|
||||
containerEndIndex := strings.Index(path, "/")
|
||||
if containerEndIndex == -1 { // Slash not found; path has container name & no blob name
|
||||
containerEndIndex := strings.Index(path, "/") // Find the next slash (if it exists)
|
||||
if containerEndIndex == -1 { // Slash not found; path has container name & no blob name
|
||||
up.ContainerName = path
|
||||
} else {
|
||||
up.ContainerName = path[:containerEndIndex] // The container name is the part between the slashes
|
||||
|
@ -77,6 +111,9 @@ func (values caseInsensitiveValues) Get(key string) ([]string, bool) {
|
|||
// field contains the SAS, snapshot, and unparsed query parameters.
|
||||
func (up BlobURLParts) URL() url.URL {
|
||||
path := ""
|
||||
if isIPEndpointStyle(up.Host) && up.IPEndpointStyleInfo.AccountName != "" {
|
||||
path += "/" + up.IPEndpointStyleInfo.AccountName
|
||||
}
|
||||
// Concatenate container & blob names (if they exist)
|
||||
if up.ContainerName != "" {
|
||||
path += "/" + up.ContainerName
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
)
|
||||
|
||||
// BlobSASSignatureValues is used to generate a Shared Access Signature (SAS) for an Azure Storage container or blob.
|
||||
// For more information, see https://docs.microsoft.com/rest/api/storageservices/constructing-a-service-sas
|
||||
type BlobSASSignatureValues struct {
|
||||
Version string `param:"sv"` // If not specified, this defaults to SASVersion
|
||||
Protocol SASProtocol `param:"spr"` // See the SASProtocol* constants
|
||||
|
|
|
@ -11,6 +11,10 @@ import (
|
|||
"github.com/Azure/azure-pipeline-go/pipeline"
|
||||
)
|
||||
|
||||
// TokenRefresher represents a callback method that you write; this method is called periodically
|
||||
// so you can refresh the token credential's value.
|
||||
type TokenRefresher func(credential TokenCredential) time.Duration
|
||||
|
||||
// TokenCredential represents a token credential (which is also a pipeline.Factory).
|
||||
type TokenCredential interface {
|
||||
Credential
|
||||
|
@ -20,12 +24,15 @@ type TokenCredential interface {
|
|||
|
||||
// NewTokenCredential creates a token credential for use with role-based access control (RBAC) access to Azure Storage
|
||||
// resources. You initialize the TokenCredential with an initial token value. If you pass a non-nil value for
|
||||
// tokenRefresher, then the function you pass will be called immediately (so it can refresh and change the
|
||||
// TokenCredential's token value by calling SetToken; your tokenRefresher function must return a time.Duration
|
||||
// tokenRefresher, then the function you pass will be called immediately so it can refresh and change the
|
||||
// TokenCredential's token value by calling SetToken. Your tokenRefresher function must return a time.Duration
|
||||
// indicating how long the TokenCredential object should wait before calling your tokenRefresher function again.
|
||||
func NewTokenCredential(initialToken string, tokenRefresher func(credential TokenCredential) time.Duration) TokenCredential {
|
||||
// If your tokenRefresher callback fails to refresh the token, you can return a duration of 0 to stop your
|
||||
// TokenCredential object from ever invoking tokenRefresher again. Also, oen way to deal with failing to refresh a
|
||||
// token is to cancel a context.Context object used by requests that have the TokenCredential object in their pipeline.
|
||||
func NewTokenCredential(initialToken string, tokenRefresher TokenRefresher) TokenCredential {
|
||||
tc := &tokenCredential{}
|
||||
tc.SetToken(initialToken) // We dont' set it above to guarantee atomicity
|
||||
tc.SetToken(initialToken) // We don't set it above to guarantee atomicity
|
||||
if tokenRefresher == nil {
|
||||
return tc // If no callback specified, return the simple tokenCredential
|
||||
}
|
||||
|
@ -68,7 +75,7 @@ type tokenCredential struct {
|
|||
|
||||
// The members below are only used if the user specified a tokenRefresher callback function.
|
||||
timer *time.Timer
|
||||
tokenRefresher func(c TokenCredential) time.Duration
|
||||
tokenRefresher TokenRefresher
|
||||
lock sync.Mutex
|
||||
stopped bool
|
||||
}
|
||||
|
@ -84,7 +91,7 @@ func (f *tokenCredential) SetToken(token string) { f.token.Store(token) }
|
|||
|
||||
// startRefresh calls refresh which immediately calls tokenRefresher
|
||||
// and then starts a timer to call tokenRefresher in the future.
|
||||
func (f *tokenCredential) startRefresh(tokenRefresher func(c TokenCredential) time.Duration) {
|
||||
func (f *tokenCredential) startRefresh(tokenRefresher TokenRefresher) {
|
||||
f.tokenRefresher = tokenRefresher
|
||||
f.stopped = false // In case user calls StartRefresh, StopRefresh, & then StartRefresh again
|
||||
f.refresh()
|
||||
|
@ -95,11 +102,13 @@ func (f *tokenCredential) startRefresh(tokenRefresher func(c TokenCredential) ti
|
|||
// in order to refresh the token again in the future.
|
||||
func (f *tokenCredential) refresh() {
|
||||
d := f.tokenRefresher(f) // Invoke the user's refresh callback outside of the lock
|
||||
f.lock.Lock()
|
||||
if !f.stopped {
|
||||
f.timer = time.AfterFunc(d, f.refresh)
|
||||
if d > 0 { // If duration is 0 or negative, refresher wants to not be called again
|
||||
f.lock.Lock()
|
||||
if !f.stopped {
|
||||
f.timer = time.AfterFunc(d, f.refresh)
|
||||
}
|
||||
f.lock.Unlock()
|
||||
}
|
||||
f.lock.Unlock()
|
||||
}
|
||||
|
||||
// stopRefresh stops any pending timer and sets stopped field to true to prevent
|
||||
|
|
Загрузка…
Ссылка в новой задаче