From 773dca2b4c4e951117fcdc3d7d4e8d1c83f92f81 Mon Sep 17 00:00:00 2001
From: Eldar Akchurin
Date: Wed, 7 Sep 2016 15:46:20 +0200
Subject: [PATCH] First version of GPU prefetch

---
 Makefile                                      |   1 +
 Source/CNTKv2LibraryDll/MinibatchSource.cpp   |   2 +-
 Source/Math/DataTransferer.cpp                |  17 ++
 Source/Math/DataTransferer.h                  |  68 ++++++
 Source/Math/GPUDataTransferer.cpp             | 164 ++++++++++---
 Source/Math/GPUDataTransferer.h               |  83 ++++++-
 Source/Math/GPUMatrix.cu                      |  20 +-
 Source/Math/GPUMatrix.h                       |   4 +-
 Source/Math/GPUSparseMatrix.cu                |  35 ++-
 Source/Math/GPUSparseMatrix.h                 |   2 +-
 Source/Math/Math.vcxproj                      |   6 +-
 Source/Math/Math.vcxproj.filters              |  15 +-
 Source/Math/Matrix.cpp                        |  17 +-
 Source/Math/Matrix.h                          |   5 +-
 Source/Math/NoGPU.cpp                         |  26 +-
 Source/Readers/ReaderLib/FramePacker.h        |   5 +-
 Source/Readers/ReaderLib/PackerBase.cpp       |  19 +-
 Source/Readers/ReaderLib/PackerBase.h         |  16 +-
 Source/Readers/ReaderLib/ReaderShim.cpp       | 225 ++++++++++++------
 Source/Readers/ReaderLib/ReaderShim.h         |  47 +++-
 Source/Readers/ReaderLib/SequencePacker.cpp   |   9 +-
 Source/Readers/ReaderLib/SequencePacker.h     |   5 +-
 .../Readers/ReaderLib/TruncatedBpttPacker.cpp |  26 +-
 .../Readers/ReaderLib/TruncatedBpttPacker.h   |   3 +-
 .../ReaderTests/CNTKTextFormatReaderTests.cpp |   2 +-
 25 files changed, 634 insertions(+), 188 deletions(-)
 create mode 100644 Source/Math/DataTransferer.cpp
 create mode 100644 Source/Math/DataTransferer.h

diff --git a/Makefile b/Makefile
index a00d5eb26..f44cf3819 100644
--- a/Makefile
+++ b/Makefile
@@ -287,6 +287,7 @@ MATH_SRC =\
 	$(SOURCEDIR)/Math/MatrixQuantizerCPU.cpp \
 	$(SOURCEDIR)/Math/Matrix.cpp \
 	$(SOURCEDIR)/Math/QuantizedMatrix.cpp \
+	$(SOURCEDIR)/Math/DataTransferer.cpp \
 	$(SOURCEDIR)/Math/RNGHandle.cpp \
 	$(SOURCEDIR)/Math/TensorView.cpp \
diff --git a/Source/CNTKv2LibraryDll/MinibatchSource.cpp b/Source/CNTKv2LibraryDll/MinibatchSource.cpp
index 26b4f4179..30b03dcf4 100644
--- a/Source/CNTKv2LibraryDll/MinibatchSource.cpp
+++ b/Source/CNTKv2LibraryDll/MinibatchSource.cpp
@@ -175,7 +175,7 @@ namespace CNTK
             size_t sampleSize = currentStreamDesc->m_sampleLayout->GetNumElements();

             // TODO: Eliminate the unnecessary CPU to CPU copy
-            ReaderShim<float>::FillMatrixFromStream(currentStreamDesc->m_storageType, dataMatrix.get(), sampleSize, currentStreamMinibatchData);
+            ReaderShim<float>::FillMatrixFromStream(currentStreamDesc->m_storageType, dataMatrix.get(), sampleSize, currentStreamMinibatchData, nullptr);
             minibatchValuePtr = CompositeFunction::GetValueObjectFromCNTKImplMatrixAndMBLayout(sampleShape, *dataMatrix, currentStreamMinibatchData->m_layout, false);

             size_t numSamples = currentStreamMinibatchData->m_layout->GetActualNumSamples();
diff --git a/Source/Math/DataTransferer.cpp b/Source/Math/DataTransferer.cpp
new file mode 100644
index 000000000..fb23dc271
--- /dev/null
+++ b/Source/Math/DataTransferer.cpp
@@ -0,0 +1,17 @@
+//
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
+//
+
+#include "stdafx.h"
+#include "DataTransferer.h"
+#include "GPUDataTransferer.h"
+
+namespace Microsoft { namespace MSR { namespace CNTK {
+
+    DataTransfererPtr CreatePrefetchDataTransferer(int deviceId)
+    {
+        return std::make_shared<PrefetchGPUDataTransferer>(deviceId);
+    }
+
+} } }
diff --git a/Source/Math/DataTransferer.h b/Source/Math/DataTransferer.h
new file mode 100644
index 000000000..bfb15126a
--- /dev/null
+++ b/Source/Math/DataTransferer.h
@@ -0,0 +1,68 @@
+//
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
+//
+
+#pragma once
+
+#include <memory>
+
+#ifdef _WIN32
+#ifdef MATH_EXPORTS
+#define MATH_API __declspec(dllexport)
+#else
+#define MATH_API __declspec(dllimport)
+#endif
+#else // no DLLs on Linux
+#define MATH_API
+#endif
+
+namespace Microsoft { namespace MSR { namespace CNTK {
+
+    // Interface to copy data from CPU to GPU and back.
+    // The interface is deliberately fine grained, so that it is possible to issue several copy operations and wait on them at the end, i.e.
+    //     CopyGPUToCPUAsync
+    //     ... n copy operations started ...
+    //     CopyGPUToCPUAsync
+    //     RecordGPUToCPUCopy
+    // and then WaitForCopyGPUToCPU when all the above asyncs must be finished.
+    //
+    // Currently this interface is used during data transfers between CPU and GPU for input data prefetching.
+    class MATH_API DataTransferer
+    {
+    public:
+        // Asynchronously starts copying data from a GPU buffer into a CPU buffer on the internal fetch stream.
+        virtual void CopyGPUToCPUAsync(const void* gpuBuffer, size_t numElements, size_t elementSize, void* cpuBuffer) = 0;
+
+        // Records an event marking that the copies were started on the internal stream.
+        virtual void RecordGPUToCPUCopy() = 0;
+
+        // Waits on the event that triggers when all copies have finished.
+        virtual void WaitForCopyGPUToCPU() = 0;
+
+        // Asynchronously starts copying data from a CPU buffer into a GPU buffer on the internal assign stream.
+        virtual void CopyCPUToGPUAsync(const void* cpuBuffer, size_t numElements, size_t elementSize, void* gpuBuffer) = 0;
+
+        // Records an event marking that the copies were started on the internal stream.
+        virtual void RecordCPUToGPUCopy() = 0;
+
+        // Waits on the event that triggers when all copies have finished.
+        virtual void WaitForCopyCPUToGPU() = 0;
+
+        // Records an event on a compute stream.
+        virtual void RecordComputeStreamSyncPoint() = 0;
+
+        // Synchronizes the GPU-to-CPU stream with the recorded compute sync event.
+        virtual void WaitForSyncPointOnFetchStreamAsync() = 0;
+
+        // Synchronizes the CPU-to-GPU stream with the recorded compute sync event.
+        virtual void WaitForSyncPointOnAssignStreamAsync() = 0;
+
+        virtual ~DataTransferer() {}
+    };
+
+    typedef std::shared_ptr<DataTransferer> DataTransfererPtr;
+
+    MATH_API DataTransfererPtr CreatePrefetchDataTransferer(int deviceId);
+}}}
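A note on intended usage: the interface is split into copy/record/wait steps precisely so that several asynchronous copies can be batched behind a single event. A minimal sketch of a caller (hypothetical function and buffer names; assumes the transferer came from CreatePrefetchDataTransferer and that the host buffers stay alive until the wait returns):

    // Queue two host-to-GPU copies, record one event, wait once.
    void UploadTwoBuffers(Microsoft::MSR::CNTK::DataTransferer& transferer,
                          const float* cpuA, float* gpuA, size_t countA,
                          const float* cpuB, float* gpuB, size_t countB)
    {
        transferer.CopyCPUToGPUAsync(cpuA, countA, sizeof(float), gpuA); // queued on the assign stream
        transferer.CopyCPUToGPUAsync(cpuB, countB, sizeof(float), gpuB); // queued behind the first copy
        transferer.RecordCPUToGPUCopy();  // one event covers both copies
        transferer.WaitForCopyCPUToGPU(); // blocks until the event fires
    }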
diff --git a/Source/Math/GPUDataTransferer.cpp b/Source/Math/GPUDataTransferer.cpp
index 48fdcbf57..ef726e233 100644
--- a/Source/Math/GPUDataTransferer.cpp
+++ b/Source/Math/GPUDataTransferer.cpp
@@ -47,6 +47,92 @@ static
     }
 }
 
+//// Base class for different data transferers.
+GranularGPUDataTransferer::GranularGPUDataTransferer(int deviceId, const cudaStream_t& fetchStream, const cudaStream_t& assignStream, bool blocking)
+    : m_fetchStream(fetchStream),
+      m_assignStream(assignStream),
+      m_deviceId(deviceId),
+      m_fetchCompleteEvent(nullptr),
+      m_assignCompleteEvent(nullptr),
+      m_syncEvent(nullptr)
+{
+    PrepareDevice(m_deviceId);
+
+    // Note: Do NOT use cudaEventBlockingSync (which supposedly yields the process)--it will totally break cudaEventSynchronize(), causing it to take 50 or 100 ms randomly.
+    // NOTE: We never saw this in reading prefetch.
+    unsigned flags = cudaEventDisableTiming;
+    if (blocking)
+        flags |= cudaEventBlockingSync;
+
+    // events
+    cudaEventCreateWithFlags(&m_fetchCompleteEvent, flags) || "cudaEventCreateWithFlags failed";
+    cudaEventCreateWithFlags(&m_assignCompleteEvent, flags) || "cudaEventCreateWithFlags failed";
+    cudaEventCreateWithFlags(&m_syncEvent, cudaEventDisableTiming) || "cudaEventCreateWithFlags failed";
+}
+
+GranularGPUDataTransferer::~GranularGPUDataTransferer()
+{
+    // TODO: Check for error code and throw if !std::uncaught_exception()
+    cudaEventDestroy(m_assignCompleteEvent);
+    cudaEventDestroy(m_fetchCompleteEvent);
+    cudaEventDestroy(m_syncEvent);
+}
+
+void GranularGPUDataTransferer::CopyGPUToCPUAsync(const void* gpuBuffer, size_t numElements, size_t elementSize, void* cpuBuffer)
+{
+    PrepareDevice(m_deviceId);
+
+    cudaMemcpyAsync(cpuBuffer, gpuBuffer, numElements * elementSize, cudaMemcpyDeviceToHost, m_fetchStream) || "cudaMemcpyAsync failed";
+}
+
+void GranularGPUDataTransferer::RecordGPUToCPUCopy()
+{
+    cudaEventRecord(m_fetchCompleteEvent, m_fetchStream) || "cudaEventRecord failed";
+}
+
+void GranularGPUDataTransferer::WaitForCopyGPUToCPU()
+{
+    PrepareDevice(m_deviceId);
+    cudaEventSynchronize(m_fetchCompleteEvent) || "cudaEventSynchronize failed";
+}
+
+void GranularGPUDataTransferer::CopyCPUToGPUAsync(const void* cpuBuffer, size_t numElements, size_t elementSize, void* gpuBuffer)
+{
+    PrepareDevice(m_deviceId);
+    cudaMemcpyAsync(gpuBuffer, cpuBuffer, numElements * elementSize, cudaMemcpyHostToDevice, m_assignStream) || "cudaMemcpyAsync failed";
+}
+
+void GranularGPUDataTransferer::RecordCPUToGPUCopy()
+{
+    cudaEventRecord(m_assignCompleteEvent, m_assignStream) || "cudaEventRecord failed";
+}
+
+void GranularGPUDataTransferer::WaitForCopyCPUToGPU()
+{
+    PrepareDevice(m_deviceId);
+    cudaEventSynchronize(m_assignCompleteEvent) || "cudaEventSynchronize failed";
+}
+
+void GranularGPUDataTransferer::RecordComputeStreamSyncPoint()
+{
+    PrepareDevice(m_deviceId);
+    cudaEventRecord(m_syncEvent, GetStream()) || "cudaEventRecord failed";
+}
+
+void GranularGPUDataTransferer::WaitForSyncPointOnFetchStreamAsync()
+{
+    PrepareDevice(m_deviceId);
+    cudaStreamWaitEvent(m_fetchStream, m_syncEvent, 0 /*flags 'must be 0'*/) || "cudaStreamWaitEvent failed";
+}
+
+void GranularGPUDataTransferer::WaitForSyncPointOnAssignStreamAsync()
+{
+    PrepareDevice(m_deviceId);
+    cudaStreamWaitEvent(m_assignStream, m_syncEvent, 0 /*flags 'must be 0'*/) || "cudaStreamWaitEvent failed";
+}
+
+//// GPUDataTransferer
+
 // same but for event
 template <class ElemType>
 void GPUDataTransferer<ElemType>::SyncEvent(cudaEvent_t ev)
@@ -64,80 +150,86 @@ void GPUDataTransferer<ElemType>::SyncEvent(cudaEvent_t ev)
 
 //streams
 template <class ElemType>
-cudaStream_t GPUDataTransferer<ElemType>::m_fetchStream = NULL;
+cudaStream_t GPUDataTransferer<ElemType>::s_fetchStream = NULL;
 
 template <class ElemType>
-cudaStream_t GPUDataTransferer<ElemType>::m_assignStream = NULL;
+cudaStream_t GPUDataTransferer<ElemType>::s_assignStream = NULL;
 
 template <class ElemType>
 cudaStream_t GPUDataTransferer<ElemType>::GetFetchStream()
 {
-    return m_fetchStream;
+    return s_fetchStream;
 }
 
 template <class ElemType>
-GPUDataTransferer<ElemType>::GPUDataTransferer(int deviceId, bool useConcurrentStreams)
-    : m_deviceId(deviceId)
+GPUDataTransferer<ElemType>::GPUDataTransferer(int deviceId, bool useConcurrentStreams)
 {
-    PrepareDevice(m_deviceId);
-
-    // events
-    // Note: Do NOT use cudaEventBlockingSync (which supposedly yields the process)--it will totally break cudaEventSynchronize(), causing it to take 50 or 100 ms randomly.
-    cudaEventCreateWithFlags(&m_fetchCompleteEvent, cudaEventDisableTiming) || "cudaEventCreateWithFlags failed";
-    cudaEventCreateWithFlags(&m_assignCompleteEvent, cudaEventDisableTiming) || "cudaEventCreateWithFlags failed";
-
 #pragma warning(disable : 4127)
-    if (useConcurrentStreams && (m_fetchStream == NULL))
+    if (useConcurrentStreams && (s_fetchStream == NULL))
     {
-        cudaStreamCreateWithFlags(&m_fetchStream, cudaStreamNonBlocking) || "cudaStreamCreateWithFlags failed";
-        cudaStreamCreateWithFlags(&m_assignStream, cudaStreamNonBlocking) || "cudaStreamCreateWithFlags failed";
+        cudaStreamCreateWithFlags(&s_fetchStream, cudaStreamNonBlocking) || "cudaStreamCreateWithFlags failed";
+        cudaStreamCreateWithFlags(&s_assignStream, cudaStreamNonBlocking) || "cudaStreamCreateWithFlags failed";
     }
+
+    m_inner = make_unique<GranularGPUDataTransferer>(deviceId, s_fetchStream, s_assignStream);
 }
 
 template <class ElemType>
 GPUDataTransferer<ElemType>::~GPUDataTransferer()
 {
     // BUGBUG: we don't destroy our streams (they are static variables); we need a static destructor, I am too lazy now
-    // TODO: Check for error code and throw if !std::uncaught_exception()
-    cudaEventDestroy(m_assignCompleteEvent);
-    cudaEventDestroy(m_fetchCompleteEvent);
 }
 
 template <class ElemType>
 void GPUDataTransferer<ElemType>::CopyGPUToCPUAsync(ElemType* gpuBuffer, size_t numElements, ElemType* cpuBuffer)
 {
-    PrepareDevice(m_deviceId);
-
-    cudaMemcpyAsync(cpuBuffer, gpuBuffer, numElements * sizeof(ElemType), cudaMemcpyDeviceToHost, m_fetchStream) || "cudaMemcpyAsync failed";
-    cudaEventRecord(m_fetchCompleteEvent, m_fetchStream) || "cudaEventRecord failed";
-}
-
-template <class ElemType>
-void GPUDataTransferer<ElemType>::WaitForCopyGPUToCPUAsync()
-{
-    PrepareDevice(m_deviceId);
-
-    SyncEvent(m_fetchCompleteEvent);
+    m_inner->CopyGPUToCPUAsync(gpuBuffer, numElements, sizeof(ElemType), cpuBuffer);
+    m_inner->RecordGPUToCPUCopy();
 }
 
 template <class ElemType>
 void GPUDataTransferer<ElemType>::CopyCPUToGPUAsync(ElemType* cpuBuffer, size_t numElements, ElemType* gpuBuffer)
 {
-    PrepareDevice(m_deviceId);
+    m_inner->CopyCPUToGPUAsync(cpuBuffer, numElements, sizeof(ElemType), gpuBuffer);
+    m_inner->RecordCPUToGPUCopy();
+}
 
-    cudaMemcpyAsync(gpuBuffer, cpuBuffer, numElements * sizeof(ElemType), cudaMemcpyHostToDevice, m_assignStream) || "cudaMemcpyAsync failed";
-    cudaEventRecord(m_assignCompleteEvent, m_assignStream) || "cudaEventRecord failed";
+template <class ElemType>
+void GPUDataTransferer<ElemType>::WaitForCopyGPUToCPUAsync()
+{
+    PrepareDevice(m_inner->m_deviceId);
+
+    SyncEvent(m_inner->m_fetchCompleteEvent);
 }
 
 template <class ElemType>
 void GPUDataTransferer<ElemType>::WaitForCopyCPUToGPUAsync()
 {
-    PrepareDevice(m_deviceId);
+    PrepareDevice(m_inner->m_deviceId);
 
-    SyncEvent(m_assignCompleteEvent);
+    SyncEvent(m_inner->m_assignCompleteEvent);
 }
 
 //explicit
 template class GPUDataTransferer<float>;
 template class GPUDataTransferer<double>;
-} } }
+
+/// PrefetchGPUDataTransferer
+
+cudaStream_t PrefetchGPUDataTransferer::s_prefetchStream = nullptr;
+cudaStream_t PrefetchGPUDataTransferer::s_prefetchAssignStream = nullptr;
+
+PrefetchGPUDataTransferer::PrefetchGPUDataTransferer(int deviceId) : GranularGPUDataTransferer(deviceId, s_prefetchStream, s_prefetchAssignStream, true)
+{
+#pragma warning(disable : 4127)
+    if (s_prefetchStream == nullptr)
+    {
+        // The assign stream always stays null; it is not required for prefetch.
+
+        // TODO: Currently s_prefetchStream is never destroyed.
+        // Being static, it can be used by several readers with different lifecycles, so we let it live until the end of the process.
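The Record*/WaitForSyncPoint* machinery above is a thin wrapper over the standard CUDA idiom for ordering two streams without blocking the host. A self-contained illustration in plain CUDA runtime calls (not CNTK code):

    #include <cuda_runtime_api.h>

    // Make 'copyStream' wait for all work queued so far on 'computeStream'.
    void OrderStreams(cudaStream_t computeStream, cudaStream_t copyStream)
    {
        cudaEvent_t syncPoint;
        cudaEventCreateWithFlags(&syncPoint, cudaEventDisableTiming); // timing disabled => cheaper
        cudaEventRecord(syncPoint, computeStream);     // marks the current end of the compute stream
        cudaStreamWaitEvent(copyStream, syncPoint, 0); // flags must be 0
        cudaEventDestroy(syncPoint);                   // destruction is deferred by CUDA until the wait completes
    }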
+        cudaStreamCreateWithFlags(&s_prefetchStream, cudaStreamNonBlocking) || "cudaStreamCreateWithFlags failed";
+    }
+}
+
+}}}
diff --git a/Source/Math/GPUDataTransferer.h b/Source/Math/GPUDataTransferer.h
index 1c57f9464..d3156444f 100644
--- a/Source/Math/GPUDataTransferer.h
+++ b/Source/Math/GPUDataTransferer.h
@@ -4,8 +4,8 @@
 #include <cuda_runtime_api.h>
 #include <cuda.h>
 #endif // !CPUONLY
-#include
-#include
+
+#include "Basics.h"
 
 #ifdef _WIN32
 #ifndef MATH_API
@@ -19,11 +19,63 @@
 #define MATH_API
 #endif
 
+#include "DataTransferer.h"
+
 namespace Microsoft { namespace MSR { namespace CNTK {
 
+class MATH_API GranularGPUDataTransferer : public DataTransferer
+{
+public:
+#ifndef CPUONLY
+    GranularGPUDataTransferer(int deviceId, const cudaStream_t& fetchStream, const cudaStream_t& assignStream, bool blocking = false);
+#else
+    GranularGPUDataTransferer() {}
+#endif // !CPUONLY
+
+    ~GranularGPUDataTransferer();
+
+    void CopyGPUToCPUAsync(const void* gpuBuffer, size_t numElements, size_t elementSize, void* cpuBuffer) override;
+    void RecordGPUToCPUCopy() override;
+    void WaitForCopyGPUToCPU() override;
+
+    void CopyCPUToGPUAsync(const void* cpuBuffer, size_t numElements, size_t elementSize, void* gpuBuffer) override;
+    void RecordCPUToGPUCopy() override;
+    void WaitForCopyCPUToGPU() override;
+
+    void RecordComputeStreamSyncPoint() override;
+    void WaitForSyncPointOnFetchStreamAsync() override;
+    void WaitForSyncPointOnAssignStreamAsync() override;
+
+#ifndef CPUONLY
+private:
+    // Not owned by this class, they are always injected.
+    const cudaStream_t& m_fetchStream;
+    const cudaStream_t& m_assignStream;
+
+protected:
+    mutable cudaEvent_t m_fetchCompleteEvent;
+    mutable cudaEvent_t m_assignCompleteEvent;
+    mutable cudaEvent_t m_syncEvent;
+#endif // !CPUONLY
+
+protected:
+    int m_deviceId;
+
+    // Disallow copy and move construction and assignment
+    DISABLE_COPY_AND_MOVE(GranularGPUDataTransferer);
+
+    template <class ElemType>
+    friend class GPUDataTransferer;
+};
+
 template <class ElemType>
 class MATH_API GPUDataTransferer
 {
+#pragma warning(push)
+#pragma warning(disable : 4251) // Using std::unique_ptr across the DLL boundary.
+    std::unique_ptr<GranularGPUDataTransferer> m_inner;
+#pragma warning(pop)
+
 public:
     GPUDataTransferer(int deviceId, bool useConcurrentStreams);
     ~GPUDataTransferer();
@@ -32,9 +84,9 @@ public:
     DISABLE_COPY_AND_MOVE(GPUDataTransferer);
 
     void CopyGPUToCPUAsync(ElemType* gpuBuffer, size_t numElements, ElemType* cpuBuffer);
-    void WaitForCopyGPUToCPUAsync();
-
     void CopyCPUToGPUAsync(ElemType* cpuBuffer, size_t numElements, ElemType* gpuBuffer);
+
+    void WaitForCopyGPUToCPUAsync();
     void WaitForCopyCPUToGPUAsync();
 
 #ifndef CPUONLY
@@ -44,17 +96,24 @@ public:
 private:
 #ifndef CPUONLY
     static void SyncEvent(cudaEvent_t ev);
+
+    static cudaStream_t s_fetchStream;
+    static cudaStream_t s_assignStream;
 #endif // !CPUONLY
+};
+
+class PrefetchGPUDataTransferer : public GranularGPUDataTransferer
+{
+public:
+    PrefetchGPUDataTransferer(int deviceId);
 
 private:
 #ifndef CPUONLY
-    static cudaStream_t m_fetchStream;
-    static cudaStream_t m_assignStream;
+    static cudaStream_t s_prefetchStream;
+    static cudaStream_t s_prefetchAssignStream;
+#endif
 
-    mutable cudaEvent_t m_fetchCompleteEvent;
-    mutable cudaEvent_t m_assignCompleteEvent;
-#endif // !CPUONLY
-
-    int m_deviceId;
+    DISABLE_COPY_AND_MOVE(PrefetchGPUDataTransferer);
 };
-} } }
+
+}}}
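For orientation, the two concrete transferers declared here differ mainly in who owns the streams and in the event flags. A sketch of constructing both (illustrative only; error checking and stream destruction omitted):

    using namespace Microsoft::MSR::CNTK;

    // Granular flavor: streams are injected by the caller, events spin on wait.
    cudaStream_t fetch, assign;
    cudaStreamCreateWithFlags(&fetch, cudaStreamNonBlocking);
    cudaStreamCreateWithFlags(&assign, cudaStreamNonBlocking);
    GranularGPUDataTransferer granular(/*deviceId=*/0, fetch, assign, /*blocking=*/false);

    // Prefetch flavor: hides its static prefetch stream and passes blocking = true,
    // so the reader thread sleeps in WaitForCopyCPUToGPU instead of spinning.
    DataTransfererPtr prefetch = CreatePrefetchDataTransferer(/*deviceId=*/0);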
diff --git a/Source/Math/GPUMatrix.cu b/Source/Math/GPUMatrix.cu
index 0e47b28f2..a2c0e1737 100644
--- a/Source/Math/GPUMatrix.cu
+++ b/Source/Math/GPUMatrix.cu
@@ -1180,7 +1180,7 @@ void GPUMatrix<ElemType>::SetValue(const GPUSparseMatrix<ElemType>& deepCopyFrom
 #endif
 
 template <class ElemType>
-void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags)
+void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags, DataTransferer* transferer)
 {
     // handle externally managed case
     // BUGBUG: This is super super ugly, and needs to be fixed, but if matrixFlags has the right value, then we can't free anything,
@@ -1201,6 +1201,9 @@
     }
     else
     {
+        if (transferer && (matrixFlags & matrixFlagSetValueOnDevice))
+            RuntimeError("Asynchronous data copy from device to device is currently not supported.");
+
         // if the devices are different move it now
         if (GetComputeDeviceId() != deviceId && deviceId >= 0)
         {
@@ -1217,7 +1220,10 @@
         {
             if (!(matrixFlags & matrixFormatRowMajor))
             {
-                CUDA_CALL(cudaMemcpy(Data(), pArray, sizeof(ElemType) * GetNumElements(), (matrixFlags & matrixFlagSetValueOnDevice) ? cudaMemcpyDeviceToDevice : cudaMemcpyHostToDevice));
+                if (transferer)
+                    transferer->CopyCPUToGPUAsync(pArray, GetNumElements(), sizeof(ElemType), Data());
+                else
+                    CUDA_CALL(cudaMemcpy(Data(), pArray, sizeof(ElemType) * GetNumElements(), (matrixFlags & matrixFlagSetValueOnDevice) ? cudaMemcpyDeviceToDevice : cudaMemcpyHostToDevice));
             }
             else // row major: must transpose (this is not meant to be efficient, but very useful for defining inline matrices for test code)
             {
@@ -1225,7 +1231,11 @@
                 for (size_t i = 0; i < numRows; i++)
                     for (size_t j = 0; j < numCols; j++)
                         transposed[i + numRows * j] = pArray[j + numCols * i];
-                CUDA_CALL(cudaMemcpy(Data(), transposed.data(), sizeof(ElemType) * GetNumElements(), (matrixFlags & matrixFlagSetValueOnDevice) ? cudaMemcpyDeviceToDevice : cudaMemcpyHostToDevice));
+
+                if (transferer)
+                    transferer->CopyCPUToGPUAsync(transposed.data(), GetNumElements(), sizeof(ElemType), Data());
+                else
+                    CUDA_CALL(cudaMemcpy(Data(), transposed.data(), sizeof(ElemType) * GetNumElements(), (matrixFlags & matrixFlagSetValueOnDevice) ? cudaMemcpyDeviceToDevice : cudaMemcpyHostToDevice));
             }
         }
     }
@@ -4521,7 +4531,7 @@
 template GPUMatrix<char> GPUMatrix<char>::ColumnSlice(size_t startColumn, size_t numCols) const;
 template GPUMatrix<char>& GPUMatrix<char>::operator=(GPUMatrix<char>&&);
 template GPUMatrix<char>::GPUMatrix(int);
 template void GPUMatrix<char>::SetValue(const char);
-template void GPUMatrix<char>::SetValue(const size_t numRows, const size_t numCols, int deviceId, char* pArray, size_t matrixFlags);
+template void GPUMatrix<char>::SetValue(const size_t numRows, const size_t numCols, int deviceId, char* pArray, size_t matrixFlags, DataTransferer* transferer);
 //template void GPUMatrix<char>::SetValue(CPUMatrix<char> const&);
 template void GPUMatrix<char>::SetValue(GPUMatrix<char> const&);
 //template void GPUMatrix<char>::SetValue(CPUSparseMatrix<char> const&);
@@ -4546,7 +4556,7 @@
 template GPUMatrix<short> GPUMatrix<short>::ColumnSlice(size_t startColumn, size_t numCols) const;
 template GPUMatrix<short>& GPUMatrix<short>::operator=(GPUMatrix<short>&&);
 template GPUMatrix<short>::GPUMatrix(int);
 template void GPUMatrix<short>::SetValue(const short);
-template void GPUMatrix<short>::SetValue(const size_t numRows, const size_t numCols, int deviceId, short* pArray, size_t matrixFlags);
+template void GPUMatrix<short>::SetValue(const size_t numRows, const size_t numCols, int deviceId, short* pArray, size_t matrixFlags, DataTransferer* transferer);
 //template void GPUMatrix<short>::SetValue(CPUMatrix<short> const&);
 template void GPUMatrix<short>::SetValue(GPUMatrix<short> const&);
 //template void GPUMatrix<short>::SetValue(CPUSparseMatrix<short> const&);
diff --git a/Source/Math/GPUMatrix.h b/Source/Math/GPUMatrix.h
index 830c25f56..3cf88b29c 100644
--- a/Source/Math/GPUMatrix.h
+++ b/Source/Math/GPUMatrix.h
@@ -61,6 +61,8 @@ cudaStream_t MATH_API GetStream();
 namespace Microsoft { namespace MSR { namespace CNTK {
 
+class DataTransferer;
+
 // -----------------------------------------------------------------------
 // SyncGuard -- synchronize around CUDA calls
 // -----------------------------------------------------------------------
@@ -252,7 +254,7 @@ public:
     void SetValue(const GPUMatrix<ElemType>& deepCopyFrom);
     //void SetValue(const CPUSparseMatrix<ElemType>& deepCopyFrom);
     //void SetValue(const GPUSparseMatrix<ElemType>& deepCopyFrom);
-    void SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags = matrixFlagNormal);
+    void SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags = matrixFlagNormal, DataTransferer* transferer = nullptr);
 
     void SetDiagonalValue(const ElemType v);
     void SetDiagonalValue(const GPUMatrix<ElemType>& vector);
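The contract implied by the new transferer argument: when it is non-null, SetValue only queues cudaMemcpyAsync on the transferer's assign stream, so pArray must stay alive (and should point to page-locked memory for the copy to be truly asynchronous) until the caller waits. A hedged sketch of a correct caller (hypothetical helper):

    using namespace Microsoft::MSR::CNTK;

    // Queue an asynchronous fill of a GPU matrix and wait for it explicitly.
    void AsyncFill(GPUMatrix<float>& m, float* pinned /* cudaHostAlloc'ed */,
                   size_t rows, size_t cols, int deviceId, DataTransferer& t)
    {
        m.SetValue(rows, cols, deviceId, pinned, matrixFlagNormal, &t); // copy merely queued
        t.RecordCPUToGPUCopy();  // event after the queued copy
        t.WaitForCopyCPUToGPU(); // only now may 'pinned' be reused
    }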
diff --git a/Source/Math/GPUSparseMatrix.cu b/Source/Math/GPUSparseMatrix.cu
index d8e4bfe24..5d37d9df4 100644
--- a/Source/Math/GPUSparseMatrix.cu
+++ b/Source/Math/GPUSparseMatrix.cu
@@ -894,7 +894,7 @@ void GPUSparseMatrix<ElemType>::GetMatrixFromCSRFormat(CPUSPARSE_INDEX_TYPE*& h_
 template <class ElemType>
 void GPUSparseMatrix<ElemType>::SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
-    const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice /*= false*/, const DEVICEID_TYPE devId /*= -1*/)
+    const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice /*= false*/, const DEVICEID_TYPE devId /*= -1*/, DataTransferer* transferer /*= nullptr*/)
 {
     VerifyWritable(__func__);
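For reference, the (h_CSCCol, h_Row, h_Val) triple copied here is standard compressed-sparse-column data. A tiny worked example (illustrative values only):

    // 3x4 matrix with nz = 4 non-zero elements in CSC form:
    //     [ 1 0 0 2 ]
    //     [ 0 3 0 0 ]
    //     [ 0 0 0 4 ]
    float h_Val[]    = { 1, 3, 2, 4 };    // non-zeros in column-major order
    int   h_Row[]    = { 0, 1, 0, 2 };    // row index of each non-zero
    int   h_CSCCol[] = { 0, 1, 2, 2, 4 }; // numCols + 1 offsets into h_Val
    // Column j owns h_Val[h_CSCCol[j] .. h_CSCCol[j+1]); nz == h_CSCCol[numCols].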
@@ -905,14 +905,29 @@
     SetFormat(matrixFormatSparseCSC);
     RequireSizeAndAllocate(numRows, numCols, nz, true, false);
 
+    if (transferer && IsOnDevice)
+        RuntimeError("Copying data asynchronously from device to device is currently not supported.");
+
     // m_nz doesn't exist anymore. How are we going to deal with the NzSize, RowSize, and ColSize? Do it ourselves of course.
+
     cudaMemcpyKind kind = IsOnDevice ? cudaMemcpyDeviceToDevice : cudaMemcpyHostToDevice;
-    CUDA_CALL(cudaMemcpy(Data(), h_Val, nz * sizeof(ElemType), kind));
+    if (transferer)
+        transferer->CopyCPUToGPUAsync(h_Val, nz, sizeof(ElemType), Data());
+    else
+        CUDA_CALL(cudaMemcpy(Data(), h_Val, nz * sizeof(ElemType), kind));
 
     if (sizeof(CPUSPARSE_INDEX_TYPE) == sizeof(GPUSPARSE_INDEX_TYPE))
     {
-        CUDA_CALL(cudaMemcpy(RowLocation(), h_Row, sizeof(GPUSPARSE_INDEX_TYPE) * nz, kind));
-        CUDA_CALL(cudaMemcpy(ColLocation(), h_CSCCol, sizeof(GPUSPARSE_INDEX_TYPE) * (numCols+1), kind));
+        if (transferer)
+        {
+            transferer->CopyCPUToGPUAsync(h_Row, nz, sizeof(GPUSPARSE_INDEX_TYPE), RowLocation());
+            transferer->CopyCPUToGPUAsync(h_CSCCol, numCols + 1, sizeof(GPUSPARSE_INDEX_TYPE), ColLocation());
+        }
+        else
+        {
+            CUDA_CALL(cudaMemcpy(RowLocation(), h_Row, sizeof(GPUSPARSE_INDEX_TYPE) * nz, kind));
+            CUDA_CALL(cudaMemcpy(ColLocation(), h_CSCCol, sizeof(GPUSPARSE_INDEX_TYPE) * (numCols + 1), kind));
+        }
     }
     else
     {
@@ -923,8 +938,16 @@
         ConvertBuffer(pCol, h_CSCCol, (numCols+1));
         ConvertBuffer(pRow, h_Row, nz);
 
-        CUDA_CALL(cudaMemcpy(RowLocation(), pRow, sizeof(GPUSPARSE_INDEX_TYPE) * nz, kind));
-        CUDA_CALL(cudaMemcpy(ColLocation(), pCol, sizeof(GPUSPARSE_INDEX_TYPE) * (numCols+1), kind));
+        if (transferer)
+        {
+            transferer->CopyCPUToGPUAsync(pRow, nz, sizeof(GPUSPARSE_INDEX_TYPE), RowLocation());
+            transferer->CopyCPUToGPUAsync(pCol, numCols + 1, sizeof(GPUSPARSE_INDEX_TYPE), ColLocation());
+        }
+        else
+        {
+            CUDA_CALL(cudaMemcpy(RowLocation(), pRow, sizeof(GPUSPARSE_INDEX_TYPE) * nz, kind));
+            CUDA_CALL(cudaMemcpy(ColLocation(), pCol, sizeof(GPUSPARSE_INDEX_TYPE) * (numCols + 1), kind));
+        }
     }
 }
diff --git a/Source/Math/GPUSparseMatrix.h b/Source/Math/GPUSparseMatrix.h
index 81caefb3c..9275baa3a 100644
--- a/Source/Math/GPUSparseMatrix.h
+++ b/Source/Math/GPUSparseMatrix.h
@@ -338,7 +338,7 @@ public:
     void SetMatrixFromCSRFormat(const CPUSPARSE_INDEX_TYPE* h_CSRRow, const CPUSPARSE_INDEX_TYPE* h_Col, const ElemType* h_Val,
                                 const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice = false, const DEVICEID_TYPE devId = -1);
     void SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
-                                const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice = false, const DEVICEID_TYPE devId = -1);
+                                const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice = false, const DEVICEID_TYPE devId = -1, DataTransferer* transferer = nullptr);
 
     // Gets the sparse matrix in CSR format. This acts as a deep copy. All passed pointers must be NULL; the function will allocate memory itself.
 void GetMatrixFromCSRFormat(CPUSPARSE_INDEX_TYPE*& h_CSRRow, CPUSPARSE_INDEX_TYPE*& h_Col, ElemType*& h_Val, size_t& numElemAllocated, size_t& nz, size_t& numRows, size_t& numCols) const;
diff --git a/Source/Math/Math.vcxproj b/Source/Math/Math.vcxproj
index bed3daa54..0676c4643 100644
--- a/Source/Math/Math.vcxproj
+++ b/Source/Math/Math.vcxproj
@@ -1,4 +1,4 @@
-
+
@@ -171,6 +171,7 @@
+    <ClInclude Include="DataTransferer.h" />
@@ -203,6 +204,7 @@
+    <ClCompile Include="DataTransferer.cpp" />
     false
@@ -229,4 +231,4 @@
-
+
\ No newline at end of file
diff --git a/Source/Math/Math.vcxproj.filters b/Source/Math/Math.vcxproj.filters
index e24870052..5e4712c6d 100644
--- a/Source/Math/Math.vcxproj.filters
+++ b/Source/Math/Math.vcxproj.filters
@@ -46,8 +46,9 @@
     CPU
-    CPU
+    CPU
+
@@ -115,18 +116,20 @@
     RNN
-    CPU
+    CPU
-    CPU
+    CPU
-    CPU
+    CPU
-    CPU
+    CPU
+
+
@@ -180,4 +183,4 @@
     {ee6bf704-73f0-488d-8432-0d23f034de88}
-
+
\ No newline at end of file
diff --git a/Source/Math/Matrix.cpp b/Source/Math/Matrix.cpp
index ba5892d97..6db23d793 100644
--- a/Source/Math/Matrix.cpp
+++ b/Source/Math/Matrix.cpp
@@ -1262,15 +1262,16 @@ void Matrix<ElemType>::AssignValuesOf(const Matrix<ElemType>& deepCopyFrom)
 }
 
 template <class ElemType>
-void Matrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, const size_t matrixFlags)
+void Matrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, const size_t matrixFlags, DataTransferer* transferer)
 {
     if (((numRows * numCols) > 0) && (pArray == nullptr))
         InvalidArgument("Invalid pArray.");
 
+    // Only the GPU matrix supports async data transfers, so the data transferer is passed only to the GPU matrix.
     DISPATCH_MATRIX_ON_FLAG(this,
                             this,
                             m_CPUMatrix->SetValue(numRows, numCols, pArray, matrixFlags),
-                            m_GPUMatrix->SetValue(numRows, numCols, deviceId, pArray, matrixFlags),
+                            m_GPUMatrix->SetValue(numRows, numCols, deviceId, pArray, matrixFlags, transferer),
                             NOT_IMPLEMENTED,
                             NOT_IMPLEMENTED);
 }
@@ -1289,10 +1290,14 @@ void Matrix<ElemType>::SetValue(const size_t rIdx, const size_t cIdx, ElemType val)
 // read features
 template <class ElemType>
 void Matrix<ElemType>::SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
-                                              const size_t nz, const size_t numRows, const size_t numCols)
+                                              const size_t nz, const size_t numRows, const size_t numCols, DataTransferer* transferer)
 {
     // Note: The current implementation uses the xPUSparseMatrix as temporary space. This allows for memory sharing between calls. If
     // xPUSparseMatrix is a view, this code will cause an error during runtime stating that the view is not writable nor resizable.
+
+    // Only the GPU matrix supports async data transfers, so the transferer is passed to the GPU matrix only when we do not need to reassign to dense.
+    // When we have to reassign sparse to dense we cannot use an async operation, because by the time AssignColumnSliceToDense is called the
+    // data must already have been copied to its destination.
     DISPATCH_MATRIX_ON_FLAG(this,
                             this,
                             {
                                 if (!m_CPUSparseMatrix)
                                     m_CPUSparseMatrix = make_shared<CPUSparseMatrix<ElemType>>(matrixFormatSparseCSC, numRows, numCols, nz);
@@ -1305,7 +1310,7 @@ void Matrix<ElemType>::SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
                                 m_GPUSparseMatrix->AssignColumnSliceToDense(*m_GPUMatrix, 0, numCols);
                             },
                             { m_CPUSparseMatrix->SetMatrixFromCSCFormat(h_CSCCol, h_Row, h_Val, nz, numRows, numCols); },
-                            { m_GPUSparseMatrix->SetMatrixFromCSCFormat(h_CSCCol, h_Row, h_Val, nz, numRows, numCols); });
+                            { m_GPUSparseMatrix->SetMatrixFromCSCFormat(h_CSCCol, h_Row, h_Val, nz, numRows, numCols, false, -1, transferer); });
 }
 
@@ -5483,7 +5488,7 @@ template void Matrix<char>::TransferToDeviceIfNotThere(int id_to, bool isBeingMo
 template size_t Matrix<char>::GetNumRows() const;
 template size_t Matrix<char>::GetNumCols() const;
 template void Matrix<char>::SetValue(const char);
-template void Matrix<char>::SetValue(size_t numRows, const size_t numCols, int deviceId, char* pArray, size_t matrixFlags);
+template void Matrix<char>::SetValue(size_t numRows, const size_t numCols, int deviceId, char* pArray, size_t matrixFlags, DataTransferer* transferer);
 //template void Matrix<char>::SetValue(const Matrix<char>&, MatrixFormat);
 template void Matrix<char>::SetValue(const Matrix<char>&);
 template void Matrix<char>::AssignValuesOf (const Matrix<char>&);
@@ -5508,7 +5513,7 @@ template void Matrix<short>::TransferToDeviceIfNotThere(int id_to, bool isBeingM
 template size_t Matrix<short>::GetNumRows() const;
 template size_t Matrix<short>::GetNumCols() const;
 template void Matrix<short>::SetValue(const short);
-template void Matrix<short>::SetValue(size_t numRows, const size_t numCols, int deviceId, short* pArray, size_t matrixFlags);
+template void Matrix<short>::SetValue(size_t numRows, const size_t numCols, int deviceId, short* pArray, size_t matrixFlags, DataTransferer* transferer);
 //template void Matrix<short>::SetValue(const Matrix<short>&, MatrixFormat);
 template void Matrix<short>::SetValue(const Matrix<short>&);
 template void Matrix<short>::AssignValuesOf(const Matrix<short>&);
diff --git a/Source/Math/Matrix.h b/Source/Math/Matrix.h
index 35f79b2c4..2997991b4 100644
--- a/Source/Math/Matrix.h
+++ b/Source/Math/Matrix.h
@@ -14,6 +14,7 @@
 #include "CommonMatrix.h"
 #include "TensorShape.h" // only for SmallVector; I was hoping to keep this out
 #include "RNGHandle.h"
+#include "DataTransferer.h"
 #include
 #include <memory> // for shared_ptr
 #include
@@ -245,7 +246,7 @@ public:
     void SetValue (const Matrix<ElemType>& deepCopyFrom);
     // AssignValuesOf respects the target matrix's information. It copies the values from the target into the memory of the source.
     void AssignValuesOf(const Matrix<ElemType>& deepCopyFrom);
-    void SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, const size_t matrixFlags = matrixFlagNormal);
+    void SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, const size_t matrixFlags = matrixFlagNormal, DataTransferer* transferer = nullptr);
     void SetValue(const size_t rIdx, const size_t cIdx, ElemType val); // set matrix sparsely
     void SetValue(const size_t numRows, const size_t numCols, std::initializer_list<ElemType> l) // SetValue(2,3, {1,2,3, 4,5,6});
     {
@@ -259,7 +260,7 @@ public:
         SetValue(MakeNan(__LINE__));
     }
     void SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
-                                const size_t nz, const size_t numRows, const size_t numCols);
+                                const size_t nz, const size_t numRows, const size_t numCols, DataTransferer* transferer = nullptr);
 
     void MaskColumnsValue(const Matrix<char>& columnsMask, ElemType val);
diff --git a/Source/Math/NoGPU.cpp b/Source/Math/NoGPU.cpp
index 9b73078dc..0bb37704f 100644
--- a/Source/Math/NoGPU.cpp
+++ b/Source/Math/NoGPU.cpp
@@ -208,7 +208,7 @@ void GPUSparseMatrix<ElemType>::Reset()
 // copy features to GPU matrix
 template <class ElemType>
 void GPUSparseMatrix<ElemType>::SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
-                                                       const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice /*= false*/, const DEVICEID_TYPE devId /*= -1*/)
+                                                       const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice /*= false*/, const DEVICEID_TYPE devId /*= -1*/, DataTransferer* transferer)
 {
 }
@@ -1014,7 +1014,7 @@ void GPUMatrix<ElemType>::SetValue(GPUSparseMatrix<ElemType> const&)
 #endif
 
 template <class ElemType>
-void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags)
+void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags, DataTransferer* transferer)
 {
 }
@@ -2187,6 +2187,28 @@ void GPUMatrixComputeStreamEvent::SynchronizeDataTransferFetchStreamWithEvent
 template <class ElemType>
 GPUDataTransferer<ElemType>::GPUDataTransferer(int, bool)
 {
diff --git a/Source/Readers/ReaderLib/FramePacker.h b/Source/Readers/ReaderLib/FramePacker.h
index dc89b81c5..52fce29d3 100644
--- a/Source/Readers/ReaderLib/FramePacker.h
+++ b/Source/Readers/ReaderLib/FramePacker.h
@@ -15,8 +15,9 @@ class FramePacker : public SequencePacker
 public:
     FramePacker(
         SequenceEnumeratorPtr sequenceEnumerator,
-        const std::vector<StreamDescriptionPtr>& streams) :
-        SequencePacker(sequenceEnumerator, streams)
+        const std::vector<StreamDescriptionPtr>& streams,
+        size_t numberOfBuffers = 2) :
+        SequencePacker(sequenceEnumerator, streams, numberOfBuffers)
     {}
 
 private:
diff --git a/Source/Readers/ReaderLib/PackerBase.cpp b/Source/Readers/ReaderLib/PackerBase.cpp
index d411158bb..542da0bdb 100644
--- a/Source/Readers/ReaderLib/PackerBase.cpp
+++ b/Source/Readers/ReaderLib/PackerBase.cpp
@@ -39,9 +39,14 @@ void PackerBase::StartEpoch(const EpochConfiguration& config, const std::vector<
     if (memoryProviders.size() != m_outputStreamDescriptions.size())
         RuntimeError("Number of streams does not match the number of memory providers.");
 
-    m_streamBuffers.reserve(m_outputStreamDescriptions.size());
-    for (size_t i = 0; i < m_outputStreamDescriptions.size(); ++i)
-        m_streamBuffers.push_back(StreamBuffer(memoryProviders[i]));
+    m_streamBuffers.resize(m_numberOfBuffers);
+    for (size_t i = 0; i < m_numberOfBuffers; ++i)
+    {
+        auto& currentBuffer = m_streamBuffers[i];
+        currentBuffer.reserve(m_outputStreamDescriptions.size());
+        for (size_t j = 0; j < m_outputStreamDescriptions.size(); ++j)
+            currentBuffer.push_back(StreamBuffer(memoryProviders[j]));
+    }
 
     m_minibatchSize = config.m_minibatchSizeInSamples;
@@ -52,11 +57,15 @@
 }
 
 PackerBase::PackerBase(SequenceEnumeratorPtr sequenceEnumerator,
-    const std::vector<StreamDescriptionPtr>& streams) :
+    const std::vector<StreamDescriptionPtr>& streams,
+    size_t numberOfBuffers) :
     m_sequenceEnumerator(sequenceEnumerator),
     m_minibatchSize(0),
-    m_outputStreamDescriptions(streams)
+    m_outputStreamDescriptions(streams),
+    m_numberOfBuffers(numberOfBuffers),
+    m_currentBufferIndex(0)
 {
+    assert(m_numberOfBuffers >= 1);
     m_inputStreamDescriptions = sequenceEnumerator->GetStreamDescriptions();
     assert(m_inputStreamDescriptions.size() != 0);
     assert(m_inputStreamDescriptions.size() == m_outputStreamDescriptions.size());
diff --git a/Source/Readers/ReaderLib/PackerBase.h b/Source/Readers/ReaderLib/PackerBase.h
index 30ea73548..4634d1df1 100644
--- a/Source/Readers/ReaderLib/PackerBase.h
+++ b/Source/Readers/ReaderLib/PackerBase.h
@@ -33,7 +33,8 @@ protected:
     };
 
     PackerBase(SequenceEnumeratorPtr sequenceEnumerator,
-               const std::vector<StreamDescriptionPtr>& streams);
+               const std::vector<StreamDescriptionPtr>& streams,
+               size_t numberOfBuffers);
 
     typedef std::vector<SequenceDataPtr> StreamBatch;
 
@@ -63,8 +64,17 @@ protected:
     // Output stream descriptions expected by the network.
     std::vector<StreamDescriptionPtr> m_inputStreamDescriptions;
 
-    // Buffers for allocated data.
-    std::vector<StreamBuffer> m_streamBuffers;
+    // Indicates how many internal buffers with pinned memory are supported.
+    // If N, then N sequential calls to PackMinibatch are valid, and the (N+1)-st call will overwrite
+    // the memory of the first call.
+    size_t m_numberOfBuffers;
+
+    // Buffers for allocated data. Outer vector size == m_numberOfBuffers;
+    // the inner vector contains buffers for all streams.
+    std::vector<std::vector<StreamBuffer>> m_streamBuffers;
+
+    // Cyclic index of the current buffer; m_currentBufferIndex < m_numberOfBuffers.
+    size_t m_currentBufferIndex;
 
     // Minibatch size in samples.
     size_t m_minibatchSize;
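Why the packers rotate m_currentBufferIndex across numberOfBuffers buffers: the minibatch handed to the caller may still be the source of an in-flight asynchronous copy while the next minibatch is being packed, so the writer has to move to a different buffer. A condensed sketch of the pattern (hypothetical class; plain heap memory standing in for pinned buffers):

    #include <vector>

    // With numberOfBuffers == 2, the pointer returned for minibatch k
    // stays valid until minibatch k + 2 is packed.
    class RingBuffers
    {
    public:
        explicit RingBuffers(size_t numberOfBuffers = 2) : m_buffers(numberOfBuffers) {}

        const char* Pack(const char* src, size_t size)
        {
            auto& buffer = m_buffers[m_current];
            buffer.assign(src, src + size);                 // fill the current buffer
            m_current = (m_current + 1) % m_buffers.size(); // rotate for the next call
            return buffer.data();                           // consumer reads this
        }

    private:
        std::vector<std::vector<char>> m_buffers;
        size_t m_current = 0;
    };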
diff --git a/Source/Readers/ReaderLib/ReaderShim.cpp b/Source/Readers/ReaderLib/ReaderShim.cpp
index ef61cf13f..0dad551cd 100644
--- a/Source/Readers/ReaderLib/ReaderShim.cpp
+++ b/Source/Readers/ReaderLib/ReaderShim.cpp
@@ -17,12 +17,13 @@
 #define DATAREADER_EXPORTS // creating the exports here
 #include "DataReader.h"
 #include "ReaderShim.h"
+#include "DataTransferer.h"
 
 namespace Microsoft { namespace MSR { namespace CNTK {
 
 template <class ElemType>
 ReaderShim<ElemType>::ReaderShim(ReaderFactory factory)
-    : m_factory(factory)
+    : m_factory(factory), m_deviceId(CPUDEVICE), m_dataTransferers(2, DataTransfererPtr()), m_currentDataTransferIndex(0)
 {
 }
 
@@ -75,23 +76,57 @@ void ReaderShim<ElemType>::StartDistributedMinibatchLoop(
     config.m_totalEpochSizeInSamples = requestedEpochSamples;
     config.m_epochIndex = epoch;
 
+    // Make sure there are no outstanding copies:
+    // wait on all events if any pending copy operations are still in flight.
+    if (m_dataTransferers[m_currentDataTransferIndex])
+        m_dataTransferers[m_currentDataTransferIndex]->WaitForCopyCPUToGPU();
+
+    // Now we can be sure no prefetch thread is running and there are no outstanding memcopies.
+    // Check that the requested devices are ok and see whether we need to change our data transferers.
+    auto device = std::find_if(inputs.begin(), inputs.end(),
+        [](const InputStreamDescription& d) { return d.m_deviceId != CPUDEVICE; });
+    auto deviceId = device != inputs.end() ? device->m_deviceId : CPUDEVICE;
+
+    // Check that all devices are either the same as m_deviceId or the CPU.
+    auto secondDevice = std::find_if(inputs.begin(), inputs.end(),
+        [deviceId](const InputStreamDescription& d) { return d.m_deviceId != CPUDEVICE && d.m_deviceId != deviceId; });
+    if (secondDevice != inputs.end())
+    {
+        LogicError("Readers do not support running on several GPUs in the same process; at least two devices found: '%d', '%d'", deviceId, secondDevice->m_deviceId);
+    }
+
+    if (m_deviceId != deviceId)
+    {
+        // The device changed. Let's change the data transferers.
+        m_deviceId = deviceId;
+        m_dataTransferers.clear();
+        // We need two in order to support two operations in flight.
+        m_dataTransferers.push_back(m_deviceId == CPUDEVICE ? nullptr : CreatePrefetchDataTransferer(m_deviceId));
+        m_dataTransferers.push_back(m_deviceId == CPUDEVICE ? nullptr : CreatePrefetchDataTransferer(m_deviceId));
+    }
+
+    // Let's create the buffers for the prefetch thread.
     std::map<std::wstring, int> inputDescriptions;
     for (const auto& i : inputs)
+    {
         inputDescriptions[i.m_name] = i.m_deviceId;
+        m_prefetchBuffers[i.m_name] = StreamPrefetchBuffer{ std::make_shared<Matrix<ElemType>>(i.m_deviceId), nullptr };
+    }
 
-    m_reader->StartEpoch(config, inputDescriptions);
     m_endOfEpoch = false;
+    m_reader->StartEpoch(config, inputDescriptions);
 
     // Starting the prefetch task. There is always a single async read in flight.
-    // When the network requests a new minibatch, we wait for the current async to finish,
-    // return the result and kick off a new one.
-    m_prefetchTask = std::async(m_launchType, [this]()
+    // When the network requests a new minibatch, we wait for the current async to finish, swap the buffers
+    // and kick off the new prefetch.
+    m_prefetchTask = std::async(m_launchType,
+        [this]()
     {
-        return m_reader->ReadMinibatch();
+        return PrefetchMinibatch();
     });
 }
 
-string EnumerateInputs(const map<wstring, size_t>& nameToStreamId)
+string EnumerateInputs(const unordered_map<wstring, size_t>& nameToStreamId)
 {
     // TODO use boost::algorithm::join, boost::adapters::transformed, make this a generic function
     std::stringstream str;
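The two slots of m_dataTransferers alternate between "copy in flight for the minibatch being consumed" and "free for the next prefetch". A distilled version of the 0/1 flip implemented in GetMinibatch below (StartPrefetch is a hypothetical stand-in for the std::async call):

    void PumpMinibatches(std::vector<Microsoft::MSR::CNTK::DataTransfererPtr>& transferers, size_t count)
    {
        size_t current = 0;
        for (size_t k = 0; k < count; ++k)
        {
            size_t inFlight = current;   // slot whose copy the prefetch thread started
            current = (current + 1) % 2; // slot the next prefetch will use
            if (transferers[current])
                transferers[current]->RecordComputeStreamSyncPoint(); // copy only after compute
            // StartPrefetch(transferers[current]);                   // next read + async copy
            if (transferers[inFlight])
                transferers[inFlight]->WaitForCopyCPUToGPU();         // minibatch k is on the GPU
        }
    }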
@@ -111,110 +146,152 @@ string EnumerateInputs(const unordered_map<wstring, size_t>& nameToStreamId)
 
 template <class ElemType>
 bool ReaderShim<ElemType>::GetMinibatch(StreamMinibatchInputs& matrices)
 {
-    // TODO: verify that the set of matrix names is identical
+    // TODO: verify that the set of matrix names is identical
     // to the set of reader input names. Warn if it's a subset, throw
     // if it's a superset.
     if (m_endOfEpoch)
     {
         return false;
     }
 
+    // TODO: Set the proper format on the matrices?
+
     // Check that all matrices have the same device id.
-    // If not we should inject the IMemoryProvider per stream.
+    // If not, we should inject the MemoryProvider per stream.
     int deviceId = matrices.begin()->second.matrix->GetDeviceId();
     for (auto mx : matrices)
         assert(mx.second.matrix->GetDeviceId() == deviceId), UNUSED(deviceId);
 
+    // Do sanity checks: requested streams should be exposed by the low-level deserializers.
+    for (const auto& mx : matrices)
+    {
+        if (m_nameToStreamId.find(mx.first) == m_nameToStreamId.end())
+        {
+            string inputNames = EnumerateInputs(m_nameToStreamId);
+            RuntimeError("Could not map input '%ls' to the reader. Reader outputs only [%s].",
+                mx.first.c_str(), inputNames.c_str());
+        }
+    }
+
+    // Make sure the prefetch has finished.
     assert(m_prefetchTask.valid());
+    auto result = m_prefetchTask.get();
 
-    Minibatch minibatch = m_prefetchTask.get();
-    if (minibatch.m_endOfEpoch)
+    // Ok, prefetch is done.
+    m_endOfEpoch = result.m_isEndOfEpoch;
+    if (m_endOfEpoch && !result.m_isDataAvailable)
     {
-        m_endOfEpoch = true;
-        if (minibatch.m_data.empty())
-        {
-            return false;
-        }
+        // No data and end of epoch, simply return.
+        return false;
     }
 
-    // Reset stale mb layouts.
-    // BUGBUG: This seems incorrect. (1) layouts should all be updated below, and (2) some of these layouts are the same, we are resetting them twice.
-    for (const auto& iter : matrices)
+    // Remember the current data transfer; the async memcpy for it has already been started on the prefetch thread.
+    auto currentDataTransfer = m_currentDataTransferIndex;
+
+    // Let's update the current data transferer.
+    m_currentDataTransferIndex = (m_currentDataTransferIndex + 1) % 2;
+
+    // We need to make sure that the compute for the current transfer is finished before we start the next prefetch.
+    if (m_dataTransferers[m_currentDataTransferIndex])
+        m_dataTransferers[m_currentDataTransferIndex]->RecordComputeStreamSyncPoint();
+
+    // We have some data, so let's swap the matrices.
+    // We cannot simply exchange pointers because it seems they are remembered deeper in the network.
+    for (auto i = matrices.begin(); i != matrices.end(); ++i)
     {
-        iter.second.pMBLayout->Init(1, 0);
+        std::swap(i->second.GetMatrix<ElemType>(), *m_prefetchBuffers[i->first].m_matrix);
+
+        // Resetting layouts.
+        i->second.pMBLayout->Init(1, 0);
     }
 
     // a map to generate error messages when checking layout constraints.
     map<wstring, wstring> layoutToInputMap;
-    if (!minibatch.m_data.empty())
+
+    // Let's now check the layouts and throw if the same layout is being assigned twice.
+    for (auto i = matrices.begin(); i != matrices.end(); ++i)
     {
-        // TODO: Use alternating pinned buffer in the packer, do not copy anything, but pack into the pinned memory.
-        // Copy returned minibatch to the matrices.
-        for (const auto& mx : matrices)
+        auto streamLayout = m_prefetchBuffers[i->first].m_mbLayout;
+        auto& layout = i->second.pMBLayout;
+        if (layout->GetNumCols() == 0) // just initialized, let's take the layout of the reader.
         {
-            if (m_nameToStreamId.find(mx.first) == m_nameToStreamId.end())
-            {
-                string inputNames = EnumerateInputs(m_nameToStreamId);
-                RuntimeError("Could not map input '%ls' to the reader. Reader outputs only [%s].",
-                             mx.first.c_str(), inputNames.c_str());
-            }
-
-            size_t streamId = m_nameToStreamId[mx.first];
-
-            const auto& stream = minibatch.m_data[streamId];
-
-            m_numParallelSequences = stream->m_layout->GetNumParallelSequences();
-
-            // This assert no longer holds - different inputs have different sequence lengths, resulting in different number
-            // of parallel samples.
-            // assert(m_numParallelSequences == minibatch.m_data.front()->m_layout->GetNumParallelSequences());
-
-            auto& layout = mx.second.pMBLayout;
-
-            if (layout->GetNumCols() == 0)
-            {
-                // layout is empty, copy layout info from the reader
-                layout->CopyFrom(stream->m_layout, /*keepName*/ true);
-                layoutToInputMap[layout->GetAxisName()] = mx.first;
-            }
-            else if (*layout != *stream->m_layout) // this does a deep value-level comparison
-            {
-                RuntimeError("Dynamic axis layout '%ls' is shared between inputs '%ls' and '%ls', but layouts generated "
-                             "from the input data are incompatible on this axis. Are you using different sequence lengths? "
-                             "Did you consider adding a DynamicAxis() to the Input nodes?",
-                             layout->GetAxisName(), layoutToInputMap[layout->GetAxisName()].c_str(), mx.first.c_str());
-            }
-
-            size_t sampleSize = m_streams[streamId]->m_sampleLayout->GetNumElements();
-            auto& matrix = matrices.GetInputMatrix<ElemType>(mx.first);
-            FillMatrixFromStream(m_streams[streamId]->m_storageType, &matrix, sampleSize, stream);
+            // layout is empty, copy layout info from the reader
+            layout->CopyFrom(streamLayout, /*keepName*/ true);
+            layoutToInputMap[layout->GetAxisName()] = i->first;
+        }
+        else if (*layout != *streamLayout) // this does a deep value-level comparison
+        {
+            RuntimeError("Dynamic axis layout '%ls' is shared between inputs '%ls' and '%ls', but layouts generated "
+                         "from the input data are incompatible on this axis. Are you using different sequence lengths? "
+                         "Did you consider adding a DynamicAxis() to the Input nodes?",
+                         layout->GetAxisName(), layoutToInputMap[layout->GetAxisName()].c_str(), i->first.c_str());
         }
     }
 
+    // The number of logical sequences should be the same across all streams,
+    // so pick it up from the first one.
+    m_numParallelSequences = matrices.begin()->second.pMBLayout->GetNumParallelSequences();
+
+    // It is time to issue the next prefetch.
     if (!m_endOfEpoch)
     {
         // Starting the prefetch task. There is always a single async read in flight.
-        // When the network requests a new minibatch, we wait for the current async to finish,
-        // return the result and kick off a new one.
-        m_prefetchTask = std::async(m_launchType, [this]()
-        {
-            return m_reader->ReadMinibatch();
-        });
+        // When the network requests a new minibatch, we wait for the current async to finish, swap the buffers
+        // and kick off the new prefetch.
+        m_prefetchTask = std::async(m_launchType, [this]() { return PrefetchMinibatch(); });
     }
 
-    return !minibatch.m_data.empty();
+    // Let's wait until the previous memcpy has finished.
+    if (m_dataTransferers[currentDataTransfer])
+        m_dataTransferers[currentDataTransfer]->WaitForCopyCPUToGPU();
+
+    return result.m_isDataAvailable;
 }
 
 template <class ElemType>
-/*static*/ void ReaderShim<ElemType>::FillMatrixFromStream(StorageType type, Matrix<ElemType>* matrix, size_t numRows, const StreamMinibatchPtr& stream)
+typename ReaderShim<ElemType>::PrefetchResult ReaderShim<ElemType>::PrefetchMinibatch()
+{
+    Minibatch minibatch = m_reader->ReadMinibatch();
+
+    // If there is no data we can simply return.
+    if (minibatch.m_data.empty())
+        return PrefetchResult{ minibatch.m_endOfEpoch, false };
+
+    // Ok, we have some data. Let's load it to the GPU,
+    // but first we need to make sure that the corresponding compute from the last iteration has already finished.
+
+    // We need to make sure that the compute for the current transfer is finished before we start the prefetch.
+    if (m_dataTransferers[m_currentDataTransferIndex])
+        m_dataTransferers[m_currentDataTransferIndex]->WaitForSyncPointOnAssignStreamAsync();
+
+    for (auto& mx : m_prefetchBuffers)
+    {
+        size_t streamId = m_nameToStreamId[mx.first];
+        const auto& stream = minibatch.m_data[streamId];
+        mx.second.m_mbLayout = stream->m_layout;
+
+        size_t sampleSize = m_streams[streamId]->m_sampleLayout->GetNumElements();
+        FillMatrixFromStream(m_streams[streamId]->m_storageType, mx.second.m_matrix.get(), sampleSize, stream, m_dataTransferers[m_currentDataTransferIndex].get());
+    }
+
+    // Let's record that we started the copy, so that the main thread can wait for its completion.
+    if (m_dataTransferers[m_currentDataTransferIndex])
+        m_dataTransferers[m_currentDataTransferIndex]->RecordCPUToGPUCopy();
+
+    return PrefetchResult{ minibatch.m_endOfEpoch, true };
+}
+
+template <class ElemType>
+/*static*/ void ReaderShim<ElemType>::FillMatrixFromStream(StorageType type, Matrix<ElemType>* matrix, size_t numRows, const StreamMinibatchPtr& stream, DataTransferer* transferer)
 {
     size_t numCols = stream->m_layout->GetNumCols();
 
     if (type == StorageType::dense)
     {
         auto data = reinterpret_cast<const ElemType*>(stream->m_data);
-        matrix->SetValue(numRows, numCols, matrix->GetDeviceId(), const_cast<ElemType*>(data), matrixFlagNormal);
+        matrix->SetValue(numRows, numCols, matrix->GetDeviceId(), const_cast<ElemType*>(data), matrixFlagNormal, transferer);
     }
     else if (type == StorageType::sparse_csc)
     {
@@ -225,12 +302,10 @@
         ElemType* values = reinterpret_cast<ElemType*>(data + 1);
         IndexType* rows = reinterpret_cast<IndexType*>(values + nnzCount);
         IndexType* columns = reinterpret_cast<IndexType*>(rows + nnzCount);
-        matrix->SetMatrixFromCSCFormat(columns, rows, values, nnzCount, numRows, numCols);
+        matrix->SetMatrixFromCSCFormat(columns, rows, values, nnzCount, numRows, numCols, transferer);
     }
-    else
-    {
+    else
         RuntimeError("Storage type %d is not supported.", (int)type);
-    }
 }
 
 template <class ElemType>
diff --git a/Source/Readers/ReaderLib/ReaderShim.h b/Source/Readers/ReaderLib/ReaderShim.h
index 93868e72d..e78c66672 100644
--- a/Source/Readers/ReaderLib/ReaderShim.h
+++ b/Source/Readers/ReaderLib/ReaderShim.h
@@ -8,10 +8,10 @@
 
 #pragma once
 
-#include <map>
+#include <unordered_map>
 #include <future>
-#include "DataReader.h"
 #include <string>
+#include "DataReader.h"
 #include "Reader.h"
 
 namespace CNTK
@@ -40,6 +40,8 @@ public:
     virtual void Destroy() override
     {
         // Make sure there are no outstanding reads.
+        // Note: the std::future destructor does not wait for the task (see N3679), so an explicit check is needed;
+        // more info can be found here: http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2013/n3679.html.
         if (m_prefetchTask.valid())
         {
             // If there are some, give them time to finish.
@@ -76,18 +78,53 @@ public:
     virtual size_t GetNumParallelSequencesForFixingBPTTMode() override;
 
 private:
-    std::future<Minibatch> m_prefetchTask;
+    struct PrefetchResult
+    {
+        bool m_isEndOfEpoch;
+        bool m_isDataAvailable;
+    };
+
+    PrefetchResult PrefetchMinibatch();
+
+    std::future<PrefetchResult> m_prefetchTask;
     ReaderPtr m_reader;
     ReaderFactory m_factory;
     bool m_endOfEpoch;
 
     size_t m_numParallelSequences;
 
-    std::map<std::wstring, size_t> m_nameToStreamId;
+    std::unordered_map<std::wstring, size_t> m_nameToStreamId;
     std::vector<StreamDescriptionPtr> m_streams;
     launch m_launchType;
 
-    static void FillMatrixFromStream(StorageType type, Matrix<ElemType>* matrix, size_t numRows, const StreamMinibatchPtr& stream);
+    // Data structure required for prefetch.
+    struct StreamPrefetchBuffer
+    {
+        std::shared_ptr<Matrix<ElemType>> m_matrix;
+        MBLayoutPtr m_mbLayout;
+    };
+
+    // Intermediate buffer where the prefetch thread puts its data.
+    // When the main thread enters GetMinibatch, it swaps the matrices from this buffer,
+    // triggers the next prefetch and waits if the memcpy is still in progress.
+    std::unordered_map<std::wstring, StreamPrefetchBuffer> m_prefetchBuffers;
+
+    // Alternating data transfer operations. In the current version there are only two:
+    // the one the main thread is currently waiting on, and the one that can be started by the prefetch thread
+    // in the meantime.
+    std::vector<DataTransfererPtr> m_dataTransferers;
+
+    // Current data transfer. Flips between 0 and 1.
+    size_t m_currentDataTransferIndex;
+
+    // Device id.
+    int m_deviceId;
+
+    static void FillMatrixFromStream(
+        StorageType type,
+        Matrix<ElemType>* matrix,
+        size_t numRows,
+        const StreamMinibatchPtr& stream,
+        DataTransferer* transferer);
 };
 }}}
diff --git a/Source/Readers/ReaderLib/SequencePacker.cpp b/Source/Readers/ReaderLib/SequencePacker.cpp
index 9cedbae28..5ac0aeaa1 100644
--- a/Source/Readers/ReaderLib/SequencePacker.cpp
+++ b/Source/Readers/ReaderLib/SequencePacker.cpp
@@ -47,6 +47,8 @@ Minibatch SequencePacker::ReadMinibatch()
         return minibatch;
     }
 
+    auto& currentBuffer = m_streamBuffers[m_currentBufferIndex];
+
     assert(m_outputStreamDescriptions.size() == batch.size());
 
     for (int streamIndex = 0; streamIndex < batch.size(); ++streamIndex)
@@ -62,7 +64,7 @@
         auto pMBLayout = (type == StorageType::dense) ?
             PackDenseStream(streamBatch, streamIndex) : PackSparseStream(streamBatch, streamIndex);
 
-        auto& buffer = m_streamBuffers[streamIndex];
+        auto& buffer = currentBuffer[streamIndex];
 
         auto streamMinibatch = std::make_shared<StreamMinibatch>();
         streamMinibatch->m_data = buffer.m_data.get();
@@ -70,6 +72,7 @@
         minibatch.m_data.push_back(streamMinibatch);
     }
 
+    m_currentBufferIndex = (m_currentBufferIndex + 1) % m_numberOfBuffers;
     return minibatch;
 }
 
@@ -106,7 +109,7 @@ MBLayoutPtr SequencePacker::PackDenseStream(const StreamBatch& batch, size_t str
 {
     assert(m_outputStreamDescriptions[streamIndex]->m_storageType == StorageType::dense);
     const auto& stream = m_inputStreamDescriptions[streamIndex];
-    auto& buffer = m_streamBuffers[streamIndex];
+    auto& buffer = m_streamBuffers[m_currentBufferIndex][streamIndex];
     size_t sampleSize = GetSampleSize(m_outputStreamDescriptions[streamIndex]);
     auto pMBLayout = CreateMBLayout(batch);
     size_t requiredSize = pMBLayout->GetNumCols() * sampleSize;
@@ -208,7 +211,7 @@ MBLayoutPtr SequencePacker::PackSparseStream(const StreamBatch& batch, size_t st
         nnzCount * (elementSize + indexSize) +
         indexSize * (pMBLayout->GetNumCols() + 1);
 
-    auto& buffer = m_streamBuffers[streamIndex];
+    auto& buffer = m_streamBuffers[m_currentBufferIndex][streamIndex];
     if (buffer.m_size < requiredSize)
     {
         buffer.Resize(requiredSize);
diff --git a/Source/Readers/ReaderLib/SequencePacker.h b/Source/Readers/ReaderLib/SequencePacker.h
index e7014af13..8ea7e1b79 100644
--- a/Source/Readers/ReaderLib/SequencePacker.h
+++ b/Source/Readers/ReaderLib/SequencePacker.h
@@ -16,8 +16,9 @@ class SequencePacker : public PackerBase
 public:
     SequencePacker(
         SequenceEnumeratorPtr sequenceEnumerator,
-        const std::vector<StreamDescriptionPtr>& streams) :
-        PackerBase(sequenceEnumerator, streams)
+        const std::vector<StreamDescriptionPtr>& streams,
+        size_t numberOfBuffers = 2) :
+        PackerBase(sequenceEnumerator, streams, numberOfBuffers)
     {}
 
     virtual Minibatch ReadMinibatch() override;
diff --git a/Source/Readers/ReaderLib/TruncatedBpttPacker.cpp b/Source/Readers/ReaderLib/TruncatedBpttPacker.cpp
index 21952e54d..24fc4ed97 100644
--- a/Source/Readers/ReaderLib/TruncatedBpttPacker.cpp
+++ b/Source/Readers/ReaderLib/TruncatedBpttPacker.cpp
@@ -107,8 +107,9 @@ struct SequenceBuffer
 
 TruncatedBPTTPacker::TruncatedBPTTPacker(
     SequenceEnumeratorPtr sequenceEnumerator,
-    const vector<StreamDescriptionPtr>& streams)
-    : PackerBase(sequenceEnumerator, streams),
+    const vector<StreamDescriptionPtr>& streams,
+    size_t numberOfBuffers)
+    : PackerBase(sequenceEnumerator, streams, numberOfBuffers),
       m_truncationSize(0)
 {
     auto sparseOutput = find_if(m_outputStreamDescriptions.begin(), m_outputStreamDescriptions.end(), [](const StreamDescriptionPtr& s){ return s->m_storageType == StorageType::sparse_csc; });
@@ -161,13 +162,14 @@ void TruncatedBPTTPacker::StartEpoch(const EpochConfiguration& config, const std
     m_sequenceBufferPerStream.clear();
 
     // Preparing the buffers.
-    for (int i = 0; i < m_outputStreamDescriptions.size(); ++i)
-    {
-        const auto& stream = m_outputStreamDescriptions[i];
-        auto& buffer = m_streamBuffers[i];
-        buffer.Resize(m_numParallelSequences * m_truncationSize * GetSampleSize(stream));
-        m_sequenceBufferPerStream.push_back(make_shared<SequenceBuffer>(m_numParallelSequences));
-    }
+    for (int j = 0; j < m_streamBuffers.size(); ++j)
+        for (int i = 0; i < m_outputStreamDescriptions.size(); ++i)
+        {
+            const auto& stream = m_outputStreamDescriptions[i];
+            auto& buffer = m_streamBuffers[j][i];
+            buffer.Resize(m_numParallelSequences * m_truncationSize * GetSampleSize(stream));
+            m_sequenceBufferPerStream.push_back(make_shared<SequenceBuffer>(m_numParallelSequences));
+        }
 }
 
 // Filling in the initial set of sequences
@@ -200,11 +202,13 @@ Minibatch TruncatedBPTTPacker::ReadMinibatch()
     }
 
     StreamMinibatchPtr m = make_shared<StreamMinibatch>();
-    m->m_data = m_streamBuffers[streamIndex].m_data.get();
+    m->m_data = m_streamBuffers[m_currentBufferIndex][streamIndex].m_data.get();
     m->m_layout = m_currentLayouts[streamIndex];
     result.m_data.push_back(m);
     }
 
+    m_currentBufferIndex = (m_currentBufferIndex + 1) % m_numberOfBuffers;
+
     return result;
 }
 
@@ -262,7 +266,7 @@ void TruncatedBPTTPacker::PackSlot(size_t streamIndex, size_t slotIndex, size_t&
     // Fill in the data from the first sequence in the slot.
     auto data = slot.FrontSequence();
 
     // Get buffer destination for the current sample.
-    auto& buffer = m_streamBuffers[streamIndex];
+    auto& buffer = m_streamBuffers[m_currentBufferIndex][streamIndex];
     auto offset = strideSize * currentTimestep + slotIndex * sampleSize;
     assert(offset >= 0 && offset < buffer.m_size);
     char* destination = buffer.m_data.get() + offset;
diff --git a/Source/Readers/ReaderLib/TruncatedBpttPacker.h b/Source/Readers/ReaderLib/TruncatedBpttPacker.h
index b2498d888..05cbffd98 100644
--- a/Source/Readers/ReaderLib/TruncatedBpttPacker.h
+++ b/Source/Readers/ReaderLib/TruncatedBpttPacker.h
@@ -22,7 +22,8 @@ class TruncatedBPTTPacker : public PackerBase
 public:
     TruncatedBPTTPacker(
         SequenceEnumeratorPtr sequenceEnumerator,
-        const std::vector<StreamDescriptionPtr>& streams);
+        const std::vector<StreamDescriptionPtr>& streams,
+        size_t numberOfBuffers = 2);
 
     virtual Minibatch ReadMinibatch() override;
 
diff --git a/Tests/UnitTests/ReaderTests/CNTKTextFormatReaderTests.cpp b/Tests/UnitTests/ReaderTests/CNTKTextFormatReaderTests.cpp
index 59ace316e..74deafd49 100644
--- a/Tests/UnitTests/ReaderTests/CNTKTextFormatReaderTests.cpp
+++ b/Tests/UnitTests/ReaderTests/CNTKTextFormatReaderTests.cpp
@@ -557,7 +557,7 @@ BOOST_AUTO_TEST_CASE(CNTKTextFormatReader_duplicate_inputs)
         1, // epoch size
         1, // mb size
         1, // num epochs
-        1,
+        2,
         0,
         0,
         1),
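Taken together, the patch implements a classic single-producer prefetch pipeline: exactly one read is in flight, and the consumer swaps buffers when the read arrives. A minimal generic skeleton of that control flow (a sketch under these assumptions, not code from the patch):

    #include <future>
    #include <utility>

    // One async read is always in flight; Next() joins it and immediately restarts it.
    template <class Buffer, class ReadFn>
    class PrefetchLoop
    {
    public:
        explicit PrefetchLoop(ReadFn read) : m_read(std::move(read))
        {
            m_task = std::async(std::launch::async, m_read);
        }

        Buffer Next()
        {
            Buffer ready = m_task.get();                     // wait for the previous read
            m_task = std::async(std::launch::async, m_read); // kick off the next one
            return ready;                                    // consume while reading ahead
        }

    private:
        ReadFn m_read;
        std::future<Buffer> m_task;
    };

ReaderShim adds the GPU-specific half on top of this skeleton: the prefetch thread also starts the host-to-device copy on its own CUDA stream, and GetMinibatch swaps whole matrices rather than pointers.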