From 623d6d9d5dc5cfb2ba895ad389db837531a36666 Mon Sep 17 00:00:00 2001
From: Narasimha Kulkarni <63087328+nakulkar-msft@users.noreply.github.com>
Date: Tue, 1 Feb 2022 11:58:25 +0530
Subject: [PATCH] Use chunked writer

---
 azblob/highlevel.go | 25 +++++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/azblob/highlevel.go b/azblob/highlevel.go
index 7d5a13b..f9c36ae 100644
--- a/azblob/highlevel.go
+++ b/azblob/highlevel.go
@@ -15,6 +15,7 @@ import (
 	"errors"
 
 	"github.com/Azure/azure-pipeline-go/pipeline"
+	"github.com/Azure/azure-storage-azcopy/v10/common"
 )
 
 // CommonResponse returns the headers common to all blob REST API responses.
@@ -109,7 +110,7 @@ func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSi
 		TransferSize: readerSize,
 		ChunkSize: o.BlockSize,
 		Parallelism: o.Parallelism,
-		Operation: func(offset int64, count int64, ctx context.Context) error {
+		Operation: func(offset int64, count int64, _ int64, ctx context.Context) error {
 			// This function is called once per block.
 			// It is passed this block's offset within the buffer and its count of bytes
 			// Prepare to read the proper block/section of the buffer
@@ -186,10 +187,17 @@ type DownloadFromBlobOptions struct {
 
 // downloadBlobToWriterAt downloads an Azure blob to a buffer with parallel.
 func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64, count int64,
-	writer io.WriterAt, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
+	file *os.File, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
+
 	if o.BlockSize == 0 {
 		o.BlockSize = BlobDefaultDownloadBlockSize
 	}
 
+	numChunks := uint32(((count - 1) / o.BlockSize) + 1)
+	cfw := NewChunkedFileWriter(ctx,
+		common.NewMultiSizeSlicePool(common.MaxBlockBlobBlockSize),
+		common.NewCacheLimiter(4 * 1024 * 1024 * 1024), nil,
+		file, numChunks, 5,
+		common.EHashValidationOption.NoCheck(), false)
 	if count == CountToEnd { // If size not specified, calculate it
 		if initialDownloadResponse != nil {
@@ -218,7 +226,7 @@ func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64,
 		TransferSize: count,
 		ChunkSize: o.BlockSize,
 		Parallelism: o.Parallelism,
-		Operation: func(chunkStart int64, count int64, ctx context.Context) error {
+		Operation: func(chunkStart int64, count int64, chunkIdx int64, ctx context.Context) error {
 			dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false, o.ClientProvidedKeyOptions)
 			if err != nil {
 				return err
@@ -237,11 +245,16 @@ func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64,
 					progressLock.Unlock()
 				})
 			}
-			_, err = io.Copy(newSectionWriter(writer, chunkStart, count), body)
+			//_, err = io.Copy(newSectionWriter(writer, chunkStart, count), body)
+			id := common.NewChunkID(file.Name(), chunkStart + offset, count)
+			cfw.WaitToScheduleChunk(ctx, id, count)
+			cfw.EnqueueChunk(ctx, id, count, body, true)
 			body.Close()
 			return err
 		},
 	})
+
+	cfw.Flush(ctx)
 	if err != nil {
 		return err
 	}
@@ -299,7 +312,7 @@ type BatchTransferOptions struct {
 	TransferSize int64
 	ChunkSize int64
 	Parallelism uint16
-	Operation func(offset int64, chunkSize int64, ctx context.Context) error
+	Operation func(offset int64, chunkSize int64, chunkIdx int64, ctx context.Context) error
 	OperationName string
 }
 
@@ -342,7 +355,7 @@ func DoBatchTransfer(ctx context.Context, o BatchTransferOptions) error {
 		offset := int64(chunkNum) * o.ChunkSize
 
 		operationChannel <- func() error {
-			return o.Operation(offset, curChunkSize, ctx)
+			return o.Operation(offset, curChunkSize, int64(chunkNum), ctx)
 		}
 	}
 	close(operationChannel)
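Note: the download hunk above discards the results of cfw.WaitToScheduleChunk, cfw.EnqueueChunk, and cfw.Flush, so a failed disk write would go unreported and the callback returns the stale err from Download. Below is a minimal sketch of how the per-chunk body could propagate those errors instead. It assumes the ChunkedFileWriter interface from azcopy's common package, whose methods return errors (WaitToScheduleChunk and EnqueueChunk return error; Flush returns the written file's hash and an error); the helper name writeChunk is hypothetical and not part of the patch.

package azblob

import (
	"context"
	"io"

	"github.com/Azure/azure-storage-azcopy/v10/common"
)

// writeChunk is a hypothetical helper sketching error propagation for one
// downloaded chunk, assuming common.ChunkedFileWriter's methods return
// errors as they do in azcopy's common package.
func writeChunk(ctx context.Context, cfw common.ChunkedFileWriter, fileName string,
	chunkStart int64, count int64, body io.ReadCloser) error {
	defer body.Close()

	id := common.NewChunkID(fileName, chunkStart, count)

	// Block until the writer's cache limiter has capacity for this chunk,
	// surfacing cancellation or limiter failures to the caller.
	if err := cfw.WaitToScheduleChunk(ctx, id, count); err != nil {
		return err
	}

	// EnqueueChunk copies the chunk body into pooled memory and hands it to
	// the writer, which commits chunks to the file in offset order.
	return cfw.EnqueueChunk(ctx, id, count, body, true)
}

Inside the Operation callback this would be called as return writeChunk(ctx, cfw, file.Name(), chunkStart+offset, count, body), and after DoBatchTransfer returns, the deferred write error could be surfaced with if _, err := cfw.Flush(ctx); err != nil { return err } rather than calling Flush and dropping its results.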