FAREAST\jiacfan 2018-08-10 16:03:42 +08:00
Parent f336a6a5cd
Commit c7a2d66432
1 changed file with 64 additions and 12 deletions

View file

@@ -12,6 +12,8 @@ import (
"sync"
"time"
"errors"
"github.com/Azure/azure-pipeline-go/pipeline"
)
@@ -65,12 +67,26 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
if o.BlockSize < 0 || o.BlockSize > BlockBlobMaxUploadBlobBytes {
panic(fmt.Sprintf("BlockSize option must be > 0 and <= %d", BlockBlobMaxUploadBlobBytes))
}
- if o.BlockSize == 0 {
- o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
- }
- size := int64(len(b))
- if size <= BlockBlobMaxUploadBlobBytes {
+ bufferSize := int64(len(b))
+ if o.BlockSize == 0 {
+ // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
+ if bufferSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
+ return nil, errors.New("Buffer is too large to upload to a block blob")
+ }
+ // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
+ if bufferSize <= BlockBlobMaxUploadBlobBytes {
+ o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
+ } else {
+ o.BlockSize = bufferSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
+ if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
+ o.BlockSize = BlobDefaultDownloadBlockSize
+ }
+ // StageBlock will be called with blockSize blocks and a parallelism of (BufferSize / BlockSize).
+ }
+ }
+ if bufferSize <= BlockBlobMaxUploadBlobBytes {
// If the size can fit in 1 Upload call, do it this way
var body io.ReadSeeker = bytes.NewReader(b)
if o.Progress != nil {
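To see what the new auto-sizing branch does, here is a standalone sketch (not part of the commit) that mirrors the logic above; the constant values are assumptions based on the documented limits of that era (256 MiB single-shot Upload, 100 MiB per staged block, 50,000 blocks per blob, 4 MiB floor):

package main

import (
	"errors"
	"fmt"
)

// Assumed constant values; the real names in azblob are BlockBlobMaxUploadBlobBytes,
// BlockBlobMaxStageBlockBytes, BlockBlobMaxBlocks and BlobDefaultDownloadBlockSize.
const (
	maxUploadBlobBytes = 256 * 1024 * 1024 // assumed: single-shot Upload limit (256 MiB)
	maxStageBlockBytes = 100 * 1024 * 1024 // assumed: per-block limit (100 MiB)
	maxBlocks          = 50000             // blocks per block blob
	defaultBlockSize   = 4 * 1024 * 1024   // assumed: 4 MiB floor
)

// pickBlockSize mirrors the o.BlockSize == 0 branch above.
func pickBlockSize(bufferSize int64) (int64, error) {
	if bufferSize > maxStageBlockBytes*maxBlocks {
		return 0, errors.New("buffer is too large to upload to a block blob")
	}
	if bufferSize <= maxUploadBlobBytes {
		return maxUploadBlobBytes, nil // one Upload call; block size is unused
	}
	blockSize := bufferSize / maxBlocks // integer division truncates
	if blockSize < defaultBlockSize {   // round small results up to 4 MiB
		blockSize = defaultBlockSize
	}
	return blockSize, nil
}

func main() {
	// 10 GiB / 50,000 blocks is about 215 KiB, which rounds up to the 4 MiB floor,
	// so the transfer uses ceil(10 GiB / 4 MiB) = 2,560 blocks.
	blockSize, err := pickBlockSize(10 * 1024 * 1024 * 1024)
	fmt.Println(blockSize, err) // 4194304 <nil>
}

One nuance worth noting: because bufferSize / BlockBlobMaxBlocks truncates, a very large buffer whose size is not an exact multiple of 50,000 can push the later numBlocks ceiling computation to 50,001 and trip the panic guard in the next hunk; rounding the division up instead would avoid that edge.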
@@ -79,7 +95,7 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions)
}
- var numBlocks = uint16(((size - 1) / o.BlockSize) + 1)
+ var numBlocks = uint16(((bufferSize - 1) / o.BlockSize) + 1)
if numBlocks > BlockBlobMaxBlocks {
panic(fmt.Sprintf("The buffer's size is too big or the BlockSize is too small; the number of blocks must be <= %d", BlockBlobMaxBlocks))
}
@@ -90,7 +106,7 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
err := doBatchTransfer(ctx, batchTransferOptions{
operationName: "UploadBufferToBlockBlob",
- transferSize: size,
+ transferSize: bufferSize,
chunkSize: o.BlockSize,
parallelism: o.Parallelism,
operation: func(offset int64, count int64) error {
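doBatchTransfer itself is outside this diff. For orientation only, a driver with the shape implied by this call site — split transferSize into chunkSize pieces and invoke operation for each with bounded parallelism — could look like the following sketch; everything beyond the parameter and callback names above is assumed:

package main

import (
	"context"
	"fmt"
)

// Hypothetical sketch of a chunked-transfer driver; the real doBatchTransfer is
// not part of this diff.
func doBatchTransferSketch(ctx context.Context, transferSize, chunkSize int64,
	parallelism uint16, operation func(offset, count int64) error) error {
	if parallelism == 0 {
		parallelism = 5 // assumed default when the caller does not set one
	}
	numChunks := ((transferSize - 1) / chunkSize) + 1 // ceiling division
	sem := make(chan struct{}, parallelism)           // bounds concurrent operations
	errCh := make(chan error, numChunks)              // buffered so no sender blocks

	for i := int64(0); i < numChunks; i++ {
		offset, count := i*chunkSize, chunkSize
		if offset+count > transferSize {
			count = transferSize - offset // the final chunk may be short
		}
		sem <- struct{}{} // acquire a parallelism slot
		go func(offset, count int64) {
			defer func() { <-sem }() // release the slot
			errCh <- operation(offset, count)
		}(offset, count)
	}

	var firstErr error
	for i := int64(0); i < numChunks; i++ { // drain every result
		if err := <-errCh; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	// 10 MiB in 4 MiB chunks: offsets 0 and 4 MiB get full chunks, 8 MiB gets 2 MiB.
	err := doBatchTransferSketch(context.Background(), 10<<20, 4<<20, 2,
		func(offset, count int64) error {
			fmt.Printf("chunk at offset %d, %d bytes\n", offset, count)
			return nil
		})
	fmt.Println(err) // <nil>
}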
@@ -372,7 +388,10 @@ func UploadStreamToBlockBlob(ctx context.Context, reader io.Reader, blockBlobURL
result, err := uploadStream(ctx, reader,
UploadStreamOptions{BufferSize: o.BufferSize, MaxBuffers: o.MaxBuffers},
&uploadStreamToBlockBlobOptions{b: blockBlobURL, o: o, blockIDPrefix: newUUID()})
- return result.(CommonResponse), err
+ if err != nil {
+ return nil, err
+ }
+ return result.(CommonResponse), nil
}
type uploadStreamToBlockBlobOptions struct {
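The added err check fixes a crash: when uploadStream fails, result is a nil interface, and the old unconditional result.(CommonResponse) type assertion panicked instead of returning the error. A minimal standalone illustration of that Go behavior:

package main

import "fmt"

// CommonResponse stands in for the library's interface type here.
type CommonResponse interface{ Status() string }

func main() {
	var result interface{} // nil, exactly what uploadStream returns alongside an error

	// An unchecked assertion on a nil interface value panics at run time:
	//   _ = result.(CommonResponse) // panic: interface conversion: interface {} is nil
	// Checking err first (as the commit now does), or using the comma-ok form, is safe:
	r, ok := result.(CommonResponse)
	fmt.Println(r, ok) // <nil> false
}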
@@ -434,7 +453,28 @@ type UploadStreamOptions struct {
BufferSize int
}
+ type firstErr struct {
+ lock sync.Mutex
+ finalError error
+ }
+ func (fe *firstErr) set(err error) {
+ fe.lock.Lock()
+ if fe.finalError == nil {
+ fe.finalError = err
+ }
+ fe.lock.Unlock()
+ }
+ func (fe *firstErr) get() (err error) {
+ fe.lock.Lock()
+ err = fe.finalError
+ fe.lock.Unlock()
+ return
+ }
func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions, t iTransfer) (interface{}, error) {
+ firstErr := firstErr{}
ctx, cancel := context.WithCancel(ctx) // New context so that any failure cancels everything
defer cancel()
wg := sync.WaitGroup{} // Used to know when all outgoing messages have finished processing
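firstErr is a small first-error-wins latch: set records only the first error passed to it, and get reads the value under the same mutex, so concurrent chunk goroutines can all report failures while the caller sees exactly one. The same behavior as a standalone demo (the get here uses defer, which is equivalent):

package main

import (
	"fmt"
	"sync"
)

type firstErr struct {
	lock       sync.Mutex
	finalError error
}

func (fe *firstErr) set(err error) {
	fe.lock.Lock()
	if fe.finalError == nil { // only the first reported error is kept
		fe.finalError = err
	}
	fe.lock.Unlock()
}

func (fe *firstErr) get() error {
	fe.lock.Lock()
	defer fe.lock.Unlock()
	return fe.finalError
}

func main() {
	fe := firstErr{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fe.set(fmt.Errorf("chunk %d failed", i)) // all but the first are dropped
		}(i)
	}
	wg.Wait()
	fmt.Println(fe.get()) // exactly one "chunk N failed"
}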
@@ -461,9 +501,12 @@ func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions,
err := t.chunk(ctx, outgoingMsg.chunkNum, outgoingMsg.buffer)
wg.Done() // Indicate this buffer was sent
if nil != err {
- // NOTE: finalErr could be assigned to multiple times here which is OK,
- // some error will be returned.
+ firstErr.set(err)
+ cancel()
}
- incoming <- outgoingMsg.buffer // The goroutine reading from the stream can use reuse this buffer now
+ incoming <- outgoingMsg.buffer // The goroutine reading from the stream can reuse this buffer now
}
}()
}
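On the first chunk failure, a sender goroutine now records the error and cancels the shared context, so queued and in-flight chunks observe ctx.Done() and abort quickly instead of finishing a doomed transfer. A self-contained sketch of that cancel-fast pattern (using sync.Once in place of firstErr; the uploadChunk function is hypothetical):

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// uploadChunk is hypothetical: it fails for one chunk and otherwise
// pretends to do I/O that aborts promptly once the context is canceled.
func uploadChunk(ctx context.Context, n int) error {
	if n == 2 {
		return errors.New("simulated network failure")
	}
	select {
	case <-time.After(50 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err() // fail fast after cancel()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var once sync.Once
	var firstErr error
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := uploadChunk(ctx, i); err != nil {
				once.Do(func() { firstErr = err; cancel() }) // record the first error, abort the rest
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(firstErr) // simulated network failure; the other chunks see context.Canceled
}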
@@ -488,7 +531,7 @@ func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions,
buffer = <-incoming
}
n, err := io.ReadFull(reader, buffer)
- if err != nil {
+ if err != nil { // Less than len(buffer) bytes were read
buffer = buffer[:n] // Make slice match the # of read bytes
}
if len(buffer) > 0 {
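The new comment documents the io.ReadFull contract that this loop relies on: nil only when the buffer was completely filled, io.ErrUnexpectedEOF when some but not all bytes were read, and io.EOF when nothing was read. For example:

package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	buf := make([]byte, 8)

	// 5 bytes available, 8 requested: some data plus io.ErrUnexpectedEOF.
	n, err := io.ReadFull(strings.NewReader("hello"), buf)
	fmt.Println(n, err) // 5 unexpected EOF

	// Nothing available: zero bytes plus io.EOF.
	n, err = io.ReadFull(strings.NewReader(""), buf)
	fmt.Println(n, err) // 0 EOF

	// Exactly enough: full buffer plus a nil error.
	n, err = io.ReadFull(strings.NewReader("12345678"), buf)
	fmt.Println(n, err) // 8 <nil>
}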
@@ -497,12 +540,21 @@ func uploadStream(ctx context.Context, reader io.Reader, o UploadStreamOptions,
outgoing <- OutgoingMsg{chunkNum: c, buffer: buffer}
}
if err != nil { // The reader is done, no more outgoing buffers
+ if err == io.EOF || err == io.ErrUnexpectedEOF {
+ err = nil // This function does NOT return an error if io.ReadFull returns io.EOF or io.ErrUnexpectedEOF
+ } else {
+ firstErr.set(err)
+ }
break
}
}
// NOTE: Don't close the incoming channel because the outgoing goroutines post buffers into it when they are done
close(outgoing) // Make all the outgoing goroutines terminate when this channel is empty
wg.Wait() // Wait for all pending outgoing messages to complete
- // After all blocks uploaded, commit them to the blob & return the result
- return t.end(ctx)
+ err := firstErr.get()
+ if err == nil {
+ // If no error, after all blocks uploaded, commit them to the blob & return the result
+ return t.end(ctx)
+ }
+ return nil, err
}
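With the error now surfaced instead of panicking, a caller can handle a failed stream upload normally. A hypothetical call site, assuming the 2018-03-28 package path and the era's API shape; the account name, key, container, blob, and file are placeholders:

package main

import (
	"context"
	"log"
	"net/url"
	"os"

	"github.com/Azure/azure-storage-blob-go/2018-03-28/azblob"
)

func main() {
	// Placeholders throughout; in this era NewSharedKeyCredential returned only the credential.
	credential := azblob.NewSharedKeyCredential("myaccount", os.Getenv("ACCOUNT_KEY"))
	p := azblob.NewPipeline(credential, azblob.PipelineOptions{})

	u, _ := url.Parse("https://myaccount.blob.core.windows.net/mycontainer/myblob")
	blobURL := azblob.NewBlockBlobURL(*u, p)

	f, err := os.Open("bigfile.bin")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// BufferSize * MaxBuffers bounds the memory held in flight by the stream uploader.
	_, err = azblob.UploadStreamToBlockBlob(context.Background(), f, blobURL,
		azblob.UploadStreamToBlockBlobOptions{BufferSize: 2 * 1024 * 1024, MaxBuffers: 3})
	if err != nil { // after this commit, upload failures land here instead of panicking
		log.Fatal(err)
	}
}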