Merge branch 'master' into tiering
This commit is contained in:
Коммит
18c51cdded
|
@ -24,8 +24,8 @@ type anonymousCredentialPolicyFactory struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a credential policy object.
|
// New creates a credential policy object.
|
||||||
func (f *anonymousCredentialPolicyFactory) New(node pipeline.Node) pipeline.Policy {
|
func (f *anonymousCredentialPolicyFactory) New(next pipeline.Policy, config *pipeline.Configuration) pipeline.Policy {
|
||||||
return &anonymousCredentialPolicy{node: node}
|
return &anonymousCredentialPolicy{next: next}
|
||||||
}
|
}
|
||||||
|
|
||||||
// credentialMarker is a package-internal method that exists just to satisfy the Credential interface.
|
// credentialMarker is a package-internal method that exists just to satisfy the Credential interface.
|
||||||
|
@ -33,11 +33,11 @@ func (*anonymousCredentialPolicyFactory) credentialMarker() {}
|
||||||
|
|
||||||
// anonymousCredentialPolicy is the credential's policy object.
|
// anonymousCredentialPolicy is the credential's policy object.
|
||||||
type anonymousCredentialPolicy struct {
|
type anonymousCredentialPolicy struct {
|
||||||
node pipeline.Node
|
next pipeline.Policy
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do implements the credential's policy interface.
|
// Do implements the credential's policy interface.
|
||||||
func (p anonymousCredentialPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
func (p anonymousCredentialPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
||||||
// For anonymous credentials, this is effectively a no-op
|
// For anonymous credentials, this is effectively a no-op
|
||||||
return p.node.Do(ctx, request)
|
return p.next.Do(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,8 +39,8 @@ func (f SharedKeyCredential) AccountName() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a credential policy object.
|
// New creates a credential policy object.
|
||||||
func (f *SharedKeyCredential) New(node pipeline.Node) pipeline.Policy {
|
func (f *SharedKeyCredential) New(next pipeline.Policy, config *pipeline.Configuration) pipeline.Policy {
|
||||||
return sharedKeyCredentialPolicy{node: node, factory: f}
|
return sharedKeyCredentialPolicy{factory: f, next: next, config: config}
|
||||||
}
|
}
|
||||||
|
|
||||||
// credentialMarker is a package-internal method that exists just to satisfy the Credential interface.
|
// credentialMarker is a package-internal method that exists just to satisfy the Credential interface.
|
||||||
|
@ -48,8 +48,9 @@ func (*SharedKeyCredential) credentialMarker() {}
|
||||||
|
|
||||||
// sharedKeyCredentialPolicy is the credential's policy object.
|
// sharedKeyCredentialPolicy is the credential's policy object.
|
||||||
type sharedKeyCredentialPolicy struct {
|
type sharedKeyCredentialPolicy struct {
|
||||||
node pipeline.Node
|
|
||||||
factory *SharedKeyCredential
|
factory *SharedKeyCredential
|
||||||
|
next pipeline.Policy
|
||||||
|
config *pipeline.Configuration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do implements the credential's policy interface.
|
// Do implements the credential's policy interface.
|
||||||
|
@ -63,10 +64,10 @@ func (p sharedKeyCredentialPolicy) Do(ctx context.Context, request pipeline.Requ
|
||||||
authHeader := strings.Join([]string{"SharedKey ", p.factory.accountName, ":", signature}, "")
|
authHeader := strings.Join([]string{"SharedKey ", p.factory.accountName, ":", signature}, "")
|
||||||
request.Header[headerAuthorization] = []string{authHeader}
|
request.Header[headerAuthorization] = []string{authHeader}
|
||||||
|
|
||||||
response, err := p.node.Do(ctx, request)
|
response, err := p.next.Do(ctx, request)
|
||||||
if err != nil && response != nil && response.Response() != nil && response.Response().StatusCode == http.StatusForbidden {
|
if err != nil && response != nil && response.Response() != nil && response.Response().StatusCode == http.StatusForbidden {
|
||||||
// Service failed to authenticate request, log it
|
// Service failed to authenticate request, log it
|
||||||
p.node.Log(pipeline.LogError, "===== HTTP Forbidden status, String-to-Sign:\n"+stringToSign+"\n===============================\n")
|
p.config.Log(pipeline.LogError, "===== HTTP Forbidden status, String-to-Sign:\n"+stringToSign+"\n===============================\n")
|
||||||
}
|
}
|
||||||
return response, err
|
return response, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,8 +25,8 @@ func (f *TokenCredential) Token() string { return f.token.Load().(string) }
|
||||||
func (f *TokenCredential) SetToken(token string) { f.token.Store(token) }
|
func (f *TokenCredential) SetToken(token string) { f.token.Store(token) }
|
||||||
|
|
||||||
// New creates a credential policy object.
|
// New creates a credential policy object.
|
||||||
func (f *TokenCredential) New(node pipeline.Node) pipeline.Policy {
|
func (f *TokenCredential) New(next pipeline.Policy, config *pipeline.Configuration) pipeline.Policy {
|
||||||
return &tokenCredentialPolicy{node: node, factory: f}
|
return &tokenCredentialPolicy{factory: f, next: next}
|
||||||
}
|
}
|
||||||
|
|
||||||
// credentialMarker is a package-internal method that exists just to satisfy the Credential interface.
|
// credentialMarker is a package-internal method that exists just to satisfy the Credential interface.
|
||||||
|
@ -34,8 +34,8 @@ func (*TokenCredential) credentialMarker() {}
|
||||||
|
|
||||||
// tokenCredentialPolicy is the credential's policy object.
|
// tokenCredentialPolicy is the credential's policy object.
|
||||||
type tokenCredentialPolicy struct {
|
type tokenCredentialPolicy struct {
|
||||||
node pipeline.Node
|
|
||||||
factory *TokenCredential
|
factory *TokenCredential
|
||||||
|
next pipeline.Policy
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do implements the credential's policy interface.
|
// Do implements the credential's policy interface.
|
||||||
|
@ -44,5 +44,5 @@ func (p tokenCredentialPolicy) Do(ctx context.Context, request pipeline.Request)
|
||||||
panic("Token credentials require a URL using the https protocol scheme.")
|
panic("Token credentials require a URL using the https protocol scheme.")
|
||||||
}
|
}
|
||||||
request.Header[headerAuthorization] = []string{"Bearer " + p.factory.Token()}
|
request.Header[headerAuthorization] = []string{"Bearer " + p.factory.Token()}
|
||||||
return p.node.Do(ctx, request)
|
return p.next.Do(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,12 +34,13 @@ type requestLogPolicyFactory struct {
|
||||||
o RequestLogOptions
|
o RequestLogOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *requestLogPolicyFactory) New(node pipeline.Node) pipeline.Policy {
|
func (f *requestLogPolicyFactory) New(next pipeline.Policy, config *pipeline.Configuration) pipeline.Policy {
|
||||||
return &requestLogPolicy{node: node, o: f.o}
|
return &requestLogPolicy{o: f.o, next: next, config: config}
|
||||||
}
|
}
|
||||||
|
|
||||||
type requestLogPolicy struct {
|
type requestLogPolicy struct {
|
||||||
node pipeline.Node
|
next pipeline.Policy
|
||||||
|
config *pipeline.Configuration
|
||||||
o RequestLogOptions
|
o RequestLogOptions
|
||||||
try int32
|
try int32
|
||||||
operationStart time.Time
|
operationStart time.Time
|
||||||
|
@ -81,75 +82,68 @@ func (p *requestLogPolicy) Do(ctx context.Context, request pipeline.Request) (re
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log the outgoing request as informational
|
// Log the outgoing request as informational
|
||||||
if p.node.ShouldLog(pipeline.LogInfo) {
|
if p.config.ShouldLog(pipeline.LogInfo) {
|
||||||
b := &bytes.Buffer{}
|
b := &bytes.Buffer{}
|
||||||
fmt.Fprintf(b, "==> OUTGOING REQUEST (Try=%d)\n", p.try)
|
fmt.Fprintf(b, "==> OUTGOING REQUEST (Try=%d)\n", p.try)
|
||||||
pipeline.WriteRequest(b, prepareRequestForLogging(request))
|
pipeline.WriteRequestWithResponse(b, prepareRequestForLogging(request), nil, nil)
|
||||||
p.node.Log(pipeline.LogInfo, b.String())
|
p.config.Log(pipeline.LogInfo, b.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the time for this particular retry operation and then Do the operation.
|
// Set the time for this particular retry operation and then Do the operation.
|
||||||
tryStart := time.Now()
|
tryStart := time.Now()
|
||||||
response, err = p.node.Do(ctx, request) // Make the request
|
response, err = p.next.Do(ctx, request) // Make the request
|
||||||
tryEnd := time.Now()
|
tryEnd := time.Now()
|
||||||
tryDuration := tryEnd.Sub(tryStart)
|
tryDuration := tryEnd.Sub(tryStart)
|
||||||
opDuration := tryEnd.Sub(p.operationStart)
|
opDuration := tryEnd.Sub(p.operationStart)
|
||||||
|
|
||||||
severity := pipeline.LogInfo // Assume success and default to informational logging
|
logLevel, forceLog := pipeline.LogInfo, false // Default logging information
|
||||||
logMsg := func(b *bytes.Buffer) {
|
|
||||||
b.WriteString("SUCCESSFUL OPERATION\n")
|
|
||||||
pipeline.WriteRequestWithResponse(b, prepareRequestForLogging(request), response.Response())
|
|
||||||
}
|
|
||||||
|
|
||||||
forceLog := false
|
|
||||||
// If the response took too long, we'll upgrade to warning.
|
// If the response took too long, we'll upgrade to warning.
|
||||||
if p.o.LogWarningIfTryOverThreshold > 0 && tryDuration > p.o.LogWarningIfTryOverThreshold {
|
if p.o.LogWarningIfTryOverThreshold > 0 && tryDuration > p.o.LogWarningIfTryOverThreshold {
|
||||||
// Log a warning if the try duration exceeded the specified threshold
|
// Log a warning if the try duration exceeded the specified threshold
|
||||||
severity = pipeline.LogWarning
|
logLevel, forceLog = pipeline.LogWarning, true
|
||||||
logMsg = func(b *bytes.Buffer) {
|
|
||||||
fmt.Fprintf(b, "SLOW OPERATION [tryDuration > %v]\n", p.o.LogWarningIfTryOverThreshold)
|
|
||||||
pipeline.WriteRequestWithResponse(b, prepareRequestForLogging(request), response.Response())
|
|
||||||
forceLog = true // For CSS (Customer Support Services), we always log these to help diagnose latency issues
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == nil { // We got a response from the service
|
if err == nil { // We got a response from the service
|
||||||
sc := response.Response().StatusCode
|
sc := response.Response().StatusCode
|
||||||
if ((sc >= 400 && sc <= 499) && sc != http.StatusNotFound && sc != http.StatusConflict && sc != http.StatusPreconditionFailed && sc != http.StatusRequestedRangeNotSatisfiable) || (sc >= 500 && sc <= 599) {
|
if ((sc >= 400 && sc <= 499) && sc != http.StatusNotFound && sc != http.StatusConflict && sc != http.StatusPreconditionFailed && sc != http.StatusRequestedRangeNotSatisfiable) || (sc >= 500 && sc <= 599) {
|
||||||
severity = pipeline.LogError // Promote to Error any 4xx (except those listed is an error) or any 5xx
|
logLevel, forceLog = pipeline.LogError, true // Promote to Error any 4xx (except those listed is an error) or any 5xx
|
||||||
logMsg = func(b *bytes.Buffer) {
|
|
||||||
// Write the error, the originating request and the stack
|
|
||||||
fmt.Fprintf(b, "OPERATION ERROR:\n")
|
|
||||||
pipeline.WriteRequestWithResponse(b, prepareRequestForLogging(request), response.Response())
|
|
||||||
b.Write(stack()) // For errors, we append the stack trace (an expensive operation)
|
|
||||||
forceLog = true // TODO: Do we really want this here?
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// For other status codes, we leave the severity as is.
|
// For other status codes, we leave the level as is.
|
||||||
}
|
}
|
||||||
} else { // This error did not get an HTTP response from the service; upgrade the severity to Error
|
} else { // This error did not get an HTTP response from the service; upgrade the severity to Error
|
||||||
severity = pipeline.LogError
|
logLevel, forceLog = pipeline.LogError, true
|
||||||
logMsg = func(b *bytes.Buffer) {
|
|
||||||
// Write the error, the originating request and the stack
|
|
||||||
fmt.Fprintf(b, "NETWORK ERROR:\n%v\n", err)
|
|
||||||
pipeline.WriteRequest(b, prepareRequestForLogging(request))
|
|
||||||
b.Write(stack()) // For errors, we append the stack trace (an expensive operation)
|
|
||||||
forceLog = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if shouldLog := p.node.ShouldLog(severity); forceLog || shouldLog {
|
if shouldLog := p.config.ShouldLog(logLevel); forceLog || shouldLog {
|
||||||
// We're going to log this; build the string to log
|
// We're going to log this; build the string to log
|
||||||
b := &bytes.Buffer{}
|
b := &bytes.Buffer{}
|
||||||
fmt.Fprintf(b, "==> REQUEST/RESPONSE (Try=%d, TryDuration=%v, OpDuration=%v) -- ", p.try, tryDuration, opDuration)
|
slow := ""
|
||||||
logMsg(b)
|
if p.o.LogWarningIfTryOverThreshold > 0 && tryDuration > p.o.LogWarningIfTryOverThreshold {
|
||||||
|
slow = fmt.Sprintf("[SLOW >%v]", p.o.LogWarningIfTryOverThreshold)
|
||||||
|
}
|
||||||
|
fmt.Fprintf(b, "==> REQUEST/RESPONSE (Try=%d/%v%s, OpTime=%v) -- ", p.try, tryDuration, slow, opDuration)
|
||||||
|
if err != nil { // This HTTP request did not get a response from the service
|
||||||
|
fmt.Fprintf(b, "REQUEST ERROR\n")
|
||||||
|
} else {
|
||||||
|
if logLevel == pipeline.LogError {
|
||||||
|
fmt.Fprintf(b, "RESPONSE STATUS CODE ERROR\n")
|
||||||
|
} else {
|
||||||
|
fmt.Fprintf(b, "RESPONSE SUCCESSFULLY RECEIVED\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pipeline.WriteRequestWithResponse(b, prepareRequestForLogging(request), response.Response(), err)
|
||||||
|
if logLevel <= pipeline.LogError {
|
||||||
|
b.Write(stack()) // For errors (or lower levels), we append the stack trace (an expensive operation)
|
||||||
|
}
|
||||||
msg := b.String()
|
msg := b.String()
|
||||||
|
|
||||||
if forceLog {
|
if forceLog {
|
||||||
pipeline.ForceLog(severity, msg)
|
pipeline.ForceLog(logLevel, msg)
|
||||||
}
|
}
|
||||||
if shouldLog {
|
if shouldLog {
|
||||||
p.node.Log(severity, msg)
|
p.config.Log(logLevel, msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return response, err
|
return response, err
|
||||||
|
|
|
@ -57,6 +57,18 @@ type RetryOptions struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o RetryOptions) defaults() RetryOptions {
|
func (o RetryOptions) defaults() RetryOptions {
|
||||||
|
if o.Policy != RetryPolicyExponential && o.Policy != RetryPolicyFixed {
|
||||||
|
panic(errors.New("RetryPolicy must be RetryPolicyExponential or RetryPolicyFixed"))
|
||||||
|
}
|
||||||
|
if o.MaxTries < 0 {
|
||||||
|
panic("MaxTries must be >= 0")
|
||||||
|
}
|
||||||
|
if o.TryTimeout < 0 || o.RetryDelay < 0 || o.MaxRetryDelay < 0 {
|
||||||
|
panic("TryTimeout, RetryDelay, and MaxRetryDelay must all be >= 0")
|
||||||
|
}
|
||||||
|
if o.RetryDelay > o.MaxRetryDelay {
|
||||||
|
panic("RetryDelay must be <= MaxRetryDelay")
|
||||||
|
}
|
||||||
if (o.RetryDelay == 0 && o.MaxRetryDelay != 0) || (o.RetryDelay != 0 && o.MaxRetryDelay == 0) {
|
if (o.RetryDelay == 0 && o.MaxRetryDelay != 0) || (o.RetryDelay != 0 && o.MaxRetryDelay == 0) {
|
||||||
panic(errors.New("Both RetryDelay and MaxRetryDelay must be 0 or neither can be 0"))
|
panic(errors.New("Both RetryDelay and MaxRetryDelay must be 0 or neither can be 0"))
|
||||||
}
|
}
|
||||||
|
@ -122,19 +134,20 @@ type retryPolicyFactory struct {
|
||||||
o RetryOptions
|
o RetryOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *retryPolicyFactory) New(node pipeline.Node) pipeline.Policy {
|
func (f *retryPolicyFactory) New(next pipeline.Policy, config *pipeline.Configuration) pipeline.Policy {
|
||||||
return &retryPolicy{node: node, o: f.o}
|
return &retryPolicy{o: f.o, next: next}
|
||||||
}
|
}
|
||||||
|
|
||||||
type retryPolicy struct {
|
type retryPolicy struct {
|
||||||
node pipeline.Node
|
next pipeline.Policy
|
||||||
o RetryOptions
|
o RetryOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
// According to https://github.com/golang/go/wiki/CompilerOptimizations, the compiler will inline this method and hopefully optimize all calls to it away
|
// According to https://github.com/golang/go/wiki/CompilerOptimizations, the compiler will inline this method and hopefully optimize all calls to it away
|
||||||
var logf = func(format string, a ...interface{}) {}
|
var logf = func(format string, a ...interface{}) {}
|
||||||
|
|
||||||
//var logf = fmt.Printf // Use this version to see the retry method's code path (import "fmt")
|
// Use this version to see the retry method's code path (import "fmt")
|
||||||
|
//var logf = fmt.Printf
|
||||||
|
|
||||||
func (p *retryPolicy) Do(ctx context.Context, request pipeline.Request) (response pipeline.Response, err error) {
|
func (p *retryPolicy) Do(ctx context.Context, request pipeline.Request) (response pipeline.Response, err error) {
|
||||||
// Before each try, we'll select either the primary or secondary URL.
|
// Before each try, we'll select either the primary or secondary URL.
|
||||||
|
@ -203,7 +216,7 @@ func (p *retryPolicy) Do(ctx context.Context, request pipeline.Request) (respons
|
||||||
|
|
||||||
// Set the time for this particular retry operation and then Do the operation.
|
// Set the time for this particular retry operation and then Do the operation.
|
||||||
tryCtx, tryCancel := context.WithTimeout(ctx, time.Second*time.Duration(timeout))
|
tryCtx, tryCancel := context.WithTimeout(ctx, time.Second*time.Duration(timeout))
|
||||||
response, err = p.node.Do(tryCtx, requestCopy) // Make the request
|
response, err = p.next.Do(tryCtx, requestCopy) // Make the request
|
||||||
logf("Err=%v, response=%v\n", err, response)
|
logf("Err=%v, response=%v\n", err, response)
|
||||||
|
|
||||||
action := "" // This MUST get changed within the switch code below
|
action := "" // This MUST get changed within the switch code below
|
||||||
|
|
|
@ -35,19 +35,19 @@ type telemetryPolicyFactory struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a telemetryPolicy object.
|
// New creates a telemetryPolicy object.
|
||||||
func (f *telemetryPolicyFactory) New(node pipeline.Node) pipeline.Policy {
|
func (f *telemetryPolicyFactory) New(next pipeline.Policy, config *pipeline.Configuration) pipeline.Policy {
|
||||||
return &telemetryPolicy{node: node, factory: f}
|
return &telemetryPolicy{factory: f, next: next}
|
||||||
}
|
}
|
||||||
|
|
||||||
// telemetryPolicy ...
|
// telemetryPolicy ...
|
||||||
type telemetryPolicy struct {
|
type telemetryPolicy struct {
|
||||||
node pipeline.Node
|
|
||||||
factory *telemetryPolicyFactory
|
factory *telemetryPolicyFactory
|
||||||
|
next pipeline.Policy
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *telemetryPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
func (p *telemetryPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
||||||
request.Header.Set("User-Agent", p.factory.telemetryValue)
|
request.Header.Set("User-Agent", p.factory.telemetryValue)
|
||||||
return p.node.Do(ctx, request)
|
return p.next.Do(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: the ONLY function that should read OR write to this variable is platformInfo
|
// NOTE: the ONLY function that should read OR write to this variable is platformInfo
|
||||||
|
|
|
@ -22,13 +22,13 @@ type uniqueRequestIDPolicyFactory struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a UniqueRequestIDPolicy object.
|
// New creates a UniqueRequestIDPolicy object.
|
||||||
func (f *uniqueRequestIDPolicyFactory) New(node pipeline.Node) pipeline.Policy {
|
func (f *uniqueRequestIDPolicyFactory) New(next pipeline.Policy, config *pipeline.Configuration) pipeline.Policy {
|
||||||
return &uniqueRequestIDPolicy{node: node}
|
return &uniqueRequestIDPolicy{next: next}
|
||||||
}
|
}
|
||||||
|
|
||||||
// UniqueRequestIDPolicy ...
|
// UniqueRequestIDPolicy ...
|
||||||
type uniqueRequestIDPolicy struct {
|
type uniqueRequestIDPolicy struct {
|
||||||
node pipeline.Node
|
next pipeline.Policy
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *uniqueRequestIDPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
func (p *uniqueRequestIDPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
||||||
|
@ -36,7 +36,7 @@ func (p *uniqueRequestIDPolicy) Do(ctx context.Context, request pipeline.Request
|
||||||
if id == "" { // Add a unique request ID if the caller didn't specify one already
|
if id == "" { // Add a unique request ID if the caller didn't specify one already
|
||||||
request.Header.Set(xMsClientRequestID, newUUID().String())
|
request.Header.Set(xMsClientRequestID, newUUID().String())
|
||||||
}
|
}
|
||||||
return p.node.Do(ctx, request)
|
return p.next.Do(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
// The UUID reserved variants.
|
// The UUID reserved variants.
|
||||||
|
|
|
@ -69,7 +69,7 @@ func (e *storageError) Error() string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
req := pipeline.Request{Request: e.response.Request}.Copy() // Make a copy of the response's request
|
req := pipeline.Request{Request: e.response.Request}.Copy() // Make a copy of the response's request
|
||||||
pipeline.WriteRequestWithResponse(b, prepareRequestForLogging(req), e.response)
|
pipeline.WriteRequestWithResponse(b, prepareRequestForLogging(req), e.response, nil)
|
||||||
return e.ErrorNode.Error(b.String())
|
return e.ErrorNode.Error(b.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -139,11 +139,11 @@ func ExampleNewPipeline() {
|
||||||
|
|
||||||
// Set LogOptions to control what & where all pipeline log events go
|
// Set LogOptions to control what & where all pipeline log events go
|
||||||
Log: pipeline.LogOptions{
|
Log: pipeline.LogOptions{
|
||||||
Log: func(s pipeline.LogSeverity, m string) { // This func is called to log each event
|
Log: func(s pipeline.LogLevel, m string) { // This func is called to log each event
|
||||||
// This method is not called for filtered-out severities.
|
// This method is not called for filtered-out severities.
|
||||||
logger.Output(2, m) // This example uses Go's standard logger
|
logger.Output(2, m) // This example uses Go's standard logger
|
||||||
},
|
},
|
||||||
MinimumSeverityToLog: func() pipeline.LogSeverity { return pipeline.LogInfo }, // Log all events from informational to more severe
|
MinimumLevelToLog: func() pipeline.LogLevel { return pipeline.LogInfo }, // Log all events from informational to more severe
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,7 +299,7 @@ func ExampleAccountSASSignatureValues() {
|
||||||
|
|
||||||
// If you have a SAS query parameter string, you can parse it into its parts:
|
// If you have a SAS query parameter string, you can parse it into its parts:
|
||||||
values, _ := url.ParseQuery(qp)
|
values, _ := url.ParseQuery(qp)
|
||||||
sasQueryParams = NewSASQueryParameters(values)
|
sasQueryParams = NewSASQueryParameters(values, true)
|
||||||
fmt.Printf("SAS expiry time=%v", sasQueryParams.ExpiryTime)
|
fmt.Printf("SAS expiry time=%v", sasQueryParams.ExpiryTime)
|
||||||
|
|
||||||
_ = serviceURL // Avoid compiler's "declared and not used" error
|
_ = serviceURL // Avoid compiler's "declared and not used" error
|
||||||
|
@ -348,7 +348,7 @@ func ExampleBlobSASSignatureValues() {
|
||||||
|
|
||||||
// If you have a SAS query parameter string, you can parse it into its parts:
|
// If you have a SAS query parameter string, you can parse it into its parts:
|
||||||
values, _ := url.ParseQuery(qp)
|
values, _ := url.ParseQuery(qp)
|
||||||
sasQueryParams = NewSASQueryParameters(values)
|
sasQueryParams = NewSASQueryParameters(values, true)
|
||||||
fmt.Printf("SAS expiry time=%v", sasQueryParams.ExpiryTime)
|
fmt.Printf("SAS expiry time=%v", sasQueryParams.ExpiryTime)
|
||||||
|
|
||||||
_ = blobURL // Avoid compiler's "declared and not used" error
|
_ = blobURL // Avoid compiler's "declared and not used" error
|
||||||
|
|
|
@ -31,11 +31,9 @@ func (s *aztestsSuite) TestRetryTestScenarioUntilSuccess(c *chk.C) {
|
||||||
func (s *aztestsSuite) TestRetryTestScenarioUntilOperationCancel(c *chk.C) {
|
func (s *aztestsSuite) TestRetryTestScenarioUntilOperationCancel(c *chk.C) {
|
||||||
testRetryTestScenario(c, retryTestScenarioRetryUntilOperationCancel)
|
testRetryTestScenario(c, retryTestScenarioRetryUntilOperationCancel)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *aztestsSuite) TestRetryTestScenarioUntilMaxRetries(c *chk.C) {
|
func (s *aztestsSuite) TestRetryTestScenarioUntilMaxRetries(c *chk.C) {
|
||||||
testRetryTestScenario(c, retryTestScenarioRetryUntilMaxRetries)
|
testRetryTestScenario(c, retryTestScenarioRetryUntilMaxRetries)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRetryTestPolicyFactory(c *chk.C, scenario retryTestScenario, maxRetries int32, cancel context.CancelFunc) *retryTestPolicyFactory {
|
func newRetryTestPolicyFactory(c *chk.C, scenario retryTestScenario, maxRetries int32, cancel context.CancelFunc) *retryTestPolicyFactory {
|
||||||
return &retryTestPolicyFactory{c: c, scenario: scenario, maxRetries: maxRetries, cancel: cancel}
|
return &retryTestPolicyFactory{c: c, scenario: scenario, maxRetries: maxRetries, cancel: cancel}
|
||||||
}
|
}
|
||||||
|
@ -48,13 +46,13 @@ type retryTestPolicyFactory struct {
|
||||||
try int32
|
try int32
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *retryTestPolicyFactory) New(node pipeline.Node) pipeline.Policy {
|
func (f *retryTestPolicyFactory) New(next pipeline.Policy, config *pipeline.Configuration) pipeline.Policy {
|
||||||
f.try = 0 // Reset this for each test
|
f.try = 0 // Reset this for each test
|
||||||
return &retryTestPolicy{node: node, factory: f}
|
return &retryTestPolicy{factory: f, next: next}
|
||||||
}
|
}
|
||||||
|
|
||||||
type retryTestPolicy struct {
|
type retryTestPolicy struct {
|
||||||
node pipeline.Node
|
next pipeline.Policy
|
||||||
factory *retryTestPolicyFactory
|
factory *retryTestPolicyFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,8 +156,8 @@ func testRetryTestScenario(c *chk.C, scenario retryTestScenario) {
|
||||||
ctx, cancel := context.WithTimeout(ctx, 64 /*2^MaxTries(6)*/ *retryOptions.TryTimeout)
|
ctx, cancel := context.WithTimeout(ctx, 64 /*2^MaxTries(6)*/ *retryOptions.TryTimeout)
|
||||||
retrytestPolicyFactory := newRetryTestPolicyFactory(c, scenario, retryOptions.MaxTries, cancel)
|
retrytestPolicyFactory := newRetryTestPolicyFactory(c, scenario, retryOptions.MaxTries, cancel)
|
||||||
factories := [...]pipeline.Factory{
|
factories := [...]pipeline.Factory{
|
||||||
retrytestPolicyFactory,
|
|
||||||
azblob.NewRetryPolicyFactory(retryOptions),
|
azblob.NewRetryPolicyFactory(retryOptions),
|
||||||
|
retrytestPolicyFactory,
|
||||||
}
|
}
|
||||||
p := pipeline.NewPipeline(factories[:], pipeline.Options{})
|
p := pipeline.NewPipeline(factories[:], pipeline.Options{})
|
||||||
request, err := pipeline.NewRequest(http.MethodGet, *u, strings.NewReader("TestData"))
|
request, err := pipeline.NewRequest(http.MethodGet, *u, strings.NewReader("TestData"))
|
||||||
|
@ -172,7 +170,7 @@ func testRetryTestScenario(c *chk.C, scenario retryTestScenario) {
|
||||||
case retryTestScenarioRetryUntilMaxRetries:
|
case retryTestScenarioRetryUntilMaxRetries:
|
||||||
c.Assert(err, chk.NotNil) // Ensure we ended with an error
|
c.Assert(err, chk.NotNil) // Ensure we ended with an error
|
||||||
c.Assert(response, chk.IsNil) // Ensure we ended without a valid response
|
c.Assert(response, chk.IsNil) // Ensure we ended without a valid response
|
||||||
c.Assert(retrytestPolicyFactory.try, chk.Equals, retryOptions.MaxTries) // Ensure the operation end with the exact right number of tries
|
c.Assert(retrytestPolicyFactory.try, chk.Equals, retryOptions.MaxTries) // Ensure the operation ends with the exact right number of tries
|
||||||
case retryTestScenarioRetryUntilOperationCancel:
|
case retryTestScenarioRetryUntilOperationCancel:
|
||||||
c.Assert(err, chk.Equals, context.Canceled) // Ensure we ended due to cancellation
|
c.Assert(err, chk.Equals, context.Canceled) // Ensure we ended due to cancellation
|
||||||
c.Assert(response, chk.IsNil) // Ensure we ended without a valid response
|
c.Assert(response, chk.IsNil) // Ensure we ended without a valid response
|
||||||
|
|
|
@ -6,8 +6,9 @@ package azblob
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"github.com/Azure/azure-pipeline-go/pipeline"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
|
||||||
|
"github.com/Azure/azure-pipeline-go/pipeline"
|
||||||
)
|
)
|
||||||
|
|
||||||
type responder func(resp pipeline.Response) (result pipeline.Response, err error)
|
type responder func(resp pipeline.Response) (result pipeline.Response, err error)
|
||||||
|
@ -18,18 +19,18 @@ type responderPolicyFactory struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a responder policy factory.
|
// New creates a responder policy factory.
|
||||||
func (arpf responderPolicyFactory) New(node pipeline.Node) pipeline.Policy {
|
func (arpf responderPolicyFactory) New(next pipeline.Policy, config *pipeline.Configuration) pipeline.Policy {
|
||||||
return responderPolicy{node: node, responder: arpf.responder}
|
return responderPolicy{next: next, responder: arpf.responder}
|
||||||
}
|
}
|
||||||
|
|
||||||
type responderPolicy struct {
|
type responderPolicy struct {
|
||||||
node pipeline.Node
|
next pipeline.Policy
|
||||||
responder responder
|
responder responder
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do sends the request to the service and validates/deserializes the HTTP response.
|
// Do sends the request to the service and validates/deserializes the HTTP response.
|
||||||
func (arp responderPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
func (arp responderPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
||||||
resp, err := arp.node.Do(ctx, request)
|
resp, err := arp.next.Do(ctx, request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
Загрузка…
Ссылка в новой задаче