This commit is contained in:
Narasimha Kulkarni 2022-02-01 11:58:25 +05:30
Родитель 3d50a5f3aa
Коммит 623d6d9d5d
1 изменённых файлов: 19 добавлений и 6 удалений

Просмотреть файл

@ -15,6 +15,7 @@ import (
"errors"
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-azcopy/v10/common"
)
// CommonResponse returns the headers common to all blob REST API responses.
@ -109,7 +110,7 @@ func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSi
TransferSize: readerSize,
ChunkSize: o.BlockSize,
Parallelism: o.Parallelism,
Operation: func(offset int64, count int64, ctx context.Context) error {
Operation: func(offset int64, count int64, _ int64, ctx context.Context) error {
// This function is called once per block.
// It is passed this block's offset within the buffer and its count of bytes
// Prepare to read the proper block/section of the buffer
@ -186,10 +187,17 @@ type DownloadFromBlobOptions struct {
// downloadBlobToWriterAt downloads an Azure blob to a buffer with parallel.
func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64, count int64,
writer io.WriterAt, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
file *os.File, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
if o.BlockSize == 0 {
o.BlockSize = BlobDefaultDownloadBlockSize
}
numChunks := uint32(((count - 1) / o.BlockSize) + 1)
cfw := NewChunkedFileWriter(ctx,
common.NewMultiSizeSlicePool(common.MaxBlockBlobBlockSize),
common.NewCacheLimiter(4 * 1024 * 1024 * 1024), nil,
file, numChunks, 5,
common.EHashValidationOption.NoCheck(), false)
if count == CountToEnd { // If size not specified, calculate it
if initialDownloadResponse != nil {
@ -218,7 +226,7 @@ func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64,
TransferSize: count,
ChunkSize: o.BlockSize,
Parallelism: o.Parallelism,
Operation: func(chunkStart int64, count int64, ctx context.Context) error {
Operation: func(chunkStart int64, count int64, chunkIdx int64, ctx context.Context) error {
dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false, o.ClientProvidedKeyOptions)
if err != nil {
return err
@ -237,11 +245,16 @@ func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64,
progressLock.Unlock()
})
}
_, err = io.Copy(newSectionWriter(writer, chunkStart, count), body)
//_, err = io.Copy(newSectionWriter(writer, chunkStart, count), body)
id := common.NewChunkID(file.Name(), chunkStart + offset, count)
cfw.WaitToScheduleChunk(ctx, id, count)
cfw.EnqueueChunk(ctx, id, count, body, true)
body.Close()
return err
},
})
cfw.Flush(ctx)
if err != nil {
return err
}
@ -299,7 +312,7 @@ type BatchTransferOptions struct {
TransferSize int64
ChunkSize int64
Parallelism uint16
Operation func(offset int64, chunkSize int64, ctx context.Context) error
Operation func(offset int64, chunkSize int64, chunkIdx int64, ctx context.Context) error
OperationName string
}
@ -342,7 +355,7 @@ func DoBatchTransfer(ctx context.Context, o BatchTransferOptions) error {
offset := int64(chunkNum) * o.ChunkSize
operationChannel <- func() error {
return o.Operation(offset, curChunkSize, ctx)
return o.Operation(offset, curChunkSize, int64(chunkNum), ctx)
}
}
close(operationChannel)