Коммит
914989036b
|
@ -121,9 +121,8 @@ func (f *SharedKeyCredential) buildStringToSign(request pipeline.Request) string
|
|||
headers.Get(headerIfUnmodifiedSince),
|
||||
headers.Get(headerRange),
|
||||
buildCanonicalizedHeader(headers),
|
||||
f.buildCanonicalizedResource(request),
|
||||
f.buildCanonicalizedResource(request.URL),
|
||||
}, "\n")
|
||||
|
||||
return stringToSign
|
||||
}
|
||||
|
||||
|
@ -156,43 +155,42 @@ func buildCanonicalizedHeader(headers http.Header) string {
|
|||
return string(ch.Bytes())
|
||||
}
|
||||
|
||||
func (f *SharedKeyCredential) buildCanonicalizedResource(request pipeline.Request) string {
|
||||
func (f *SharedKeyCredential) buildCanonicalizedResource(u *url.URL) string {
|
||||
// https://docs.microsoft.com/en-us/rest/api/storageservices/authentication-for-the-azure-storage-services
|
||||
cr := bytes.NewBufferString("/")
|
||||
cr.WriteString(f.accountName)
|
||||
|
||||
if len(request.URL.Path) > 0 {
|
||||
if len(u.Path) > 0 {
|
||||
// Any portion of the CanonicalizedResource string that is derived from
|
||||
// the resource's URI should be encoded exactly as it is in the URI.
|
||||
// -- https://msdn.microsoft.com/en-gb/library/azure/dd179428.aspx
|
||||
cr.WriteString(request.URL.EscapedPath())
|
||||
cr.WriteString(u.EscapedPath())
|
||||
} else {
|
||||
// a slash is required to indicate the root path
|
||||
cr.WriteString("/")
|
||||
}
|
||||
|
||||
params, err := url.ParseQuery(request.URL.RawQuery)
|
||||
// params is a map[string][]string; param name is key; params values is []string
|
||||
params, err := url.ParseQuery(u.RawQuery) // Returns URL decoded values
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(params) > 0 {
|
||||
cr.WriteRune('\n')
|
||||
|
||||
keys := []string{}
|
||||
for key := range params {
|
||||
keys = append(keys, key)
|
||||
if len(params) > 0 { // There is at least 1 query parameter
|
||||
paramNames := []string{} // We use this to sort the parameter key names
|
||||
for paramName := range params {
|
||||
paramNames = append(paramNames, paramName) // paramNames must be lowercase
|
||||
}
|
||||
sort.Strings(keys)
|
||||
sort.Strings(paramNames)
|
||||
|
||||
completeParams := []string{}
|
||||
for _, key := range keys {
|
||||
if len(params[key]) > 1 {
|
||||
sort.Strings(params[key])
|
||||
}
|
||||
for _, paramName := range paramNames {
|
||||
paramValues := params[paramName]
|
||||
sort.Strings(paramValues)
|
||||
|
||||
completeParams = append(completeParams, strings.Join([]string{key, ":", strings.Join(params[key], ",")}, ""))
|
||||
// Join the sorted key values separated by ','
|
||||
// Then prepend "keyName:"; then add this string to the buffer
|
||||
cr.WriteString("\n" + paramName + ":" + strings.Join(paramValues, ","))
|
||||
}
|
||||
cr.WriteString(strings.Join(completeParams, "\n"))
|
||||
}
|
||||
return string(cr.Bytes())
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package azblob
|
|||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
|
@ -12,9 +11,9 @@ import (
|
|||
"github.com/Azure/azure-pipeline-go/pipeline"
|
||||
)
|
||||
|
||||
// StreamToBlockBlobOptions identifies options used by the StreamToBlockBlob function. Note that the
|
||||
// UploadStreamToBlockBlobOptions identifies options used by the UploadStreamToBlockBlob function. Note that the
|
||||
// BlockSize field is mandatory and must be set; other fields are optional.
|
||||
type StreamToBlockBlobOptions struct {
|
||||
type UploadStreamToBlockBlobOptions struct {
|
||||
// BlockSize is mandatory. It specifies the block size to use; the maximum size is BlockBlobMaxPutBlockBytes.
|
||||
BlockSize int64
|
||||
|
||||
|
@ -26,18 +25,23 @@ type StreamToBlockBlobOptions struct {
|
|||
|
||||
// Metadata indicates the metadata to be associated with the blob when PutBlockList is called.
|
||||
Metadata Metadata
|
||||
// BlobAccessConditions???
|
||||
|
||||
// AccessConditions indicates the access conditions for the block blob.
|
||||
AccessConditions BlobAccessConditions
|
||||
}
|
||||
|
||||
// StreamToBlockBlob uploads a large stream of data in blocks to a block blob.
|
||||
func StreamToBlockBlob(ctx context.Context, stream io.ReaderAt, streamSize int64,
|
||||
blockBlobURL BlockBlobURL, o StreamToBlockBlobOptions) (*BlockBlobsPutBlockListResponse, error) {
|
||||
// UploadStreamToBlockBlob uploads a stream of data in blocks to a block blob.
|
||||
func UploadStreamToBlockBlob(ctx context.Context, stream io.ReaderAt, streamSize int64,
|
||||
blockBlobURL BlockBlobURL, o UploadStreamToBlockBlobOptions) (*BlockBlobsPutBlockListResponse, error) {
|
||||
|
||||
if o.BlockSize <= 0 || o.BlockSize > BlockBlobMaxPutBlockBytes {
|
||||
panic(fmt.Sprintf("BlockSize option must be > 0 and <= %d", BlockBlobMaxPutBlockBytes))
|
||||
}
|
||||
|
||||
numBlocks := ((streamSize - int64(1)) / o.BlockSize) + 1
|
||||
if numBlocks > BlockBlobMaxBlocks {
|
||||
panic(fmt.Sprintf("The streamSize is too big or the BlockSize is too small; the number of blocks must be <= %d", BlockBlobMaxBlocks))
|
||||
}
|
||||
blockIDList := make([]string, numBlocks) // Base 64 encoded block IDs
|
||||
blockSize := o.BlockSize
|
||||
|
||||
|
@ -54,50 +58,47 @@ func StreamToBlockBlob(ctx context.Context, stream io.ReaderAt, streamSize int64
|
|||
func(bytesTransferred int64) { o.Progress(streamOffset + bytesTransferred) })
|
||||
}
|
||||
|
||||
blockIDList[blockNum] = blockIDUint64ToBase64(uint64(streamOffset)) // The streamOffset is the block ID
|
||||
_, err := blockBlobURL.PutBlock(ctx, blockIDList[blockNum], body, LeaseAccessConditions{})
|
||||
// Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
|
||||
// at the same time causeing PutBlockList to get a mix of blocks from all the clients.
|
||||
blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
|
||||
_, err := blockBlobURL.PutBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return blockBlobURL.PutBlockList(ctx, blockIDList, o.Metadata, o.BlobHTTPHeaders, BlobAccessConditions{})
|
||||
return blockBlobURL.PutBlockList(ctx, blockIDList, o.Metadata, o.BlobHTTPHeaders, o.AccessConditions)
|
||||
}
|
||||
|
||||
// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the block must be the same length
|
||||
// These helper functions convert an int64 block ID to a base-64 string
|
||||
func blockIDUint64ToBase64(blockID uint64) string {
|
||||
binaryBlockID := [64 / 8]byte{} // All block IDs are 8 bytes long
|
||||
binary.LittleEndian.PutUint64(binaryBlockID[:], blockID)
|
||||
return base64.StdEncoding.EncodeToString(binaryBlockID[:])
|
||||
}
|
||||
|
||||
// GetRetryStreamOptions is used to configure a call to NewGetTryStream to download a large stream with intelligent retries.
|
||||
type GetRetryStreamOptions struct {
|
||||
// DownloadStreamOptions is used to configure a call to NewDownloadBlobToStream to download a large stream with intelligent retries.
|
||||
type DownloadStreamOptions struct {
|
||||
// Range indicates the starting offset and count of bytes within the blob to download.
|
||||
Range BlobRange
|
||||
|
||||
// Acc indicates the BlobAccessConditions to use when accessing the blob.
|
||||
AC BlobAccessConditions
|
||||
|
||||
// GetBlobResult identifies a function to invoke immediately after GetRetryStream's Read method internally
|
||||
// calls GetBlob. This function is invoked after every call to GetBlob. The callback can example GetBlob's
|
||||
// response and error information.
|
||||
GetBlobResult func(*GetResponse, error)
|
||||
// AccessConditions indicates the BlobAccessConditions to use when accessing the blob.
|
||||
AccessConditions BlobAccessConditions
|
||||
}
|
||||
|
||||
type retryStream struct {
|
||||
ctx context.Context
|
||||
blobURL BlobURL
|
||||
o GetRetryStreamOptions
|
||||
getBlob func(ctx context.Context, blobRange BlobRange, ac BlobAccessConditions, rangeGetContentMD5 bool) (*GetResponse, error)
|
||||
o DownloadStreamOptions
|
||||
response *http.Response
|
||||
}
|
||||
|
||||
// NewGetRetryStream creates a stream over a blob allowing you download the blob's contents.
|
||||
// NewDownloadStream creates a stream over a blob allowing you download the blob's contents.
|
||||
// When network errors occur, the retry stream internally issues new HTTP GET requests for
|
||||
// the remaining range of the blob's contents.
|
||||
func NewGetRetryStream(ctx context.Context, blobURL BlobURL, o GetRetryStreamOptions) io.ReadCloser {
|
||||
// the remaining range of the blob's contents. The GetBlob argument identifies the function
|
||||
// to invoke when the GetRetryStream needs to make an HTTP GET request as Read methods are called.
|
||||
// The callback can wrap the response body (with progress reporting, for example) before returning.
|
||||
func NewDownloadStream(ctx context.Context,
|
||||
getBlob func(ctx context.Context, blobRange BlobRange, ac BlobAccessConditions, rangeGetContentMD5 bool) (*GetResponse, error),
|
||||
o DownloadStreamOptions) io.ReadCloser {
|
||||
|
||||
// BlobAccessConditions may already have an If-Match:etag header
|
||||
return &retryStream{ctx: ctx, blobURL: blobURL, o: o, response: nil}
|
||||
if getBlob == nil {
|
||||
panic("getBlob must not be nil")
|
||||
}
|
||||
return &retryStream{ctx: ctx, getBlob: getBlob, o: o, response: nil}
|
||||
}
|
||||
|
||||
func (s *retryStream) Read(p []byte) (n int, err error) {
|
||||
|
@ -111,6 +112,7 @@ func (s *retryStream) Read(p []byte) (n int, err error) {
|
|||
}
|
||||
return n, err // Return the return to the caller
|
||||
}
|
||||
s.Close()
|
||||
s.response = nil // Something went wrong; our stream is no longer good
|
||||
if nerr, ok := err.(net.Error); ok {
|
||||
if !nerr.Timeout() && !nerr.Temporary() {
|
||||
|
@ -122,11 +124,7 @@ func (s *retryStream) Read(p []byte) (n int, err error) {
|
|||
}
|
||||
|
||||
// We don't have a response stream to read from, try to get one
|
||||
response, err := s.blobURL.GetBlob(s.ctx, s.o.Range, s.o.AC, false)
|
||||
if s.o.GetBlobResult != nil {
|
||||
// If caller desires notification of each GetBlob call, notify them
|
||||
s.o.GetBlobResult(response, err)
|
||||
}
|
||||
response, err := s.getBlob(s.ctx, s.o.Range, s.o.AccessConditions, false)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -134,13 +132,15 @@ func (s *retryStream) Read(p []byte) (n int, err error) {
|
|||
s.response = response.Response()
|
||||
|
||||
// Ensure that future requests are from the same version of the source
|
||||
s.o.AC.IfMatch = response.ETag()
|
||||
s.o.AccessConditions.IfMatch = response.ETag()
|
||||
|
||||
// Loop around and try to read from this stream
|
||||
}
|
||||
}
|
||||
|
||||
func (s *retryStream) Close() error {
|
||||
//s.blobURL = BlobURL{} // This blobURL is no longer valid
|
||||
return s.response.Body.Close()
|
||||
if s.response != nil && s.response.Body != nil {
|
||||
return s.response.Body.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -48,11 +48,11 @@ func NewBlobURLParts(u url.URL) BlobURLParts {
|
|||
}
|
||||
}
|
||||
|
||||
// Convert the query parameters to a case-sensitive map & trim whitsapce
|
||||
// Convert the query parameters to a case-sensitive map & trim whitespace
|
||||
paramsMap := u.Query()
|
||||
|
||||
up.Snapshot = time.Time{} // Assume no snapshot
|
||||
if snapshotStr, ok := paramsMap["snapshot"]; ok {
|
||||
if snapshotStr, ok := caseInsensitiveValues(paramsMap).Get("snapshot"); ok {
|
||||
up.Snapshot, _ = time.Parse(snapshotTimeFormat, snapshotStr[0])
|
||||
// If we recognized the query parameter, remove it from the map
|
||||
delete(paramsMap, "snapshot")
|
||||
|
@ -62,6 +62,17 @@ func NewBlobURLParts(u url.URL) BlobURLParts {
|
|||
return up
|
||||
}
|
||||
|
||||
type caseInsensitiveValues url.Values // map[string][]string
|
||||
func (v caseInsensitiveValues) Get(key string) ([]string, bool) {
|
||||
key = strings.ToLower(key)
|
||||
for key, value := range v {
|
||||
if strings.ToLower(key) == key {
|
||||
return value, true
|
||||
}
|
||||
}
|
||||
return []string{}, false
|
||||
}
|
||||
|
||||
// URL returns a URL object whose fields are initialized from the BlobURLParts fields. The URL's RawQuery
|
||||
// field contains the SAS, snapshot, and unparsed query parameters.
|
||||
func (up BlobURLParts) URL() url.URL {
|
||||
|
|
|
@ -5,7 +5,9 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-pipeline-go/pipeline"
|
||||
|
@ -22,7 +24,7 @@ func NewRequestLogPolicyFactory(o RequestLogOptions) pipeline.Factory {
|
|||
if o.LogWarningIfTryOverThreshold == 0 {
|
||||
// It would be good to relate this to https://azure.microsoft.com/en-us/support/legal/sla/storage/v1_2/
|
||||
// But this monitors the time to get the HTTP response; NOT the time to download the response body.
|
||||
o.LogWarningIfTryOverThreshold = 2 * time.Second // Default to 2 seconds
|
||||
o.LogWarningIfTryOverThreshold = 3 * time.Second // Default to 3 seconds
|
||||
}
|
||||
return &requestLogPolicyFactory{o: o}
|
||||
}
|
||||
|
@ -36,20 +38,47 @@ func (f *requestLogPolicyFactory) New(node pipeline.Node) pipeline.Policy {
|
|||
}
|
||||
|
||||
type requestLogPolicy struct {
|
||||
node pipeline.Node
|
||||
o RequestLogOptions
|
||||
try int
|
||||
node pipeline.Node
|
||||
o RequestLogOptions
|
||||
try int
|
||||
operationStart time.Time
|
||||
}
|
||||
|
||||
func redactSigQueryParam(rawQuery string) (bool, string) {
|
||||
sigFound := strings.EqualFold(rawQuery, "?sig=")
|
||||
if !sigFound {
|
||||
sigFound = strings.EqualFold(rawQuery, "&sig=")
|
||||
if !sigFound {
|
||||
return sigFound, rawQuery // [?|&]sig= not found; return same rawQuery passed in (no memory allocation)
|
||||
}
|
||||
}
|
||||
// [?|&]sig= was found, redact its value
|
||||
values, _ := url.ParseQuery(rawQuery)
|
||||
for name := range values {
|
||||
if strings.EqualFold(name, "sig") {
|
||||
values[name] = []string{"(redacted)"}
|
||||
}
|
||||
}
|
||||
return sigFound, values.Encode()
|
||||
}
|
||||
|
||||
func (p *requestLogPolicy) Do(ctx context.Context, request pipeline.Request) (response pipeline.Response, err error) {
|
||||
p.try++ // The first try is #1 (not #0)
|
||||
operationStart := time.Now()
|
||||
if p.try == 1 {
|
||||
p.operationStart = time.Now() // If this is the 1st try, record the operation state time
|
||||
}
|
||||
|
||||
// Log the outgoing request as informational
|
||||
if p.node.WouldLog(pipeline.LogInfo) {
|
||||
b := &bytes.Buffer{}
|
||||
fmt.Fprintf(b, "==> OUTGOING REQUEST (Try=%d)\n", p.try)
|
||||
pipeline.WriteRequest(b, request.Request)
|
||||
req := request
|
||||
if sigFound, rawQuery := redactSigQueryParam(req.URL.RawQuery); sigFound {
|
||||
// TODO: Make copy so we dont' destroy the query parameters we actually need to send in the request
|
||||
req = request.Copy()
|
||||
req.Request.URL.RawQuery = rawQuery
|
||||
}
|
||||
pipeline.WriteRequest(b, req.Request)
|
||||
p.node.Log(pipeline.LogInfo, b.String())
|
||||
}
|
||||
|
||||
|
@ -58,11 +87,11 @@ func (p *requestLogPolicy) Do(ctx context.Context, request pipeline.Request) (re
|
|||
response, err = p.node.Do(ctx, request) // Make the request
|
||||
tryEnd := time.Now()
|
||||
tryDuration := tryEnd.Sub(tryStart)
|
||||
opDuration := tryEnd.Sub(operationStart)
|
||||
opDuration := tryEnd.Sub(p.operationStart)
|
||||
|
||||
severity := pipeline.LogInfo // Assume success and default to informational logging
|
||||
logMsg := func(b *bytes.Buffer) {
|
||||
b.WriteString("SUCCESS\n")
|
||||
b.WriteString("SUCCESSFUL OPERATION\n")
|
||||
pipeline.WriteResponseWithRequest(b, response.Response())
|
||||
}
|
||||
|
||||
|
@ -71,32 +100,35 @@ func (p *requestLogPolicy) Do(ctx context.Context, request pipeline.Request) (re
|
|||
// Log a warning if the try duration exceeded the specified threshold
|
||||
severity = pipeline.LogWarning
|
||||
logMsg = func(b *bytes.Buffer) {
|
||||
fmt.Fprintf(b, "SLOW [tryDuration > %v]\n", p.o.LogWarningIfTryOverThreshold)
|
||||
fmt.Fprintf(b, "SLOW OPERATION [tryDuration > %v]\n", p.o.LogWarningIfTryOverThreshold)
|
||||
pipeline.WriteResponseWithRequest(b, response.Response())
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if serr, ok := err.(StorageError); ok && serr.Response() != nil {
|
||||
// This Storage Error did get a HTTP response from the service, so we won't change the severity
|
||||
logMsg = func(b *bytes.Buffer) {
|
||||
fmt.Fprintf(b, "OPERATION ERROR:\n%v\n", serr)
|
||||
}
|
||||
} else {
|
||||
// This error did not get an HTTP response from the service; upgrade the severity to Error
|
||||
severity = pipeline.LogError
|
||||
if err == nil { // We got a response from the service
|
||||
sc := response.Response().StatusCode
|
||||
if ((sc >= 400 && sc <= 499) && sc != http.StatusNotFound && sc != http.StatusConflict && sc != http.StatusPreconditionFailed && sc != http.StatusRequestedRangeNotSatisfiable) || (sc >= 500 && sc <= 599) {
|
||||
severity = pipeline.LogError // Promote to Error any 4xx (except those listed is an error) or any 5xx
|
||||
logMsg = func(b *bytes.Buffer) {
|
||||
// Write the error, the originating request and the stack
|
||||
fmt.Fprintf(b, "NETWORK ERROR:\n%v\n", err)
|
||||
pipeline.WriteRequest(b, request.Request)
|
||||
fmt.Fprintf(b, "OPERATION ERROR:\n%v\n", err)
|
||||
pipeline.WriteResponseWithRequest(b, response.Response())
|
||||
b.Write(stack()) // For errors, we append the stack trace (an expensive operation)
|
||||
}
|
||||
} else {
|
||||
// For other status codes, we leave the severity as is.
|
||||
}
|
||||
} else { // This error did not get an HTTP response from the service; upgrade the severity to Error
|
||||
severity = pipeline.LogError
|
||||
logMsg = func(b *bytes.Buffer) {
|
||||
// Write the error, the originating request and the stack
|
||||
fmt.Fprintf(b, "NETWORK ERROR:\n%v\n", err)
|
||||
pipeline.WriteRequest(b, request.Request)
|
||||
b.Write(stack()) // For errors, we append the stack trace (an expensive operation)
|
||||
}
|
||||
} else if response.Response().StatusCode == http.StatusInternalServerError || response.Response().StatusCode == http.StatusServiceUnavailable {
|
||||
severity = pipeline.LogError // If the service returns 500 or 503, then log this as an error
|
||||
}
|
||||
|
||||
if p.node.WouldLog(severity) || true { // true is for testing
|
||||
if p.node.WouldLog(severity) || false { // Change false to true for testing
|
||||
// We're going to log this; build the string to log
|
||||
b := &bytes.Buffer{}
|
||||
fmt.Fprintf(b, "==> REQUEST/RESPONSE (Try=%d, TryDuration=%v, OpDuration=%v) -- ", p.try, tryDuration, opDuration)
|
||||
|
|
|
@ -131,6 +131,11 @@ type retryPolicy struct {
|
|||
o RetryOptions
|
||||
}
|
||||
|
||||
// According to https://github.com/golang/go/wiki/CompilerOptimizations, the compiler will inline this method and hopefully optimize all calls to it away
|
||||
var logf = func(format string, a ...interface{}) {}
|
||||
|
||||
//var logf = fmt.Printf // Use this version to see the retry method's code path (import "fmt")
|
||||
|
||||
func (p *retryPolicy) Do(ctx context.Context, request pipeline.Request) (response pipeline.Response, err error) {
|
||||
// Before each try, we'll select either the primary or secondary URL.
|
||||
secondaryHost := ""
|
||||
|
@ -149,20 +154,26 @@ func (p *retryPolicy) Do(ctx context.Context, request pipeline.Request) (respons
|
|||
// For a primary wait ((2 ^ primaryTries - 1) * delay * random(0.8, 1.2)
|
||||
// If secondary gets a 404, don't fail, retry but future retries are only against the primary
|
||||
// When retrying against a secondary, ignore the retry count and wait (.1 second * random(0.8, 1.2))
|
||||
for try := 0; try < p.o.MaxTries; try++ {
|
||||
// Determine which endpoint to try. It's primary if there is no secondary or if it is an even attempt.
|
||||
tryingPrimary := !considerSecondary || (try%2 == 0)
|
||||
for try := 1; try <= p.o.MaxTries; try++ {
|
||||
logf("\n=====> Try=%d\n", try)
|
||||
|
||||
// Determine which endpoint to try. It's primary if there is no secondary or if it is an add # attempt.
|
||||
tryingPrimary := !considerSecondary || (try%2 == 1)
|
||||
// Select the correct host and delay
|
||||
if tryingPrimary {
|
||||
primaryTry++
|
||||
time.Sleep(p.o.calcDelay(primaryTry)) // The 1st try returns 0 delay
|
||||
delay := p.o.calcDelay(primaryTry)
|
||||
logf("Primary try=%d, Delay=%v\n", primaryTry, delay)
|
||||
time.Sleep(delay) // The 1st try returns 0 delay
|
||||
} else {
|
||||
time.Sleep(time.Second * time.Duration(rand.Float32()/2+0.8)) // Delay with some jitter before trying secondary
|
||||
delay := time.Second * time.Duration(rand.Float32()/2+0.8)
|
||||
logf("Secondary try=%d, Delay=%v\n", try-primaryTry, delay)
|
||||
time.Sleep(delay) // Delay with some jitter before trying secondary
|
||||
}
|
||||
|
||||
// Clone the original request to ensure that each try starts with the original (unmutated) request.
|
||||
requestCopy := request.Copy()
|
||||
if try > 0 {
|
||||
if try > 1 {
|
||||
// For a retry, seek to the beginning of the Body stream.
|
||||
if err = requestCopy.RewindBody(); err != nil {
|
||||
panic(err)
|
||||
|
@ -176,50 +187,49 @@ func (p *retryPolicy) Do(ctx context.Context, request pipeline.Request) (respons
|
|||
timeout := int(p.o.TryTimeout.Seconds()) // Max seconds per try
|
||||
if deadline, ok := ctx.Deadline(); ok { // If user's ctx has a deadline, make the timeout the smaller of the two
|
||||
t := int(deadline.Sub(time.Now()).Seconds()) // Duration from now until user's ctx reaches its deadline
|
||||
logf("MaxTryTimeout=%d secs, TimeTilDeadline=%d sec\n", timeout, t)
|
||||
if t < timeout {
|
||||
timeout = t
|
||||
}
|
||||
if timeout < 0 {
|
||||
timeout = 0 // If timeout ever goes negative, set it to zero; this happen while debugging
|
||||
}
|
||||
logf("TryTimeout adjusted to=%d sec\n", timeout)
|
||||
}
|
||||
q := requestCopy.Request.URL.Query()
|
||||
q.Set("timeout", strconv.Itoa(timeout))
|
||||
requestCopy.Request.URL.RawQuery = q.Encode()
|
||||
logf("Url=%s\n", requestCopy.Request.URL.String())
|
||||
|
||||
// Set the time for this particular retry operation and then Do the operation.
|
||||
tryCtx, tryCancel := context.WithTimeout(ctx, time.Second*time.Duration(timeout))
|
||||
response, err = p.node.Do(tryCtx, requestCopy) // Make the request
|
||||
logf("Err=%v, response=%v\n", err, response)
|
||||
|
||||
action := "" // This MUST get changed by the code below
|
||||
if ctx.Err() != nil {
|
||||
action := "" // This MUST get changed within the switch code below
|
||||
switch {
|
||||
case ctx.Err() != nil:
|
||||
action = "NoRetry: Op timeout"
|
||||
} else if err != nil { // Protocol Responder returns non-nil if REST API returns invalid status code
|
||||
if nerr, ok := err.(net.Error); ok {
|
||||
// We have a network or StorageError
|
||||
if nerr.Temporary() { // If a StorageError, an HTTP 500/503 returns true (service throttling)
|
||||
action = "Retry: Temporary"
|
||||
} else if nerr.Timeout() && (tryCtx.Err() != nil) {
|
||||
action = "Retry: Timeout"
|
||||
} else if !tryingPrimary {
|
||||
// If attempt was against the secondary & it returned a StatusNotFound (404), then
|
||||
// the resource was not found. This may be due to replication delay. So, in this
|
||||
// case, we'll never try the secondary again for this operation.
|
||||
if resp := response.Response(); resp != nil && resp.StatusCode == http.StatusNotFound {
|
||||
considerSecondary = false
|
||||
action = "Retry: Secondary URL 404"
|
||||
} else {
|
||||
// An error (against secondary DC) that is neither temporary or timeout; no retry
|
||||
action = "NoRetry: error (secondary; not-retryable & not 404)"
|
||||
}
|
||||
} else {
|
||||
// An error that is neither temporary or timeout; no retry
|
||||
action = "NoRetry: error (not-retryable)"
|
||||
}
|
||||
case !tryingPrimary && response != nil && response.Response().StatusCode == http.StatusNotFound:
|
||||
// If attempt was against the secondary & it returned a StatusNotFound (404), then
|
||||
// the resource was not found. This may be due to replication delay. So, in this
|
||||
// case, we'll never try the secondary again for this operation.
|
||||
considerSecondary = false
|
||||
action = "Retry: Secondary URL returned 404"
|
||||
case err == context.DeadlineExceeded: // tryCtx.Err should also return context.DeadlineExceeded
|
||||
action = "Retry: timeout"
|
||||
case err != nil:
|
||||
// NOTE: Protocol Responder returns non-nil if REST API returns invalid status code for the invoked operation
|
||||
if nerr, ok := err.(net.Error); ok && (nerr.Temporary() || nerr.Timeout()) { // We have a network or StorageError
|
||||
action = "Retry: net.Error and Temporary() or Timeout()"
|
||||
} else {
|
||||
// A non-net.Error error; no retry
|
||||
action = "NoRetry: error (non-net.Error)"
|
||||
action = "NoRetry: unrecognized error"
|
||||
}
|
||||
} else {
|
||||
action = "NoRetry: success" // no error
|
||||
default:
|
||||
action = "NoRetry: successful HTTP request" // no error
|
||||
}
|
||||
|
||||
logf("Action=%s\n", action)
|
||||
// fmt.Println(action + "\n") // This is where we could log the retry operation; action is why we're retrying
|
||||
if action[0] != 'R' { // Retry only if action starts with 'R'
|
||||
if err != nil {
|
||||
|
|
|
@ -20,13 +20,18 @@ type TelemetryOptions struct {
|
|||
// NewTelemetryPolicyFactory creates a factory that can create telemetry policy objects
|
||||
// which add telemetry information to outgoing HTTP requests.
|
||||
func NewTelemetryPolicyFactory(o TelemetryOptions) pipeline.Factory {
|
||||
return &telemetryPolicyFactory{serviceVersion: serviceLibVersion, options: o}
|
||||
b := &bytes.Buffer{}
|
||||
b.WriteString(o.Value)
|
||||
if b.Len() > 0 {
|
||||
b.WriteRune(' ')
|
||||
}
|
||||
fmt.Fprintf(b, "Azure-Storage/%s %s", serviceLibVersion, platformInfo)
|
||||
return &telemetryPolicyFactory{telemetryValue: b.String()}
|
||||
}
|
||||
|
||||
// telemetryPolicyFactory struct
|
||||
type telemetryPolicyFactory struct {
|
||||
serviceVersion string
|
||||
options TelemetryOptions
|
||||
telemetryValue string
|
||||
}
|
||||
|
||||
// New creates a telemetryPolicy object.
|
||||
|
@ -41,14 +46,7 @@ type telemetryPolicy struct {
|
|||
}
|
||||
|
||||
func (p *telemetryPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
||||
request = request.Copy() // Don't mutate the incoming request object's headers
|
||||
b := &bytes.Buffer{}
|
||||
b.WriteString(p.factory.options.Value)
|
||||
if b.Len() > 0 {
|
||||
b.WriteRune(' ')
|
||||
}
|
||||
fmt.Fprintf(b, "Azure-Storage/%s %s", p.factory.serviceVersion, platformInfo)
|
||||
request.Header.Set("User-Agent", b.String())
|
||||
request.Header.Set("User-Agent", p.factory.telemetryValue)
|
||||
return p.node.Do(ctx, request)
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,6 @@ type uniqueRequestIDPolicy struct {
|
|||
func (p *uniqueRequestIDPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
||||
id := request.Header.Get(xMsClientRequestID)
|
||||
if id == "" { // Add a unique request ID if the caller didn't specify one already
|
||||
request = request.Copy() // Don't mutate the incoming request object's headers
|
||||
request.Header.Set(xMsClientRequestID, newUUID().String())
|
||||
}
|
||||
return p.node.Do(ctx, request)
|
||||
|
@ -108,3 +107,7 @@ func parseUUID(uuidStr string) uuid {
|
|||
}
|
||||
return uuidVal
|
||||
}
|
||||
|
||||
func (u uuid) bytes() []byte {
|
||||
return u[:]
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ func NewSASQueryParameters(values url.Values) SASQueryParameters {
|
|||
for k, v := range values {
|
||||
val := v[0]
|
||||
isSASKey := true
|
||||
switch k {
|
||||
switch strings.ToLower(k) {
|
||||
case "sv":
|
||||
p.Version = val
|
||||
case "ss":
|
||||
|
|
|
@ -81,11 +81,11 @@ func (v BlobSASSignatureValues) NewSASQueryParameters(sharedKeyCredential *Share
|
|||
func getCanonicalName(account string, containerName string, blobName string) string {
|
||||
// Container: "/blob/account/containername"
|
||||
// Blob: "/blob/account/containername/blobname"
|
||||
elems := []string{"/blob/", account, "/", containerName}
|
||||
elements := []string{"/blob/", account, "/", containerName}
|
||||
if blobName != "" {
|
||||
elems = append(elems, "/", strings.Replace(blobName, "\\", "/", -1))
|
||||
elements = append(elements, "/", strings.Replace(blobName, "\\", "/", -1))
|
||||
}
|
||||
return strings.Join(elems, "")
|
||||
return strings.Join(elements, "")
|
||||
}
|
||||
|
||||
// The ContainerSASPermissions type simplifies creating the permissions string for an Azure Storage container SAS.
|
||||
|
|
|
@ -134,7 +134,7 @@ func (b BlobURL) Delete(ctx context.Context, deleteOptions DeleteSnapshotsOption
|
|||
ifModifiedSince, ifUnmodifiedSince, ifMatchETag, ifNoneMatchETag, nil)
|
||||
}
|
||||
|
||||
// GetMetadata returns the blob's metadata and properties.
|
||||
// GetPropertiesAndMetadata returns the blob's metadata and properties.
|
||||
// For more information, see https://docs.microsoft.com/rest/api/storageservices/get-blob-properties.
|
||||
func (b BlobURL) GetPropertiesAndMetadata(ctx context.Context, ac BlobAccessConditions) (*BlobsGetPropertiesResponse, error) {
|
||||
ifModifiedSince, ifUnmodifiedSince, ifMatchETag, ifNoneMatchETag := ac.HTTPAccessConditions.pointers()
|
||||
|
|
|
@ -12,6 +12,9 @@ import (
|
|||
const (
|
||||
// BlockBlobMaxPutBlockBytes indicates the maximum number of bytes that can be sent in a call to PutBlock.
|
||||
BlockBlobMaxPutBlockBytes = 100 * 1024 * 1024 // 100MB
|
||||
|
||||
// BlockBlobMaxBlocks indicates the maximum number of blocks allowed in a block blob.
|
||||
BlockBlobMaxBlocks = 50000
|
||||
)
|
||||
|
||||
// BlockBlobURL defines a set of operations applicable to block blobs.
|
||||
|
|
|
@ -138,11 +138,12 @@ func ExampleNewPipeline() {
|
|||
|
||||
// Set LogOptions to control what & where all pipeline log events go
|
||||
Log: pipeline.LogOptions{
|
||||
LogMaxSeverity: pipeline.LogInfo, // Log all events from informational to more severe
|
||||
Log: func(s pipeline.LogSeverity, m string) { // This func is called to log each event
|
||||
// This method is not called for filtered-out severities.
|
||||
logger.Output(2, m) // This example uses Go's standard logger
|
||||
}},
|
||||
},
|
||||
MinimumSeverityToLog: func() pipeline.LogSeverity { return pipeline.LogInfo }, // Log all events from informational to more severe
|
||||
},
|
||||
}
|
||||
|
||||
// Create a request pipeline object configured with credentials and with pipeline options. Once created,
|
||||
|
@ -959,8 +960,8 @@ func ExampleStreamToBlockBlob() {
|
|||
ctx := context.Background() // This example uses a never-expiring context
|
||||
|
||||
// Pass the Context, stream, stream size, block blob URL, and options to StreamToBlockBlob
|
||||
putBlockList, err := StreamToBlockBlob(ctx, file, fileSize.Size(), blockBlobURL,
|
||||
StreamToBlockBlobOptions{
|
||||
putBlockList, err := UploadStreamToBlockBlob(ctx, file, fileSize.Size(), blockBlobURL,
|
||||
UploadStreamToBlockBlobOptions{
|
||||
// BlockSize is mandatory. It specifies the block size to use; the maximum size is BlockBlobMaxPutBlockBytes.
|
||||
BlockSize: BlockBlobMaxPutBlockBytes,
|
||||
|
||||
|
@ -989,17 +990,18 @@ func ExampleNewGetRetryStream() {
|
|||
contentLength := int64(0) // Used for progress reporting to report the total number of bytes being downloaded.
|
||||
|
||||
// NewGetRetryStream creates an intelligent retryable stream around a blob; it returns an io.ReadCloser.
|
||||
rs := NewGetRetryStream(context.Background(), blobURL,
|
||||
GetRetryStreamOptions{
|
||||
// We set GetBlobResult so we can capture the blob's full content length on the very
|
||||
// first internal call to GetBlob.
|
||||
GetBlobResult: func(response *GetResponse, err error) {
|
||||
if err == nil && contentLength == 0 {
|
||||
// If 1st successful Get, record blob's full size for progress reporting
|
||||
contentLength = response.ContentLength()
|
||||
}
|
||||
},
|
||||
})
|
||||
rs := NewDownloadStream(context.Background(),
|
||||
// We pass more tha "blobUrl.GetBlob" here so we can capture the blob's full
|
||||
// content length on the very first internal call to Read.
|
||||
func(ctx context.Context, blobRange BlobRange, ac BlobAccessConditions, rangeGetContentMD5 bool) (*GetResponse, error) {
|
||||
get, err := blobURL.GetBlob(ctx, blobRange, ac, rangeGetContentMD5)
|
||||
if err == nil && contentLength == 0 {
|
||||
// If 1st successful Get, record blob's full size for progress reporting
|
||||
contentLength = get.ContentLength()
|
||||
}
|
||||
return get, err
|
||||
},
|
||||
DownloadStreamOptions{})
|
||||
|
||||
// NewResponseBodyStream wraps the GetRetryStream with progress reporting; it returns an io.ReadCloser.
|
||||
stream := pipeline.NewResponseBodyProgress(rs,
|
||||
|
|
|
@ -0,0 +1,204 @@
|
|||
package azblob_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
chk "gopkg.in/check.v1"
|
||||
|
||||
"github.com/Azure/azure-pipeline-go/pipeline"
|
||||
"github.com/Azure/azure-storage-blob-go/2016-05-31/azblob"
|
||||
)
|
||||
|
||||
type retryTestScenario int
|
||||
|
||||
const (
|
||||
// Retry until success. Max reties hit. Operation time out prevents additional retries
|
||||
retryTestScenarioRetryUntilSuccess retryTestScenario = 1
|
||||
retryTestScenarioRetryUntilOperationCancel retryTestScenario = 2
|
||||
retryTestScenarioRetryUntilMaxRetries retryTestScenario = 3
|
||||
)
|
||||
|
||||
func (s *aztestsSuite) TestRetryTestScenarioUntilSuccess(c *chk.C) {
|
||||
testRetryTestScenario(c, retryTestScenarioRetryUntilSuccess)
|
||||
}
|
||||
|
||||
func (s *aztestsSuite) TestRetryTestScenarioUntilOperationCancel(c *chk.C) {
|
||||
testRetryTestScenario(c, retryTestScenarioRetryUntilOperationCancel)
|
||||
}
|
||||
|
||||
func (s *aztestsSuite) TestRetryTestScenarioUntilMaxRetries(c *chk.C) {
|
||||
testRetryTestScenario(c, retryTestScenarioRetryUntilMaxRetries)
|
||||
}
|
||||
|
||||
func newRetryTestPolicyFactory(c *chk.C, scenario retryTestScenario, maxRetries int, cancel context.CancelFunc) *retryTestPolicyFactory {
|
||||
return &retryTestPolicyFactory{c: c, scenario: scenario, maxRetries: maxRetries, cancel: cancel}
|
||||
}
|
||||
|
||||
type retryTestPolicyFactory struct {
|
||||
c *chk.C
|
||||
scenario retryTestScenario
|
||||
maxRetries int
|
||||
cancel context.CancelFunc
|
||||
try int
|
||||
}
|
||||
|
||||
func (f *retryTestPolicyFactory) New(node pipeline.Node) pipeline.Policy {
|
||||
f.try = 0 // Reset this for each test
|
||||
return &retryTestPolicy{node: node, factory: f}
|
||||
}
|
||||
|
||||
type retryTestPolicy struct {
|
||||
node pipeline.Node
|
||||
factory *retryTestPolicyFactory
|
||||
}
|
||||
|
||||
type retryError struct {
|
||||
temporary, timeout bool
|
||||
statusCode int
|
||||
}
|
||||
|
||||
func (e *retryError) Temporary() bool { return e.temporary }
|
||||
func (e *retryError) Timeout() bool { return e.timeout }
|
||||
func (e *retryError) Error() string {
|
||||
return fmt.Sprintf("Temporary=%t, Timeout=%t", e.Temporary(), e.Timeout())
|
||||
}
|
||||
|
||||
type httpResponse struct {
|
||||
response *http.Response
|
||||
}
|
||||
|
||||
func (r *httpResponse) Response() *http.Response { return r.response }
|
||||
|
||||
func (p *retryTestPolicy) Do(ctx context.Context, request pipeline.Request) (response pipeline.Response, err error) {
|
||||
c := p.factory.c
|
||||
p.factory.try++ // Increment the try
|
||||
c.Assert(p.factory.try <= p.factory.maxRetries, chk.Equals, true) // Ensure # of tries < MaxRetries
|
||||
req := request.Request
|
||||
|
||||
// Validate the expected pre-conditions for each try
|
||||
expectedHost := "PrimaryDC"
|
||||
if p.factory.try%2 == 0 {
|
||||
if p.factory.scenario != retryTestScenarioRetryUntilSuccess || p.factory.try <= 4 {
|
||||
expectedHost = "SecondaryDC"
|
||||
}
|
||||
}
|
||||
c.Assert(req.URL.Host, chk.Equals, expectedHost) // Ensure we got the expected primary/secondary DC
|
||||
|
||||
// Ensure that any headers & query parameters this method adds (later) are removed/reset for each try
|
||||
c.Assert(req.Header.Get("TestHeader"), chk.Equals, "") // Ensure our "TestHeader" is not in the HTTP request
|
||||
values := req.URL.Query()
|
||||
c.Assert(len(values["TestQueryParam"]), chk.Equals, 0) // TestQueryParam shouldn't be in the HTTP request
|
||||
|
||||
if seeker, ok := req.Body.(io.ReadSeeker); !ok {
|
||||
c.Fail() // Body must be an io.ReadSeeker
|
||||
} else {
|
||||
pos, err := seeker.Seek(0, io.SeekCurrent)
|
||||
c.Assert(err, chk.IsNil) // Ensure that body was seekable
|
||||
c.Assert(pos, chk.Equals, int64(0)) // Ensure body seeked back to position 0
|
||||
}
|
||||
|
||||
// Add a query param & header; these not be here on teh next try
|
||||
values["TestQueryParam"] = []string{"TestQueryParamValue"}
|
||||
req.Header.Set("TestHeader", "TestValue") // Add a header this not exist with each try
|
||||
b := []byte{0}
|
||||
n, err := req.Body.Read(b)
|
||||
c.Assert(n, chk.Equals, 1) // Read failed
|
||||
|
||||
switch p.factory.scenario {
|
||||
case retryTestScenarioRetryUntilSuccess:
|
||||
switch p.factory.try {
|
||||
case 1:
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
time.Sleep(time.Until(deadline) + time.Second) // Let the context timeout expire
|
||||
}
|
||||
err = ctx.Err()
|
||||
case 2:
|
||||
err = &retryError{temporary: true}
|
||||
case 3:
|
||||
err = &retryError{timeout: true}
|
||||
case 4:
|
||||
response = &httpResponse{response: &http.Response{StatusCode: http.StatusNotFound}}
|
||||
case 5:
|
||||
err = &retryError{temporary: true} // These attempts all fail but we're making sure we never see the secondary DC again
|
||||
case 6:
|
||||
response = &httpResponse{response: &http.Response{StatusCode: http.StatusOK}} // Stop retries with valid response
|
||||
default:
|
||||
c.Fail() // Retries should have stopped so we shouldn't get here
|
||||
}
|
||||
case retryTestScenarioRetryUntilOperationCancel:
|
||||
switch p.factory.try {
|
||||
case 1:
|
||||
p.factory.cancel()
|
||||
err = context.Canceled
|
||||
default:
|
||||
c.Fail() // Retries should have stopped so we shouldn't get here
|
||||
}
|
||||
case retryTestScenarioRetryUntilMaxRetries:
|
||||
err = &retryError{temporary: true} // Keep retrying until maxRetries is hit
|
||||
}
|
||||
return response, err // Return the response & err
|
||||
}
|
||||
|
||||
func testRetryTestScenario(c *chk.C, scenario retryTestScenario) {
|
||||
u, _ := url.Parse("http://PrimaryDC")
|
||||
retryOptions := azblob.RetryOptions{
|
||||
Policy: azblob.RetryPolicyExponential,
|
||||
MaxTries: 6,
|
||||
TryTimeout: 2 * time.Second,
|
||||
RetryDelay: 1 * time.Second,
|
||||
MaxRetryDelay: 4 * time.Second,
|
||||
RetryReadsFromSecondaryHost: "SecondaryDC",
|
||||
}
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithTimeout(ctx, 64 /*2^MaxTries(6)*/ *retryOptions.TryTimeout)
|
||||
retrytestPolicyFactory := newRetryTestPolicyFactory(c, scenario, retryOptions.MaxTries, cancel)
|
||||
factories := [...]pipeline.Factory{
|
||||
retrytestPolicyFactory,
|
||||
azblob.NewRetryPolicyFactory(retryOptions),
|
||||
}
|
||||
p := pipeline.NewPipeline(factories[:], pipeline.Options{})
|
||||
request, err := pipeline.NewRequest(http.MethodGet, *u, strings.NewReader("TestData"))
|
||||
response, err := p.Do(ctx, nil, request)
|
||||
switch scenario {
|
||||
case retryTestScenarioRetryUntilSuccess:
|
||||
if err != nil || response == nil || response.Response() == nil || response.Response().StatusCode != http.StatusOK {
|
||||
c.Fail() // Operation didn't run to success
|
||||
}
|
||||
case retryTestScenarioRetryUntilMaxRetries:
|
||||
c.Assert(err, chk.NotNil) // Ensure we ended with an error
|
||||
c.Assert(response, chk.IsNil) // Ensure we ended without a valid response
|
||||
c.Assert(retrytestPolicyFactory.try, chk.Equals, retryOptions.MaxTries) // Ensure the operation end with the exact right number of tries
|
||||
case retryTestScenarioRetryUntilOperationCancel:
|
||||
c.Assert(err, chk.Equals, context.Canceled) // Ensure we ended due to cancellation
|
||||
c.Assert(response, chk.IsNil) // Ensure we ended without a valid response
|
||||
c.Assert(retrytestPolicyFactory.try <= retryOptions.MaxTries, chk.Equals, true) // Ensure we didn't end due to reaching max tries
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
|
||||
/*
|
||||
Fail primary; retry should be on secondary URL - maybe do this twice
|
||||
Fail secondary; and never see primary again
|
||||
|
||||
Make sure any mutations are lost on each retry
|
||||
Make sure body is reset on each retry
|
||||
|
||||
Timeout a try; should retry (unless no more)
|
||||
timeout an operation; should not retry
|
||||
check timeout query param; should be try timeout
|
||||
|
||||
Return Temporary() = true; should retry (unless max)
|
||||
Return Timeout() true; should retry (unless max)
|
||||
|
||||
Secondary try returns 404; no more tries against secondary
|
||||
|
||||
error where Temporary() and Timeout() return false; no retry
|
||||
error where Temporary() & Timeout don't exist; no retry
|
||||
no error; no retry; return success, nil
|
||||
*/
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
Загрузка…
Ссылка в новой задаче