putter.tryPut(): new method, extracted from `putter.Close()`

This makes the error handling simpler, while simultaneously fixing
some problems:

* `checkClosed()` did not work correctly, because it took its `err`
  parameter as a value rather than a reference.

* The use of `defer` in a loop means that the defers are not run until
  the end of the function, delaying the closure of up to five response
  bodies along with the corresponding error handling.

* A failure to close the response body was treated as an unretryable
  error, which was probably too strict.

* If all 5 retry attempts had failed, the old code would have
  needlessly fallen through to the MD5 sum checking code. I think this
  would have still ended up reported an error (?), but it's not
  obvious that it would always report the original error.
This commit is contained in:
Michael Haggerty 2021-09-06 16:41:25 +02:00
Родитель 179bf0bc0e
Коммит f36aee13dd
1 изменённых файлов: 74 добавлений и 52 удалений

126
putter.go
Просмотреть файл

@ -235,7 +235,7 @@ func (p *putter) putPart(part *part) error {
return nil
}
func (p *putter) Close() (err error) {
func (p *putter) Close() error {
if p.closed {
p.abort()
return syscall.EINVAL
@ -262,62 +262,23 @@ func (p *putter) Close() (err error) {
body, err := xml.Marshal(p.xml)
if err != nil {
p.abort()
return
return err
}
for retries := 0; retries < 5; retries++ {
b := bytes.NewReader(body)
v := url.Values{}
v.Set("uploadId", p.UploadID)
var resp *http.Response
resp, err = p.retryRequest("POST", p.url.String()+"?"+v.Encode(), b, nil)
if err != nil {
// If the connection got closed (firwall, proxy, etc.)
// we should also retry, just like if we'd had a 500.
if err == io.ErrUnexpectedEOF {
continue
}
p.abort()
return
}
if resp.StatusCode != 200 {
p.abort()
return newRespError(resp)
}
defer checkClose(resp.Body, err)
// S3 will return an error under a 200 as well. Instead of the
// CompleteMultipartUploadResult that we expect below, we might be
// getting an Error, e.g. with InternalError under it. We should behave
// in that case as though we received a 500 and try again.
p.Code = ""
// Parse etag from body of response
err = xml.NewDecoder(resp.Body).Decode(p)
if err != nil {
// The decoder unfortunately returns string error
// instead of specific errors.
if err.Error() == "unexpected EOF" {
continue
}
return
attemptsLeft := 5
for {
retryable, err := p.tryPut(body)
if err == nil {
// Success!
break
}
// This is what S3 returns instead of a 500 when we should try
// to complete the multipart upload again
if p.Code == "InternalError" {
continue
attemptsLeft--
if !retryable || attemptsLeft == 0 {
return err
}
// Some other generic error
if p.Code != "" {
return fmt.Errorf("CompleteMultipartUpload error: %s", p.Code)
}
break
}
// Check md5 hash of concatenated part md5 hashes against ETag
// more info: https://forums.aws.amazon.com/thread.jspa?messageID=456442&#456442
calculatedMd5ofParts := fmt.Sprintf("%x", p.md5OfParts.Sum(nil))
@ -341,7 +302,68 @@ func (p *putter) Close() (err error) {
}
}
}
return
return nil
}
// tryPut makes one attempt at putting `body` via `p`. Return:
//
// * `false, nil` on success;
// * `true, err` if there was a retryable error;
// * `false, err` if there was an unretryable error.
func (p *putter) tryPut(body []byte) (bool, error) {
b := bytes.NewReader(body)
v := url.Values{}
v.Set("uploadId", p.UploadID)
resp, err := p.retryRequest("POST", p.url.String()+"?"+v.Encode(), b, nil)
if err != nil {
// If the connection got closed (firwall, proxy, etc.)
// we should also retry, just like if we'd had a 500.
if err == io.ErrUnexpectedEOF {
return true, err
}
p.abort()
return false, err
}
if resp.StatusCode != 200 {
p.abort()
return false, newRespError(resp)
}
// S3 will return an error under a 200 as well. Instead of the
// CompleteMultipartUploadResult that we expect below, we might be
// getting an Error, e.g. with InternalError under it. We should behave
// in that case as though we received a 500 and try again.
p.Code = ""
// Parse etag from body of response
err = xml.NewDecoder(resp.Body).Decode(p)
closeErr := resp.Body.Close()
if err != nil {
// The decoder unfortunately returns string error
// instead of specific errors.
if err.Error() == "unexpected EOF" {
return true, err
}
return false, err
}
if closeErr != nil {
return true, closeErr
}
// This is what S3 returns instead of a 500 when we should try
// to complete the multipart upload again
if p.Code == "InternalError" {
return true, errors.New("S3 internal error")
}
// Some other generic error
if p.Code != "" {
return false, fmt.Errorf("CompleteMultipartUpload error: %s", p.Code)
}
return false, nil
}
// Try to abort multipart upload. Do not error on failure.