diff --git a/azblob/highlevel.go b/azblob/highlevel.go index 7d5a13b..f9c36ae 100644 --- a/azblob/highlevel.go +++ b/azblob/highlevel.go @@ -15,6 +15,7 @@ import ( "errors" "github.com/Azure/azure-pipeline-go/pipeline" + "github.com/Azure/azure-storage-azcopy/v10/common" ) // CommonResponse returns the headers common to all blob REST API responses. @@ -109,7 +110,7 @@ func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSi TransferSize: readerSize, ChunkSize: o.BlockSize, Parallelism: o.Parallelism, - Operation: func(offset int64, count int64, ctx context.Context) error { + Operation: func(offset int64, count int64, _ int64, ctx context.Context) error { // This function is called once per block. // It is passed this block's offset within the buffer and its count of bytes // Prepare to read the proper block/section of the buffer @@ -186,10 +187,17 @@ type DownloadFromBlobOptions struct { // downloadBlobToWriterAt downloads an Azure blob to a buffer with parallel. func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64, count int64, - writer io.WriterAt, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error { + file *os.File, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error { + if o.BlockSize == 0 { o.BlockSize = BlobDefaultDownloadBlockSize } + numChunks := uint32(((count - 1) / o.BlockSize) + 1) + cfw := NewChunkedFileWriter(ctx, + common.NewMultiSizeSlicePool(common.MaxBlockBlobBlockSize), + common.NewCacheLimiter(4 * 1024 * 1024 * 1024), nil, + file, numChunks, 5, + common.EHashValidationOption.NoCheck(), false) if count == CountToEnd { // If size not specified, calculate it if initialDownloadResponse != nil { @@ -218,7 +226,7 @@ func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64, TransferSize: count, ChunkSize: o.BlockSize, Parallelism: o.Parallelism, - Operation: func(chunkStart int64, count int64, ctx context.Context) error { + Operation: func(chunkStart int64, count int64, chunkIdx int64, ctx context.Context) error { dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false, o.ClientProvidedKeyOptions) if err != nil { return err @@ -237,11 +245,16 @@ func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64, progressLock.Unlock() }) } - _, err = io.Copy(newSectionWriter(writer, chunkStart, count), body) + //_, err = io.Copy(newSectionWriter(writer, chunkStart, count), body) + id := common.NewChunkID(file.Name(), chunkStart + offset, count) + cfw.WaitToScheduleChunk(ctx, id, count) + cfw.EnqueueChunk(ctx, id, count, body, true) body.Close() return err }, }) + + cfw.Flush(ctx) if err != nil { return err } @@ -299,7 +312,7 @@ type BatchTransferOptions struct { TransferSize int64 ChunkSize int64 Parallelism uint16 - Operation func(offset int64, chunkSize int64, ctx context.Context) error + Operation func(offset int64, chunkSize int64, chunkIdx int64, ctx context.Context) error OperationName string } @@ -342,7 +355,7 @@ func DoBatchTransfer(ctx context.Context, o BatchTransferOptions) error { offset := int64(chunkNum) * o.ChunkSize operationChannel <- func() error { - return o.Operation(offset, curChunkSize, ctx) + return o.Operation(offset, curChunkSize, int64(chunkNum), ctx) } } close(operationChannel)