This commit is contained in:
Eldar Akchurin 2016-09-07 15:46:20 +02:00
Parent 7c5fb2d7d8
Commit 773dca2b4c
25 changed files with 634 additions and 188 deletions

View file

@ -287,6 +287,7 @@ MATH_SRC =\
$(SOURCEDIR)/Math/MatrixQuantizerCPU.cpp \
$(SOURCEDIR)/Math/Matrix.cpp \
$(SOURCEDIR)/Math/QuantizedMatrix.cpp \
$(SOURCEDIR)/Math/DataTransferer.cpp \
$(SOURCEDIR)/Math/RNGHandle.cpp \
$(SOURCEDIR)/Math/TensorView.cpp \

View file

@ -175,7 +175,7 @@ namespace CNTK
size_t sampleSize = currentStreamDesc->m_sampleLayout->GetNumElements();
// TODO: Eliminate the unnecessary CPU to CPU copy
ReaderShim<float>::FillMatrixFromStream(currentStreamDesc->m_storageType, dataMatrix.get(), sampleSize, currentStreamMinibatchData);
ReaderShim<float>::FillMatrixFromStream(currentStreamDesc->m_storageType, dataMatrix.get(), sampleSize, currentStreamMinibatchData, nullptr);
minibatchValuePtr = CompositeFunction::GetValueObjectFromCNTKImplMatrixAndMBLayout<float>(sampleShape, *dataMatrix, currentStreamMinibatchData->m_layout, false);
size_t numSamples = currentStreamMinibatchData->m_layout->GetActualNumSamples();

View file

@ -0,0 +1,17 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#include "stdafx.h"
#include "DataTransferer.h"
#include "GPUDataTransferer.h"
namespace Microsoft { namespace MSR { namespace CNTK {
DataTransfererPtr CreatePrefetchDataTransferer(int deviceId)
{
return std::make_shared<PrefetchGPUDataTransferer>(deviceId);
}
} } }

View file

@ -0,0 +1,68 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#pragma once
#include <memory>
#ifdef _WIN32
#ifdef MATH_EXPORTS
#define MATH_API __declspec(dllexport)
#else
#define MATH_API __declspec(dllimport)
#endif
#else // no DLLs on Linux
#define MATH_API
#endif
namespace Microsoft { namespace MSR { namespace CNTK {
// Interface to copy data from CPU to GPU and back.
// This interface allows fine-grained operations, so that it is possible to issue several operations and wait on them at the end, e.g.:
// CopyGPUToCPUAsync
// ... n copy operations started.
// CopyGPUToCPUAsync
// RecordGPUToCPUCopy
// and then WaitForCopyGPUToCPU when all the above asyncs are finished.
// Currently this interface is used during data transfers between CPU and GPU for input data prefetching.
class MATH_API DataTransferer
{
public:
// Asynchronously starts copying data from gpu into cpu buffer on internal stream.
virtual void CopyGPUToCPUAsync(const void* gpuBuffer, size_t numElements, size_t elementSize, void* cpuBuffer) = 0;
// Records event that copies were started on internal stream.
virtual void RecordGPUToCPUCopy() = 0;
// Waits on the event that triggers when all copies have been finished.
virtual void WaitForCopyGPUToCPU() = 0;
// Asynchronously starts copying data from cpu into gpu buffer.
virtual void CopyCPUToGPUAsync(const void* cpuBuffer, size_t numElements, size_t elementSize, void* gpuBuffer) = 0;
// Records event that copies were started on internal stream.
virtual void RecordCPUToGPUCopy() = 0;
// Waits on the event that triggers when all copies have been finished.
virtual void WaitForCopyCPUToGPU() = 0;
// Records an event on a compute stream.
virtual void RecordComputeStreamSyncPoint() = 0;
// Synchronizes the GPU-to-CPU stream with the recorded compute sync event.
virtual void WaitForSyncPointOnFetchStreamAsync() = 0;
// Synchronizes the CPU-to-GPU stream with the recorded compute sync event.
virtual void WaitForSyncPointOnAssignStreamAsync() = 0;
virtual ~DataTransferer() {}
};
typedef std::shared_ptr<DataTransferer> DataTransfererPtr;
MATH_API DataTransfererPtr CreatePrefetchDataTransferer(int deviceId);
}}}
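A short usage sketch of the call pattern described in the header comment above (illustrative only, not part of this commit; the buffers, element counts and device id are assumed, and error handling is omitted):

#include "DataTransferer.h"

// Sketch: issue several async GPU-to-CPU copies, record them once, wait once at the end.
// gpuSrc1/gpuSrc2/cpuDst1/cpuDst2 and the counts n1/n2 are hypothetical buffers.
void FetchTwoBuffers(int deviceId,
                     const void* gpuSrc1, void* cpuDst1, size_t n1,
                     const void* gpuSrc2, void* cpuDst2, size_t n2)
{
    using namespace Microsoft::MSR::CNTK;
    DataTransfererPtr transferer = CreatePrefetchDataTransferer(deviceId);

    transferer->CopyGPUToCPUAsync(gpuSrc1, n1, sizeof(float), cpuDst1); // async copy #1
    transferer->CopyGPUToCPUAsync(gpuSrc2, n2, sizeof(float), cpuDst2); // async copy #2
    transferer->RecordGPUToCPUCopy();  // record one event covering both copies

    // ... other CPU work can overlap with the transfers here ...

    transferer->WaitForCopyGPUToCPU(); // block until both copies have completed
}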

View file

@ -47,6 +47,92 @@ static
}
}
//// Base class for different data transferers.
GranularGPUDataTransferer::GranularGPUDataTransferer(int deviceId, const cudaStream_t& fetchStream, const cudaStream_t& assignStream, bool blocking)
: m_fetchStream(fetchStream),
m_assignStream(assignStream),
m_deviceId(deviceId),
m_fetchCompleteEvent(nullptr),
m_assignCompleteEvent(nullptr),
m_syncEvent(nullptr)
{
PrepareDevice(m_deviceId);
// Note: Do NOT use cudaEventBlockingSync (which supposedly yields the process)--it will totally break cudaEventSynchronize(), causing it to take 50 or 100 ms randomly.
// NOTE: We never saw this in reading prefetch.
unsigned flags = cudaEventDisableTiming;
if (blocking)
flags |= cudaEventBlockingSync;
// events
cudaEventCreateWithFlags(&m_fetchCompleteEvent, flags) || "cudaEventCreateWithFlags failed";
cudaEventCreateWithFlags(&m_assignCompleteEvent, flags) || "cudaEventCreateWithFlags failed";
cudaEventCreateWithFlags(&m_syncEvent, cudaEventDisableTiming) || "cudaEventCreateWithFlags failed";
}
GranularGPUDataTransferer::~GranularGPUDataTransferer()
{
// TODO: Check for error code and throw if !std::uncaught_exception()
cudaEventDestroy(m_assignCompleteEvent);
cudaEventDestroy(m_fetchCompleteEvent);
cudaEventDestroy(m_syncEvent);
}
void GranularGPUDataTransferer::CopyGPUToCPUAsync(const void* gpuBuffer, size_t numElements, size_t elementSize, void* cpuBuffer)
{
PrepareDevice(m_deviceId);
cudaMemcpyAsync(cpuBuffer, gpuBuffer, numElements * elementSize, cudaMemcpyDeviceToHost, m_fetchStream) || "cudaMemcpyAsync failed";
}
void GranularGPUDataTransferer::RecordGPUToCPUCopy()
{
cudaEventRecord(m_fetchCompleteEvent, m_fetchStream) || "cudaEventRecord failed";
}
void GranularGPUDataTransferer::WaitForCopyGPUToCPU()
{
PrepareDevice(m_deviceId);
cudaEventSynchronize(m_fetchCompleteEvent) || "cudaEventSynchronize failed";
}
void GranularGPUDataTransferer::CopyCPUToGPUAsync(const void* cpuBuffer, size_t numElements, size_t elementSize, void* gpuBuffer)
{
PrepareDevice(m_deviceId);
cudaMemcpyAsync(gpuBuffer, cpuBuffer, numElements * elementSize, cudaMemcpyHostToDevice, m_assignStream) || "cudaMemcpyAsync failed";
}
void GranularGPUDataTransferer::RecordCPUToGPUCopy()
{
cudaEventRecord(m_assignCompleteEvent, m_assignStream) || "cudaEventRecord failed";
}
void GranularGPUDataTransferer::WaitForCopyCPUToGPU()
{
PrepareDevice(m_deviceId);
cudaEventSynchronize(m_assignCompleteEvent) || "cudaEventSynchronize failed";
}
void GranularGPUDataTransferer::RecordComputeStreamSyncPoint()
{
PrepareDevice(m_deviceId);
cudaEventRecord(m_syncEvent, GetStream()) || "cudaEventRecord failed";
}
void GranularGPUDataTransferer::WaitForSyncPointOnFetchStreamAsync()
{
PrepareDevice(m_deviceId);
cudaStreamWaitEvent(m_fetchStream, m_syncEvent, 0 /*flags 'must be 0'*/) || "cudaStreamWaitEvent failed";
}
void GranularGPUDataTransferer::WaitForSyncPointOnAssignStreamAsync()
{
PrepareDevice(m_deviceId);
cudaStreamWaitEvent(m_assignStream, m_syncEvent, 0 /*flags 'must be 0'*/) || "cudaStreamWaitEvent failed";
}
//// GPUDataTransferer
// same but for event
template <class ElemType>
void GPUDataTransferer<ElemType>::SyncEvent(cudaEvent_t ev)
@ -64,80 +150,86 @@ void GPUDataTransferer<ElemType>::SyncEvent(cudaEvent_t ev)
//streams
template <class ElemType>
cudaStream_t GPUDataTransferer<ElemType>::m_fetchStream = NULL;
cudaStream_t GPUDataTransferer<ElemType>::s_fetchStream = NULL;
template <class ElemType>
cudaStream_t GPUDataTransferer<ElemType>::m_assignStream = NULL;
cudaStream_t GPUDataTransferer<ElemType>::s_assignStream = NULL;
template <class ElemType>
cudaStream_t GPUDataTransferer<ElemType>::GetFetchStream()
{
return m_fetchStream;
return s_fetchStream;
}
template <class ElemType>
GPUDataTransferer<ElemType>::GPUDataTransferer(int deviceId, bool useConcurrentStreams)
: m_deviceId(deviceId)
GPUDataTransferer<ElemType>::GPUDataTransferer(int deviceId, bool useConcurrentStreams)
{
PrepareDevice(m_deviceId);
// events
// Note: Do NOT use cudaEventBlockingSync (which supposedly yields the process)--it will totally break cudaEventSynchronize(), causing it to take 50 or 100 ms randomly.
cudaEventCreateWithFlags(&m_fetchCompleteEvent, cudaEventDisableTiming) || "cudaEventCreateWithFlags failed";
cudaEventCreateWithFlags(&m_assignCompleteEvent, cudaEventDisableTiming) || "cudaEventCreateWithFlags failed";
#pragma warning(disable : 4127)
if (useConcurrentStreams && (m_fetchStream == NULL))
if (useConcurrentStreams && (s_fetchStream == NULL))
{
cudaStreamCreateWithFlags(&m_fetchStream, cudaStreamNonBlocking) || "cudaStreamCreateWithFlags failed";
cudaStreamCreateWithFlags(&m_assignStream, cudaStreamNonBlocking) || "cudaStreamCreateWithFlags failed";
cudaStreamCreateWithFlags(&s_fetchStream, cudaStreamNonBlocking) || "cudaStreamCreateWithFlags failed";
cudaStreamCreateWithFlags(&s_assignStream, cudaStreamNonBlocking) || "cudaStreamCreateWithFlags failed";
}
m_inner = make_unique<GranularGPUDataTransferer>(deviceId, s_fetchStream, s_assignStream);
}
template <class ElemType>
GPUDataTransferer<ElemType>::~GPUDataTransferer()
{
// BUGBUG: we don't destroy our streams (they are static variables); we need a static destructor, I am too lazy now
// TODO: Check for error code and throw if !std::uncaught_exception()
cudaEventDestroy(m_assignCompleteEvent);
cudaEventDestroy(m_fetchCompleteEvent);
}
template <class ElemType>
void GPUDataTransferer<ElemType>::CopyGPUToCPUAsync(ElemType* gpuBuffer, size_t numElements, ElemType* cpuBuffer)
{
PrepareDevice(m_deviceId);
cudaMemcpyAsync(cpuBuffer, gpuBuffer, numElements * sizeof(ElemType), cudaMemcpyDeviceToHost, m_fetchStream) || "cudaMemcpyAsync failed";
cudaEventRecord(m_fetchCompleteEvent, m_fetchStream) || "cudaEventRecord failed";
}
template <class ElemType>
void GPUDataTransferer<ElemType>::WaitForCopyGPUToCPUAsync()
{
PrepareDevice(m_deviceId);
SyncEvent(m_fetchCompleteEvent);
m_inner->CopyGPUToCPUAsync(gpuBuffer, numElements, sizeof(ElemType), cpuBuffer);
m_inner->RecordGPUToCPUCopy();
}
template <class ElemType>
void GPUDataTransferer<ElemType>::CopyCPUToGPUAsync(ElemType* cpuBuffer, size_t numElements, ElemType* gpuBuffer)
{
PrepareDevice(m_deviceId);
m_inner->CopyCPUToGPUAsync(cpuBuffer, numElements, sizeof(ElemType), gpuBuffer);
m_inner->RecordCPUToGPUCopy();
}
cudaMemcpyAsync(gpuBuffer, cpuBuffer, numElements * sizeof(ElemType), cudaMemcpyHostToDevice, m_assignStream) || "cudaMemcpyAsync failed";
cudaEventRecord(m_assignCompleteEvent, m_assignStream) || "cudaEventRecord failed";
template <class ElemType>
void GPUDataTransferer<ElemType>::WaitForCopyGPUToCPUAsync()
{
PrepareDevice(m_inner->m_deviceId);
SyncEvent(m_inner->m_fetchCompleteEvent);
}
template <class ElemType>
void GPUDataTransferer<ElemType>::WaitForCopyCPUToGPUAsync()
{
PrepareDevice(m_deviceId);
PrepareDevice(m_inner->m_deviceId);
SyncEvent(m_assignCompleteEvent);
SyncEvent(m_inner->m_assignCompleteEvent);
}
//explicit
template class GPUDataTransferer<float>;
template class GPUDataTransferer<double>;
} } }
/// PrefetchGPUDataTransferer
cudaStream_t PrefetchGPUDataTransferer::s_prefetchStream = nullptr;
cudaStream_t PrefetchGPUDataTransferer::s_prefetchAssignStream = nullptr;
PrefetchGPUDataTransferer::PrefetchGPUDataTransferer(int deviceId) : GranularGPUDataTransferer(deviceId, s_prefetchStream, s_prefetchAssignStream, true)
{
#pragma warning(disable : 4127)
if (s_prefetchStream == nullptr)
{
// Assign stream always stays null, not required for prefetch.
// TODO: Currently the s_prefetchStream is not destroyed.
// Being static, it can be used by several readers with different lifecycles, so we let it live until the end of the process.
cudaStreamCreateWithFlags(&s_prefetchStream, cudaStreamNonBlocking) || "cudaStreamCreateWithFlags failed";
}
}
}}}
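The `<cuda call> || "message"` checks used throughout this file rely on an operator|| overload for cudaError_t defined near the top of the file (the `static` context line in the hunk header above). A hedged sketch of such a helper, assuming CNTK's RuntimeError is available, looks roughly like this (the exact implementation in the repository may differ):

// Sketch only: turns a failed CUDA call into an exception so that
//     cudaEventRecord(...) || "cudaEventRecord failed";
// throws with a readable message whenever the call does not return cudaSuccess.
static void operator||(cudaError_t rc, const char* msg)
{
    if (rc != cudaSuccess)
        RuntimeError("%s: %s (cuda error %d)", msg, cudaGetErrorString(rc), (int)rc);
}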

View file

@ -4,8 +4,8 @@
#include <cuda_runtime_api.h>
#include <cuda.h>
#endif // !CPUONLY
#include <vector>
#include <memory>
#include "Basics.h"
#ifdef _WIN32
#ifndef MATH_API
@ -19,11 +19,63 @@
#define MATH_API
#endif
#include "DataTransferer.h"
namespace Microsoft { namespace MSR { namespace CNTK {
class MATH_API GranularGPUDataTransferer : public DataTransferer
{
public:
#ifndef CPUONLY
GranularGPUDataTransferer(int deviceId, const cudaStream_t& fetchStream, const cudaStream_t& assignStream, bool blocking = false);
#else
GranularGPUDataTransferer() {}
#endif // !CPUONLY
~GranularGPUDataTransferer();
void CopyGPUToCPUAsync(const void* gpuBuffer, size_t numElements, size_t elementSize, void* cpuBuffer) override;
void RecordGPUToCPUCopy() override;
void WaitForCopyGPUToCPU() override;
void CopyCPUToGPUAsync(const void* cpuBuffer, size_t numElements, size_t elementSize, void* gpuBuffer) override;
void RecordCPUToGPUCopy() override;
void WaitForCopyCPUToGPU() override;
void RecordComputeStreamSyncPoint() override;
void WaitForSyncPointOnFetchStreamAsync() override;
void WaitForSyncPointOnAssignStreamAsync() override;
#ifndef CPUONLY
private:
// Not owned by this class, are always injected.
const cudaStream_t& m_fetchStream;
const cudaStream_t& m_assignStream;
protected:
mutable cudaEvent_t m_fetchCompleteEvent;
mutable cudaEvent_t m_assignCompleteEvent;
mutable cudaEvent_t m_syncEvent;
#endif // !CPUONLY
protected:
int m_deviceId;
// Disallow copy and move construction and assignment
DISABLE_COPY_AND_MOVE(GranularGPUDataTransferer);
template <class ElemType>
friend class GPUDataTransferer;
};
template <class ElemType>
class MATH_API GPUDataTransferer
{
#pragma warning(push)
#pragma warning(disable : 4251) // Using std::unique pointer on the dll boundary.
std::unique_ptr<GranularGPUDataTransferer> m_inner;
#pragma warning(pop)
public:
GPUDataTransferer(int deviceId, bool useConcurrentStreams);
~GPUDataTransferer();
@ -32,9 +84,9 @@ public:
DISABLE_COPY_AND_MOVE(GPUDataTransferer);
void CopyGPUToCPUAsync(ElemType* gpuBuffer, size_t numElements, ElemType* cpuBuffer);
void WaitForCopyGPUToCPUAsync();
void CopyCPUToGPUAsync(ElemType* cpuBuffer, size_t numElements, ElemType* gpuBuffer);
void WaitForCopyGPUToCPUAsync();
void WaitForCopyCPUToGPUAsync();
#ifndef CPUONLY
@ -44,17 +96,24 @@ public:
private:
#ifndef CPUONLY
static void SyncEvent(cudaEvent_t ev);
static cudaStream_t s_fetchStream;
static cudaStream_t s_assignStream;
#endif // !CPUONLY
};
class PrefetchGPUDataTransferer : public GranularGPUDataTransferer
{
public:
PrefetchGPUDataTransferer(int deviceId);
private:
#ifndef CPUONLY
static cudaStream_t m_fetchStream;
static cudaStream_t m_assignStream;
static cudaStream_t s_prefetchStream;
static cudaStream_t s_prefetchAssignStream;
#endif
mutable cudaEvent_t m_fetchCompleteEvent;
mutable cudaEvent_t m_assignCompleteEvent;
#endif // !CPUONLY
int m_deviceId;
DISABLE_COPY_AND_MOVE(PrefetchGPUDataTransferer);
};
} } }
}}}

View file

@ -1180,7 +1180,7 @@ void GPUMatrix<ElemType>::SetValue(const GPUSparseMatrix<ElemType>& deepCopyFrom
#endif
template <class ElemType>
void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags)
void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags, DataTransferer* transferer)
{
// handle externally managed case
// BUGBUG: This is super super ugly, and needs to be fixed, but if matrixFlags has the right value, then we can't free anything,
@ -1201,6 +1201,9 @@ void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, i
}
else
{
if (transferer && (matrixFlags & matrixFlagSetValueOnDevice))
RuntimeError("Asynchronous data copy from device to device is currently not supported.");
// if the devices are different move it now
if (GetComputeDeviceId() != deviceId && deviceId >= 0)
{
@ -1217,7 +1220,10 @@ void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, i
{
if (!(matrixFlags & matrixFormatRowMajor))
{
CUDA_CALL(cudaMemcpy(Data(), pArray, sizeof(ElemType) * GetNumElements(), (matrixFlags & matrixFlagSetValueOnDevice) ? cudaMemcpyDeviceToDevice : cudaMemcpyHostToDevice));
if (transferer)
transferer->CopyCPUToGPUAsync(pArray, GetNumElements(), sizeof(ElemType), Data());
else
CUDA_CALL(cudaMemcpy(Data(), pArray, sizeof(ElemType) * GetNumElements(), (matrixFlags & matrixFlagSetValueOnDevice) ? cudaMemcpyDeviceToDevice : cudaMemcpyHostToDevice));
}
else // row major: must transpose (this is not meant to be efficient, but very useful for defining inline matrices for test code)
{
@ -1225,7 +1231,11 @@ void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, i
for (size_t i = 0; i < numRows; i++)
for (size_t j = 0; j < numCols; j++)
transposed[i + numRows * j] = pArray[j + numCols * i];
CUDA_CALL(cudaMemcpy(Data(), transposed.data(), sizeof(ElemType) * GetNumElements(), (matrixFlags & matrixFlagSetValueOnDevice) ? cudaMemcpyDeviceToDevice : cudaMemcpyHostToDevice));
if (transferer)
transferer->CopyCPUToGPUAsync(transposed.data(), GetNumElements(), sizeof(ElemType), Data());
else
CUDA_CALL(cudaMemcpy(Data(), transposed.data(), sizeof(ElemType) * GetNumElements(), (matrixFlags & matrixFlagSetValueOnDevice) ? cudaMemcpyDeviceToDevice : cudaMemcpyHostToDevice));
}
}
}
@ -4521,7 +4531,7 @@ template GPUMatrix<char> GPUMatrix<char>::ColumnSlice(size_t startColumn, size_t
template GPUMatrix<char>& GPUMatrix<char>::operator=(GPUMatrix<char>&&);
template GPUMatrix<char>::GPUMatrix(int);
template void GPUMatrix<char>::SetValue(const char);
template void GPUMatrix<char>::SetValue(const size_t numRows, const size_t numCols, int deviceId, char* pArray, size_t matrixFlags);
template void GPUMatrix<char>::SetValue(const size_t numRows, const size_t numCols, int deviceId, char* pArray, size_t matrixFlags, DataTransferer* transferer);
//template void GPUMatrix<char>::SetValue(CPUMatrix<char> const&);
template void GPUMatrix<char>::SetValue(GPUMatrix<char> const&);
//template void GPUMatrix<char>::SetValue(CPUSparseMatrix<char> const&);
@ -4546,7 +4556,7 @@ template GPUMatrix<short> GPUMatrix<short>::ColumnSlice(size_t startColumn, size
template GPUMatrix<short>& GPUMatrix<short>::operator=(GPUMatrix<short>&&);
template GPUMatrix<short>::GPUMatrix(int);
template void GPUMatrix<short>::SetValue(const short);
template void GPUMatrix<short>::SetValue(const size_t numRows, const size_t numCols, int deviceId, short* pArray, size_t matrixFlags);
template void GPUMatrix<short>::SetValue(const size_t numRows, const size_t numCols, int deviceId, short* pArray, size_t matrixFlags, DataTransferer* transferer);
//template void GPUMatrix<short>::SetValue(CPUMatrix<short> const&);
template void GPUMatrix<short>::SetValue(GPUMatrix<short> const&);
//template void GPUMatrix<short>::SetValue(CPUSparseMatrix<short> const&);
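With the new transferer parameter, the host-to-device copy inside SetValue becomes a cudaMemcpyAsync on the transferer's stream, so the caller has to record and wait on the copy before the data is consumed. A hedged sketch of that pairing (matrix, buffer and transferer names are illustrative, not taken from this commit):

// Illustrative only: fill a GPU matrix asynchronously through a DataTransferer.
// `hostData`, `rows`, `cols` and `transferer` are assumed to be valid; no error handling.
void FillAsync(Microsoft::MSR::CNTK::GPUMatrix<float>& matrix, float* hostData,
               size_t rows, size_t cols, Microsoft::MSR::CNTK::DataTransferer* transferer)
{
    // Issues the copy on the transferer's assign stream instead of a blocking cudaMemcpy.
    matrix.SetValue(rows, cols, matrix.GetComputeDeviceId(), hostData, matrixFlagNormal, transferer);

    transferer->RecordCPUToGPUCopy();  // record the copies issued so far
    transferer->WaitForCopyCPUToGPU(); // wait before the matrix is read by compute
}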

View file

@ -61,6 +61,8 @@ cudaStream_t MATH_API GetStream();
namespace Microsoft { namespace MSR { namespace CNTK {
class DataTransferer;
// -----------------------------------------------------------------------
// SyncGuard -- synchronize around CUDA calls
// -----------------------------------------------------------------------
@ -252,7 +254,7 @@ public:
void SetValue(const GPUMatrix<ElemType>& deepCopyFrom);
//void SetValue(const CPUSparseMatrix<ElemType>& deepCopyFrom);
//void SetValue(const GPUSparseMatrix<ElemType>& deepCopyFrom);
void SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags = matrixFlagNormal);
void SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags = matrixFlagNormal, DataTransferer* transferer = nullptr);
void SetDiagonalValue(const ElemType v);
void SetDiagonalValue(const GPUMatrix<ElemType>& vector);

View file

@ -894,7 +894,7 @@ void GPUSparseMatrix<ElemType>::GetMatrixFromCSRFormat(CPUSPARSE_INDEX_TYPE*& h_
template <class ElemType>
void GPUSparseMatrix<ElemType>::SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice /*= false*/, const DEVICEID_TYPE devId /*= -1*/)
const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice /*= false*/, const DEVICEID_TYPE devId /*= -1*/, DataTransferer* transferer /*= nullptr*/)
{
VerifyWritable(__func__);
@ -905,14 +905,29 @@ void GPUSparseMatrix<ElemType>::SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYP
SetFormat(matrixFormatSparseCSC);
RequireSizeAndAllocate(numRows, numCols, nz, true, false);
if (transferer && IsOnDevice)
RuntimeError("Asynchronous data copy from device to device is currently not supported.");
// m_nz doesn't exist anymore. How are we going to deal with the NzSize, RowSize, and ColSize? Do it ourselves of course.
cudaMemcpyKind kind = IsOnDevice ? cudaMemcpyDeviceToDevice : cudaMemcpyHostToDevice;
CUDA_CALL(cudaMemcpy(Data(), h_Val, nz * sizeof(ElemType), kind));
if (transferer)
transferer->CopyCPUToGPUAsync(h_Val, nz, sizeof(ElemType), Data());
else
CUDA_CALL(cudaMemcpy(Data(), h_Val, nz * sizeof(ElemType), kind));
if (sizeof(CPUSPARSE_INDEX_TYPE) == sizeof(GPUSPARSE_INDEX_TYPE))
{
CUDA_CALL(cudaMemcpy(RowLocation(), h_Row, sizeof(GPUSPARSE_INDEX_TYPE) * nz, kind));
CUDA_CALL(cudaMemcpy(ColLocation(), h_CSCCol, sizeof(GPUSPARSE_INDEX_TYPE) * (numCols+1), kind));
if (transferer)
{
transferer->CopyCPUToGPUAsync(h_Row, nz, sizeof(GPUSPARSE_INDEX_TYPE), RowLocation());
transferer->CopyCPUToGPUAsync(h_CSCCol, numCols + 1, sizeof(GPUSPARSE_INDEX_TYPE), ColLocation());
}
else
{
CUDA_CALL(cudaMemcpy(RowLocation(), h_Row, sizeof(GPUSPARSE_INDEX_TYPE) * nz, kind));
CUDA_CALL(cudaMemcpy(ColLocation(), h_CSCCol, sizeof(GPUSPARSE_INDEX_TYPE) * (numCols + 1), kind));
}
}
else
{
@ -923,8 +938,16 @@ void GPUSparseMatrix<ElemType>::SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYP
ConvertBuffer(pCol, h_CSCCol, (numCols+1));
ConvertBuffer(pRow, h_Row, nz);
CUDA_CALL(cudaMemcpy(RowLocation(), pRow, sizeof(GPUSPARSE_INDEX_TYPE) * nz, kind));
CUDA_CALL(cudaMemcpy(ColLocation(), pCol, sizeof(GPUSPARSE_INDEX_TYPE) * (numCols+1), kind));
if (transferer)
{
transferer->CopyCPUToGPUAsync(pRow, nz, sizeof(GPUSPARSE_INDEX_TYPE), RowLocation());
transferer->CopyCPUToGPUAsync(pCol, numCols + 1, sizeof(GPUSPARSE_INDEX_TYPE), ColLocation());
}
else
{
CUDA_CALL(cudaMemcpy(RowLocation(), pRow, sizeof(GPUSPARSE_INDEX_TYPE) * nz, kind));
CUDA_CALL(cudaMemcpy(ColLocation(), pCol, sizeof(GPUSPARSE_INDEX_TYPE) * (numCols + 1), kind));
}
}
}

View file

@ -338,7 +338,7 @@ public:
void SetMatrixFromCSRFormat(const CPUSPARSE_INDEX_TYPE* h_CSRRow, const CPUSPARSE_INDEX_TYPE* h_Col, const ElemType* h_Val,
const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice = false, const DEVICEID_TYPE devId = -1);
void SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice = false, const DEVICEID_TYPE devId = -1);
const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice = false, const DEVICEID_TYPE devId = -1, DataTransferer* transferer = nullptr);
// Gets sparse matrix in CSR format. this acts as deep copy. All passed pointers must be NULL. the function will allocate memory itself.
void GetMatrixFromCSRFormat(CPUSPARSE_INDEX_TYPE*& h_CSRRow, CPUSPARSE_INDEX_TYPE*& h_Col, ElemType*& h_Val, size_t& numElemAllocated, size_t& nz, size_t& numRows, size_t& numCols) const;

View file

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" InitialTargets="CheckDependencies" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|x64">
@ -171,6 +171,7 @@
<ClInclude Include="ConvolveGeometry.h" />
<ClInclude Include="CPUMatrix.h" />
<ClInclude Include="CPURNGHandle.h" />
<ClInclude Include="DataTransferer.h" />
<ClInclude Include="MatrixQuantizerImpl.h" />
<ClInclude Include="RNGHandle.h" />
<ClInclude Include="RNNCommon.h" />
@ -203,6 +204,7 @@
<ClCompile Include="CPURNGHandle.cpp" />
<ClCompile Include="CPUSparseMatrix.cpp" />
<ClCompile Include="CUDAPageLockedMemAllocator.cpp" />
<ClCompile Include="DataTransferer.cpp" />
<ClCompile Include="dllmain.cpp">
<CompileAsManaged>false</CompileAsManaged>
<PrecompiledHeader>
@ -229,4 +231,4 @@
<Error Condition="'$(MathLibrary)' == 'MKL' And '$(CNTK_MKL_PATH)' == ''" Text="CNTK custom MKL location not specified, see https://github.com/Microsoft/CNTK/wiki/Setup-CNTK-on-Windows#mkl for instructions." />
<Error Condition="'$(MathLibrary)' == 'MKL' And !Exists('$(CNTKCustomMKLPath)')" Text="CNTK custom MKL not found. See https://github.com/Microsoft/CNTK/wiki/Setup-CNTK-on-Windows#mkl for instructions." />
</Target>
</Project>
</Project>

View file

@ -46,8 +46,9 @@
<Filter>CPU</Filter>
</ClCompile>
<ClCompile Include="BlockHandlerSSE.cpp">
<Filter>CPU</Filter>
<Filter>CPU</Filter>
</ClCompile>
<ClCompile Include="DataTransferer.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="CommonMatrix.h" />
@ -115,18 +116,20 @@
<Filter>RNN</Filter>
</ClInclude>
<ClInclude Include="BlockHandlerAVX.h">
<Filter>CPU</Filter>
<Filter>CPU</Filter>
</ClInclude>
<ClInclude Include="BlockHandlerSSE.h">
<Filter>CPU</Filter>
<Filter>CPU</Filter>
</ClInclude>
<ClInclude Include="BlockMultiplier.h">
<Filter>CPU</Filter>
<Filter>CPU</Filter>
</ClInclude>
<ClInclude Include="BlockMultiplierPlatform.h">
<Filter>CPU</Filter>
<Filter>CPU</Filter>
</ClInclude>
<ClInclude Include="Quantizers.h" />
<ClInclude Include="BlockMultiplierMatrixUtil.h" />
<ClInclude Include="DataTransferer.h" />
</ItemGroup>
<ItemGroup>
<None Include="GPUMatrix.h">
@ -180,4 +183,4 @@
<UniqueIdentifier>{ee6bf704-73f0-488d-8432-0d23f034de88}</UniqueIdentifier>
</Filter>
</ItemGroup>
</Project>
</Project>

View file

@ -1262,15 +1262,16 @@ void Matrix<ElemType>::AssignValuesOf(const Matrix<ElemType>& deepCopyFrom)
}
template <class ElemType>
void Matrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, const size_t matrixFlags)
void Matrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, const size_t matrixFlags, DataTransferer* transferer)
{
if (((numRows * numCols) > 0) && (pArray == nullptr))
InvalidArgument("Invalid pArray.");
// Only the GPU matrix supports async data transfers, so the data transferer is passed only to the GPU matrix.
DISPATCH_MATRIX_ON_FLAG(this,
this,
m_CPUMatrix->SetValue(numRows, numCols, pArray, matrixFlags),
m_GPUMatrix->SetValue(numRows, numCols, deviceId, pArray, matrixFlags),
m_GPUMatrix->SetValue(numRows, numCols, deviceId, pArray, matrixFlags, transferer),
NOT_IMPLEMENTED,
NOT_IMPLEMENTED);
}
@ -1289,10 +1290,14 @@ void Matrix<ElemType>::SetValue(const size_t rIdx, const size_t cIdx, ElemType v
// read features
template <class ElemType>
void Matrix<ElemType>::SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
const size_t nz, const size_t numRows, const size_t numCols)
const size_t nz, const size_t numRows, const size_t numCols, DataTransferer* transferer)
{
// Note: The current implementation uses the xPUSparseMatrix as temporary space. This allows for memory sharing between calls. If
// xPUSparseMatrix is a view, this code will cause an error during runtime stating that the view is not writable nor resizable.
// Only the GPU matrix supports async data transfers, so the data transferer is passed to the GPU matrix only when we do not need to reassign to dense.
// When we have to reassign sparse to dense we cannot use an async operation, because by the time AssignColumnSliceToDense is called the
// data must already have been copied to its destination.
DISPATCH_MATRIX_ON_FLAG(this, this,
{
if (!m_CPUSparseMatrix) m_CPUSparseMatrix = make_shared<CPUSparseMatrix<ElemType>>(matrixFormatSparseCSC, numRows, numCols, nz);
@ -1305,7 +1310,7 @@ void Matrix<ElemType>::SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCC
m_GPUSparseMatrix->AssignColumnSliceToDense(*m_GPUMatrix, 0, numCols);
},
{ m_CPUSparseMatrix->SetMatrixFromCSCFormat(h_CSCCol, h_Row, h_Val, nz, numRows, numCols); },
{ m_GPUSparseMatrix->SetMatrixFromCSCFormat(h_CSCCol, h_Row, h_Val, nz, numRows, numCols); });
{ m_GPUSparseMatrix->SetMatrixFromCSCFormat(h_CSCCol, h_Row, h_Val, nz, numRows, numCols, false, -1, transferer); });
}
template <class ElemType>
@ -5483,7 +5488,7 @@ template void Matrix<char>::TransferToDeviceIfNotThere(int id_to, bool isBeingMo
template size_t Matrix<char>::GetNumRows() const;
template size_t Matrix<char>::GetNumCols() const;
template void Matrix<char>::SetValue(const char);
template void Matrix<char>::SetValue(size_t numRows, const size_t numCols, int deviceId, char* pArray, size_t matrixFlags);
template void Matrix<char>::SetValue(size_t numRows, const size_t numCols, int deviceId, char* pArray, size_t matrixFlags, DataTransferer* transferer);
//template void Matrix<char>::SetValue(const Matrix<char>&, MatrixFormat);
template void Matrix<char>::SetValue(const Matrix<char>&);
template void Matrix<char>::AssignValuesOf (const Matrix<char>&);
@ -5508,7 +5513,7 @@ template void Matrix<short>::TransferToDeviceIfNotThere(int id_to, bool isBeingM
template size_t Matrix<short>::GetNumRows() const;
template size_t Matrix<short>::GetNumCols() const;
template void Matrix<short>::SetValue(const short);
template void Matrix<short>::SetValue(size_t numRows, const size_t numCols, int deviceId, short* pArray, size_t matrixFlags);
template void Matrix<short>::SetValue(size_t numRows, const size_t numCols, int deviceId, short* pArray, size_t matrixFlags, DataTransferer* transferer);
//template void Matrix<short>::SetValue(const Matrix<short>&, MatrixFormat);
template void Matrix<short>::SetValue(const Matrix<short>&);
template void Matrix<short>::AssignValuesOf(const Matrix<short>&);

View file

@ -14,6 +14,7 @@
#include "CommonMatrix.h"
#include "TensorShape.h" // only for SmallVector; I was hoping to keep this out
#include "RNGHandle.h"
#include "DataTransferer.h"
#include <limits.h>
#include <memory> // for shared_ptr
#include <array>
@ -245,7 +246,7 @@ public:
void SetValue (const Matrix<ElemType>& deepCopyFrom);
// AssignValuesOf respects the target matrix's information. It copies the values from the target into the memory of the source.
void AssignValuesOf(const Matrix<ElemType>& deepCopyFrom);
void SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, const size_t matrixFlags = matrixFlagNormal);
void SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, const size_t matrixFlags = matrixFlagNormal, DataTransferer* transferer = nullptr);
void SetValue(const size_t rIdx, const size_t cIdx, ElemType val); // set matrix sparsely
void SetValue(const size_t numRows, const size_t numCols, std::initializer_list<ElemType> l) // SetValue(2,3, {1,2,3, 4,5,6});
{
@ -259,7 +260,7 @@ public:
SetValue(MakeNan(__LINE__));
}
void SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
const size_t nz, const size_t numRows, const size_t numCols);
const size_t nz, const size_t numRows, const size_t numCols, DataTransferer* transferer = nullptr);
void MaskColumnsValue(const Matrix<char>& columnsMask, ElemType val);

View file

@ -208,7 +208,7 @@ void GPUSparseMatrix<ElemType>::Reset()
// copy features to GPU matrix
template <class ElemType>
void GPUSparseMatrix<ElemType>::SetMatrixFromCSCFormat(const CPUSPARSE_INDEX_TYPE* h_CSCCol, const CPUSPARSE_INDEX_TYPE* h_Row, const ElemType* h_Val,
const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice /*= false*/, const DEVICEID_TYPE devId /*= -1*/)
const size_t nz, const size_t numRows, const size_t numCols, const bool IsOnDevice /*= false*/, const DEVICEID_TYPE devId /*= -1*/, DataTransferer* transferer)
{
}
@ -1014,7 +1014,7 @@ void GPUMatrix<ElemType>::SetValue(GPUSparseMatrix<ElemType> const&)
#endif
template <class ElemType>
void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags)
void GPUMatrix<ElemType>::SetValue(const size_t numRows, const size_t numCols, int deviceId, ElemType* pArray, size_t matrixFlags, DataTransferer* transferer)
{
}
@ -2187,6 +2187,28 @@ void GPUMatrixComputeStreamEvent::SynchronizeDataTransferFetchStreamWithEvent<do
#pragma region GPUDataTransferer functions
GranularGPUDataTransferer::~GranularGPUDataTransferer() {}
void GranularGPUDataTransferer::CopyGPUToCPUAsync(const void* /*gpuBuffer*/, size_t /*numElements*/, size_t /*elementSize*/, void* /*cpuBuffer*/) {}
void GranularGPUDataTransferer::RecordGPUToCPUCopy() {}
void GranularGPUDataTransferer::WaitForCopyGPUToCPU() {}
void GranularGPUDataTransferer::CopyCPUToGPUAsync(const void* /*cpuBuffer*/, size_t /*numElements*/, size_t /*elementSize*/, void* /*gpuBuffer*/) {}
void GranularGPUDataTransferer::RecordCPUToGPUCopy() {}
void GranularGPUDataTransferer::WaitForCopyCPUToGPU() {}
void GranularGPUDataTransferer::RecordComputeStreamSyncPoint() {}
void GranularGPUDataTransferer::WaitForSyncPointOnFetchStreamAsync() {}
void GranularGPUDataTransferer::WaitForSyncPointOnAssignStreamAsync() {}
PrefetchGPUDataTransferer::PrefetchGPUDataTransferer(int /*deviceId*/) : GranularGPUDataTransferer() {}
template <class ElemType>
GPUDataTransferer<ElemType>::GPUDataTransferer(int, bool)
{

View file

@ -15,8 +15,9 @@ class FramePacker : public SequencePacker
public:
FramePacker(
SequenceEnumeratorPtr sequenceEnumerator,
const std::vector<StreamDescriptionPtr>& streams) :
SequencePacker(sequenceEnumerator, streams)
const std::vector<StreamDescriptionPtr>& streams,
size_t numberOfBuffers = 2) :
SequencePacker(sequenceEnumerator, streams, numberOfBuffers)
{}
private:

View file

@ -39,9 +39,14 @@ void PackerBase::StartEpoch(const EpochConfiguration& config, const std::vector<
if (memoryProviders.size() != m_outputStreamDescriptions.size())
RuntimeError("Number of streams does not match the number of memory providers.");
m_streamBuffers.reserve(m_outputStreamDescriptions.size());
for (size_t i = 0; i < m_outputStreamDescriptions.size(); ++i)
m_streamBuffers.push_back(StreamBuffer(memoryProviders[i]));
m_streamBuffers.resize(m_numberOfBuffers);
for (size_t i = 0; i < m_numberOfBuffers; ++i)
{
auto& currentBuffer = m_streamBuffers[i];
currentBuffer.reserve(m_outputStreamDescriptions.size());
for (size_t j = 0; j < m_outputStreamDescriptions.size(); ++j)
currentBuffer.push_back(StreamBuffer(memoryProviders[j]));
}
}
m_minibatchSize = config.m_minibatchSizeInSamples;
@ -52,11 +57,15 @@ void PackerBase::StartEpoch(const EpochConfiguration& config, const std::vector<
}
PackerBase::PackerBase(SequenceEnumeratorPtr sequenceEnumerator,
const std::vector<StreamDescriptionPtr>& streams) :
const std::vector<StreamDescriptionPtr>& streams,
size_t numberOfBuffers) :
m_sequenceEnumerator(sequenceEnumerator),
m_minibatchSize(0),
m_outputStreamDescriptions(streams)
m_outputStreamDescriptions(streams),
m_numberOfBuffers(numberOfBuffers),
m_currentBufferIndex(0)
{
assert(m_numberOfBuffers >= 1);
m_inputStreamDescriptions = sequenceEnumerator->GetStreamDescriptions();
assert(m_inputStreamDescriptions.size() != 0);
assert(m_inputStreamDescriptions.size() == m_outputStreamDescriptions.size());

View file

@ -33,7 +33,8 @@ protected:
};
PackerBase(SequenceEnumeratorPtr sequenceEnumerator,
const std::vector<StreamDescriptionPtr>& streams);
const std::vector<StreamDescriptionPtr>& streams,
size_t numberOfBuffers);
typedef std::vector<SequenceDataPtr> StreamBatch;
@ -63,8 +64,17 @@ protected:
// Output stream descriptions expected by the network.
std::vector<StreamDescriptionPtr> m_inputStreamDescriptions;
// Buffers for allocated data.
std::vector<StreamBuffer> m_streamBuffers;
// Indicates how many internal buffers with pinned memory are supported.
// If it is N, then N sequential calls to PackMinibatch are valid, and the (N+1)-th call will overwrite
// the memory of the first call.
size_t m_numberOfBuffers;
// Buffers for allocated data. Outer vector size == m_numberOfBuffers,
// inner vector contains buffers for all streams.
std::vector<std::vector<StreamBuffer>> m_streamBuffers;
// Cyclic index of the current buffer. m_currentBufferIndex < m_numberOfBuffers;
size_t m_currentBufferIndex;
// Minibatch size in samples.
size_t m_minibatchSize;
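The cyclic index makes packing double-buffered: while the consumer is still reading minibatch N from one set of pinned buffers, the packer can already fill minibatch N+1 into the next set. A small self-contained sketch of the rotation, independent of the CNTK types (names are illustrative):

#include <cstddef>
#include <vector>

// Sketch: a round-robin pool of N byte buffers. N consecutive Next() results stay
// valid; the (N+1)-th call reuses the memory handed out by the first one, mirroring
// how m_currentBufferIndex cycles through m_streamBuffers above.
struct BufferPool
{
    BufferPool(size_t numberOfBuffers, size_t bytesPerBuffer)
        : m_buffers(numberOfBuffers, std::vector<char>(bytesPerBuffer)), m_current(0) {}

    char* Next()
    {
        char* result = m_buffers[m_current].data();
        m_current = (m_current + 1) % m_buffers.size(); // cyclic rotation
        return result;
    }

    std::vector<std::vector<char>> m_buffers;
    size_t m_current;
};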

View file

@ -17,12 +17,13 @@
#define DATAREADER_EXPORTS // creating the exports here
#include "DataReader.h"
#include "ReaderShim.h"
#include "DataTransferer.h"
namespace Microsoft { namespace MSR { namespace CNTK {
template <class ElemType>
ReaderShim<ElemType>::ReaderShim(ReaderFactory factory)
: m_factory(factory)
: m_factory(factory), m_deviceId(CPUDEVICE), m_dataTransferers(2, DataTransfererPtr()), m_currentDataTransferIndex(0)
{
}
@ -75,23 +76,57 @@ void ReaderShim<ElemType>::StartDistributedMinibatchLoop(
config.m_totalEpochSizeInSamples = requestedEpochSamples;
config.m_epochIndex = epoch;
// Let's check that there are no outstanding copies.
// Wait on all events if any pending copy operations are still in flight.
if (m_dataTransferers[m_currentDataTransferIndex])
m_dataTransferers[m_currentDataTransferIndex]->WaitForCopyCPUToGPU();
// Now we can be sure no prefetch thread is running and there are no outstanding memcopies.
// Let's check that requested devices are ok and see whether we need to change our data transferers.
auto device = std::find_if(inputs.begin(), inputs.end(),
[](const InputStreamDescription& d) { return d.m_deviceId != CPUDEVICE; });
auto deviceId = device != inputs.end() ? device->m_deviceId : CPUDEVICE;
// Check that all devices are either the same as m_deviceId or the CPU.
auto secondDevice = std::find_if(inputs.begin(), inputs.end(),
[deviceId](const InputStreamDescription& d) { return d.m_deviceId != CPUDEVICE && d.m_deviceId != deviceId; });
if (secondDevice != inputs.end())
{
LogicError("Readers do not support running on several GPUs in the same process; at least two devices found: '%d', '%d'.", deviceId, secondDevice->m_deviceId);
}
if (m_deviceId != deviceId)
{
// Device changed. Let's change the data transferers.
m_deviceId = deviceId;
m_dataTransferers.clear();
// We need two in order to support two operations in flight.
m_dataTransferers.push_back(m_deviceId == CPUDEVICE ? nullptr : CreatePrefetchDataTransferer(m_deviceId));
m_dataTransferers.push_back(m_deviceId == CPUDEVICE ? nullptr : CreatePrefetchDataTransferer(m_deviceId));
}
// Let's create the buffers for the prefetch thread.
std::map<std::wstring, int> inputDescriptions;
for (const auto& i : inputs)
{
inputDescriptions[i.m_name] = i.m_deviceId;
m_prefetchBuffers[i.m_name] = StreamPrefetchBuffer{ std::make_shared<Matrix<ElemType>>(i.m_deviceId), nullptr };
}
m_reader->StartEpoch(config, inputDescriptions);
m_endOfEpoch = false;
m_reader->StartEpoch(config, inputDescriptions);
// Starting the prefetch task. There is always a single async read in flight.
// When the network requests a new minibatch, we wait for the current async to finish,
// return the result and kick off a new one.
m_prefetchTask = std::async(m_launchType, [this]()
// When the network requests a new minibatch, we wait for the current async to finish, swap the buffers
// and kick off the new prefetch.
m_prefetchTask = std::async(m_launchType,
[this]()
{
return m_reader->ReadMinibatch();
return PrefetchMinibatch();
});
}
string EnumerateInputs(const map<wstring, size_t> &nameToStreamId)
string EnumerateInputs(const unordered_map<wstring, size_t>& nameToStreamId)
{
// TODO use boost::algorithm::join, boost::adapters::transformed, make this a generic function
std::stringstream str;
@ -111,110 +146,152 @@ string EnumerateInputs(const map<wstring, size_t> &nameToStreamId)
template <class ElemType>
bool ReaderShim<ElemType>::GetMinibatch(StreamMinibatchInputs& matrices)
{
// TODO: verify that the set of matrix names is identical
// TODO: verify that the set of matrix names is identical
// to the set of reader input names. Warn if it's a subset, throw
// if it's a superset.
if (m_endOfEpoch)
{
return false;
}
//TODO: Set proper format on matrices?
// Check that all matrices have the same device id.
// If not we should inject the IMemoryProvider per stream.
// If not we should inject the MemoryProvider per stream.
int deviceId = matrices.begin()->second.matrix->GetDeviceId();
for (auto mx : matrices)
assert(mx.second.matrix->GetDeviceId() == deviceId), UNUSED(deviceId);
// Do sanity checks: requested streams should be exposed by low level deserializers.
for (const auto& mx : matrices)
{
if (m_nameToStreamId.find(mx.first) == m_nameToStreamId.end())
{
string inputNames = EnumerateInputs(m_nameToStreamId);
RuntimeError("Could not map input '%ls' to the reader. Reader outputs only [%s].",
mx.first.c_str(), inputNames.c_str());
}
}
// Make sure the prefetch has finished.
assert(m_prefetchTask.valid());
auto result = m_prefetchTask.get();
Minibatch minibatch = m_prefetchTask.get();
if (minibatch.m_endOfEpoch)
// Ok, prefetch is done.
m_endOfEpoch = result.m_isEndOfEpoch;
if (m_endOfEpoch && !result.m_isDataAvailable)
{
m_endOfEpoch = true;
if (minibatch.m_data.empty())
{
return false;
}
// No data and end of epoch, simply return.
return false;
}
// Reset stale mb layouts.
// BUGBUG: This seems incorrect. (1) layouts should all be updated below, and (2) some of these layouts are the same, we are resetting them twice.
for (const auto& iter : matrices)
// Remember the current data transfer; the async memcpy for it was already started on the prefetch thread.
auto currentDataTransfer = m_currentDataTransferIndex;
// Let's update the current data transferer.
m_currentDataTransferIndex = (m_currentDataTransferIndex + 1) % 2;
// We need to make sure that the compute for the current transfer is finished before we start prefetch.
if (m_dataTransferers[m_currentDataTransferIndex])
m_dataTransferers[m_currentDataTransferIndex]->RecordComputeStreamSyncPoint();
// We have some data - let's swap the matrices.
// We cannot simply change pointers because it seems they are remembered deeper in the network.
for (auto i = matrices.begin(); i != matrices.end(); ++i)
{
iter.second.pMBLayout->Init(1, 0);
std::swap(i->second.GetMatrix<ElemType>(), *m_prefetchBuffers[i->first].m_matrix);
// Resetting layouts.
i->second.pMBLayout->Init(1, 0);
}
// a map to generate error messages when checking layout constraints.
// a map to generate error messages when checking layout constraints.
map<wstring, wstring> layoutToInputMap;
if (!minibatch.m_data.empty())
// Let's now check the layouts and throw if the same layout is being assigned twice.
for (auto i = matrices.begin(); i != matrices.end(); ++i)
{
// TODO: Use alternating pinned buffer in the packer, do not copy anything, but pack into the pinned memory.
// Copy returned minibatch to the matrices.
for (const auto& mx : matrices)
auto streamLayout = m_prefetchBuffers[i->first].m_mbLayout;
auto& layout = i->second.pMBLayout;
if (layout->GetNumCols() == 0) // just initialized, let's take the layout of the reader.
{
if (m_nameToStreamId.find(mx.first) == m_nameToStreamId.end())
{
string inputNames = EnumerateInputs(m_nameToStreamId);
RuntimeError("Could not map input '%ls' to the reader. Reader outputs only [%s].",
mx.first.c_str(), inputNames.c_str());
}
size_t streamId = m_nameToStreamId[mx.first];
const auto& stream = minibatch.m_data[streamId];
m_numParallelSequences = stream->m_layout->GetNumParallelSequences();
// This assert no longer holds - different inputs have different sequence lengths, resulting in different number
// of parallel samples.
// assert(m_numParallelSequences == minibatch.m_data.front()->m_layout->GetNumParallelSequences());
auto& layout = mx.second.pMBLayout;
if (layout->GetNumCols() == 0)
{
// layout is empty, copy layout info from the reader
layout->CopyFrom(stream->m_layout, /*keepName*/ true);
layoutToInputMap[layout->GetAxisName()] = mx.first;
}
else if (*layout != *stream->m_layout) // this does a deep value-level comparison
{
RuntimeError("Dynamic axis layout '%ls' is shared between inputs '%ls' and '%ls', but layouts generated "
"from the input data are incompatible on this axis. Are you using different sequence lengths? "
"Did you consider adding a DynamicAxis() to the Input nodes?",
layout->GetAxisName(), layoutToInputMap[layout->GetAxisName()].c_str(), mx.first.c_str());
}
size_t sampleSize = m_streams[streamId]->m_sampleLayout->GetNumElements();
auto& matrix = matrices.GetInputMatrix<ElemType>(mx.first);
FillMatrixFromStream(m_streams[streamId]->m_storageType, &matrix, sampleSize, stream);
// layout is empty, copy layout info from the reader
layout->CopyFrom(streamLayout, /*keepName*/ true);
layoutToInputMap[layout->GetAxisName()] = i->first;
}
else if (*layout != *streamLayout) // this does a deep value-level comparison
{
RuntimeError("Dynamic axis layout '%ls' is shared between inputs '%ls' and '%ls', but layouts generated "
"from the input data are incompatible on this axis. Are you using different sequence lengths? "
"Did you consider adding a DynamicAxis() to the Input nodes?",
layout->GetAxisName(), layoutToInputMap[layout->GetAxisName()].c_str(), i->first.c_str());
}
}
// Number of logical sequences should be the same across all streams.
// So pick up the first one.
m_numParallelSequences = matrices.begin()->second.pMBLayout->GetNumParallelSequences();
// It is time to issue the next prefetch.
if (!m_endOfEpoch)
{
// Starting the prefetch task. There is always a single async read in flight.
// When the network requests a new minibatch, we wait for the current async to finish,
// return the result and kick off a new one.
m_prefetchTask = std::async(m_launchType, [this]()
{
return m_reader->ReadMinibatch();
});
// When the network requests a new minibatch, we wait for the current async to finish, swap the buffers
// and kick off the new prefetch.
m_prefetchTask = std::async(m_launchType, [this]() { return PrefetchMinibatch(); });
}
return !minibatch.m_data.empty();
// Let's wait till the previous memcopy has finished.
if (m_dataTransferers[currentDataTransfer])
m_dataTransferers[currentDataTransfer]->WaitForCopyCPUToGPU();
return result.m_isDataAvailable;
}
template <class ElemType>
/*static*/ void ReaderShim<ElemType>::FillMatrixFromStream(StorageType type, Matrix<ElemType>* matrix, size_t numRows, const StreamMinibatchPtr& stream)
typename ReaderShim<ElemType>::PrefetchResult ReaderShim<ElemType>::PrefetchMinibatch()
{
Minibatch minibatch = m_reader->ReadMinibatch();
// If there is no data we can simply return.
if (minibatch.m_data.empty())
return PrefetchResult{ minibatch.m_endOfEpoch, false };
// OK, we have some data. Let's load it onto the GPU.
// But before that we need to make sure that the corresponding compute from the last iteration has already finished,
// i.e. that the compute for the current transferer is done before we start the prefetch.
if (m_dataTransferers[m_currentDataTransferIndex])
m_dataTransferers[m_currentDataTransferIndex]->WaitForSyncPointOnAssignStreamAsync();
for (auto& mx : m_prefetchBuffers)
{
size_t streamId = m_nameToStreamId[mx.first];
const auto& stream = minibatch.m_data[streamId];
mx.second.m_mbLayout = stream->m_layout;
size_t sampleSize = m_streams[streamId]->m_sampleLayout->GetNumElements();
FillMatrixFromStream(m_streams[streamId]->m_storageType, mx.second.m_matrix.get(), sampleSize, stream, m_dataTransferers[m_currentDataTransferIndex].get());
}
// Let's record that we started the copy, so that the main thread can wait afterwards.
if (m_dataTransferers[m_currentDataTransferIndex])
m_dataTransferers[m_currentDataTransferIndex]->RecordCPUToGPUCopy();
return PrefetchResult{ minibatch.m_endOfEpoch, true };
}
template <class ElemType>
/*static*/ void ReaderShim<ElemType>::FillMatrixFromStream(StorageType type, Matrix<ElemType>* matrix, size_t numRows, const StreamMinibatchPtr& stream, DataTransferer* transferer)
{
size_t numCols = stream->m_layout->GetNumCols();
if (type == StorageType::dense)
{
auto data = reinterpret_cast<const ElemType*>(stream->m_data);
matrix->SetValue(numRows, numCols, matrix->GetDeviceId(), const_cast<ElemType*>(data), matrixFlagNormal);
matrix->SetValue(numRows, numCols, matrix->GetDeviceId(), const_cast<ElemType*>(data), matrixFlagNormal, transferer);
}
else if (type == StorageType::sparse_csc)
{
@ -225,12 +302,10 @@ template <class ElemType>
ElemType* values = reinterpret_cast<ElemType*>(data + 1);
IndexType* rows = reinterpret_cast<IndexType*>(values + nnzCount);
IndexType* columns = reinterpret_cast<IndexType*>(rows + nnzCount);
matrix->SetMatrixFromCSCFormat(columns, rows, values, nnzCount, numRows, numCols);
matrix->SetMatrixFromCSCFormat(columns, rows, values, nnzCount, numRows, numCols, transferer);
}
else
{
else
RuntimeError("Storage type %d is not supported.", (int)type);
}
}
template <class ElemType>

View file

@ -8,10 +8,10 @@
#pragma once
#include <map>
#include <unordered_map>
#include <string>
#include "DataReader.h"
#include <future>
#include "DataReader.h"
#include "Reader.h"
namespace CNTK
@ -40,6 +40,8 @@ public:
virtual void Destroy() override
{
// Make sure there are no outstanding reads.
// The std::future destructor is not guaranteed to wait (as of the 2013 papers), so it probably does not wait in VS2013.
// More info can be found here: http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2013/n3679.html.
if (m_prefetchTask.valid())
{
// If there are some, give them time to finish.
@ -76,18 +78,53 @@ public:
virtual size_t GetNumParallelSequencesForFixingBPTTMode() override;
private:
std::future<Minibatch> m_prefetchTask;
struct PrefetchResult
{
bool m_isEndOfEpoch;
bool m_isDataAvailable;
};
PrefetchResult PrefetchMinibatch();
std::future<PrefetchResult> m_prefetchTask;
ReaderPtr m_reader;
ReaderFactory m_factory;
bool m_endOfEpoch;
size_t m_numParallelSequences;
std::map<std::wstring, size_t> m_nameToStreamId;
std::unordered_map<std::wstring, size_t> m_nameToStreamId;
std::vector<StreamDescriptionPtr> m_streams;
launch m_launchType;
static void FillMatrixFromStream(StorageType type, Matrix<ElemType>* matrix, size_t numRows, const StreamMinibatchPtr& stream);
// Data structure required for prefetch.
struct StreamPrefetchBuffer
{
std::shared_ptr<Matrix<ElemType>> m_matrix;
MBLayoutPtr m_mbLayout;
};
// Intermediate buffer where the prefetch thread puts its data.
// When the main thread enters GetMinibatch it swaps the matrices out of this buffer,
// triggers the next prefetch and waits if the memcpy is still in progress.
std::unordered_map<std::wstring, StreamPrefetchBuffer> m_prefetchBuffers;
// Alternating data transfer operations. In the current version there are only two:
// the one the main thread is currently waiting on, and the one that can be started by the prefetch thread
// in the meantime.
std::vector<DataTransfererPtr> m_dataTransferers;
// Index of the current data transfer. Flips between 0 and 1.
size_t m_currentDataTransferIndex;
// Device id.
int m_deviceId;
static void FillMatrixFromStream(
StorageType type,
Matrix<ElemType>* matrix,
size_t numRows,
const StreamMinibatchPtr& stream,
DataTransferer* transferer);
};
}}}
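Taken together, GetMinibatch (main thread) and PrefetchMinibatch (prefetch thread) form a two-slot ping-pong over m_dataTransferers. The skeleton below is a hedged, self-contained restatement of that protocol with stub transferers; it is not CNTK code, and the stub methods only print the step they stand for:

#include <cstdio>
#include <future>
#include <vector>

// Stand-in for DataTransferer; each method maps onto the call of the same name above.
struct StubTransferer
{
    void RecordComputeStreamSyncPoint()        { std::puts("record compute sync point"); }
    void WaitForSyncPointOnAssignStreamAsync() { std::puts("copy stream waits for compute"); }
    void RecordCPUToGPUCopy()                  { std::puts("record cpu->gpu copy"); }
    void WaitForCopyCPUToGPU()                 { std::puts("wait for cpu->gpu copy"); }
};

struct PrefetchSkeleton
{
    std::vector<StubTransferer> m_transferers = std::vector<StubTransferer>(2); // two alternating slots
    size_t m_current = 0;                                                       // flips between 0 and 1
    std::future<bool> m_prefetch;

    // Prefetch thread: fill pinned buffers and issue async copies on the current slot.
    bool Prefetch()
    {
        m_transferers[m_current].WaitForSyncPointOnAssignStreamAsync(); // compute must be done with the target buffers
        // ... read a minibatch and issue CopyCPUToGPUAsync per stream here ...
        m_transferers[m_current].RecordCPUToGPUCopy();
        return true;
    }

    // Launch a prefetch; called once at epoch start and again after every buffer swap.
    void Start() { m_prefetch = std::async(std::launch::async, [this] { return Prefetch(); }); }

    // Main thread: mirrors the ordering in ReaderShim::GetMinibatch.
    bool GetMinibatch()
    {
        bool hasData = m_prefetch.get();                          // wait for the prefetch in flight
        size_t finished = m_current;                              // its copies were issued on this slot
        m_current = (m_current + 1) % 2;                          // flip to the other slot
        m_transferers[m_current].RecordComputeStreamSyncPoint();  // mark where compute currently is
        // ... swap the prefetch buffers into the network's matrices here ...
        Start();                                                  // kick off the next prefetch
        m_transferers[finished].WaitForCopyCPUToGPU();            // the swapped-in data must have arrived
        return hasData;
    }
};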

View file

@ -47,6 +47,8 @@ Minibatch SequencePacker::ReadMinibatch()
return minibatch;
}
auto& currentBuffer = m_streamBuffers[m_currentBufferIndex];
assert(m_outputStreamDescriptions.size() == batch.size());
for (int streamIndex = 0; streamIndex < batch.size(); ++streamIndex)
@ -62,7 +64,7 @@ Minibatch SequencePacker::ReadMinibatch()
auto pMBLayout = (type == StorageType::dense) ?
PackDenseStream(streamBatch, streamIndex) : PackSparseStream(streamBatch, streamIndex);
auto& buffer = m_streamBuffers[streamIndex];
auto& buffer = currentBuffer[streamIndex];
auto streamMinibatch = std::make_shared<StreamMinibatch>();
streamMinibatch->m_data = buffer.m_data.get();
@ -70,6 +72,7 @@ Minibatch SequencePacker::ReadMinibatch()
minibatch.m_data.push_back(streamMinibatch);
}
m_currentBufferIndex = (m_currentBufferIndex + 1) % m_numberOfBuffers;
return minibatch;
}
@ -106,7 +109,7 @@ MBLayoutPtr SequencePacker::PackDenseStream(const StreamBatch& batch, size_t str
{
assert(m_outputStreamDescriptions[streamIndex]->m_storageType == StorageType::dense);
const auto& stream = m_inputStreamDescriptions[streamIndex];
auto& buffer = m_streamBuffers[streamIndex];
auto& buffer = m_streamBuffers[m_currentBufferIndex][streamIndex];
size_t sampleSize = GetSampleSize(m_outputStreamDescriptions[streamIndex]);
auto pMBLayout = CreateMBLayout(batch);
size_t requiredSize = pMBLayout->GetNumCols() * sampleSize;
@ -208,7 +211,7 @@ MBLayoutPtr SequencePacker::PackSparseStream(const StreamBatch& batch, size_t st
nnzCount * (elementSize + indexSize) +
indexSize * (pMBLayout->GetNumCols() + 1);
auto& buffer = m_streamBuffers[streamIndex];
auto& buffer = m_streamBuffers[m_currentBufferIndex][streamIndex];
if (buffer.m_size < requiredSize)
{
buffer.Resize(requiredSize);

View file

@ -16,8 +16,9 @@ class SequencePacker : public PackerBase
public:
SequencePacker(
SequenceEnumeratorPtr sequenceEnumerator,
const std::vector<StreamDescriptionPtr>& streams) :
PackerBase(sequenceEnumerator, streams)
const std::vector<StreamDescriptionPtr>& streams,
size_t numberOfBuffers = 2) :
PackerBase(sequenceEnumerator, streams, numberOfBuffers)
{}
virtual Minibatch ReadMinibatch() override;

View file

@ -107,8 +107,9 @@ struct SequenceBuffer
TruncatedBPTTPacker::TruncatedBPTTPacker(
SequenceEnumeratorPtr sequenceEnumerator,
const vector<StreamDescriptionPtr>& streams)
: PackerBase(sequenceEnumerator, streams),
const vector<StreamDescriptionPtr>& streams,
size_t numberOfBuffers)
: PackerBase(sequenceEnumerator, streams, numberOfBuffers),
m_truncationSize(0)
{
auto sparseOutput = find_if(m_outputStreamDescriptions.begin(), m_outputStreamDescriptions.end(), [](const StreamDescriptionPtr& s){ return s->m_storageType == StorageType::sparse_csc; });
@ -161,13 +162,14 @@ void TruncatedBPTTPacker::StartEpoch(const EpochConfiguration& config, const std
m_sequenceBufferPerStream.clear();
// Preparing the buffers.
for (int i = 0; i < m_outputStreamDescriptions.size(); ++i)
{
const auto& stream = m_outputStreamDescriptions[i];
auto& buffer = m_streamBuffers[i];
buffer.Resize(m_numParallelSequences * m_truncationSize * GetSampleSize(stream));
m_sequenceBufferPerStream.push_back(make_shared<SequenceBuffer>(m_numParallelSequences));
}
for (int j = 0; j < m_streamBuffers.size(); ++j)
for (int i = 0; i < m_outputStreamDescriptions.size(); ++i)
{
const auto& stream = m_outputStreamDescriptions[i];
auto& buffer = m_streamBuffers[j][i];
buffer.Resize(m_numParallelSequences * m_truncationSize * GetSampleSize(stream));
m_sequenceBufferPerStream.push_back(make_shared<SequenceBuffer>(m_numParallelSequences));
}
}
// Filling in the initial set of sequences
@ -200,11 +202,13 @@ Minibatch TruncatedBPTTPacker::ReadMinibatch()
}
StreamMinibatchPtr m = make_shared<StreamMinibatch>();
m->m_data = m_streamBuffers[streamIndex].m_data.get();
m->m_data = m_streamBuffers[m_currentBufferIndex][streamIndex].m_data.get();
m->m_layout = m_currentLayouts[streamIndex];
result.m_data.push_back(m);
}
m_currentBufferIndex = (m_currentBufferIndex + 1) % m_numberOfBuffers;
return result;
}
@ -262,7 +266,7 @@ void TruncatedBPTTPacker::PackSlot(size_t streamIndex, size_t slotIndex, size_t&
// Fill in the data from the first sequence in the slot.
auto data = slot.FrontSequence();
// Get buffer destination for the current sample.
auto& buffer = m_streamBuffers[streamIndex];
auto& buffer = m_streamBuffers[m_currentBufferIndex][streamIndex];
auto offset = strideSize * currentTimestep + slotIndex * sampleSize;
assert(offset >= 0 && offset < buffer.m_size);
char* destination = buffer.m_data.get() + offset;

View file

@ -22,7 +22,8 @@ class TruncatedBPTTPacker : public PackerBase
public:
TruncatedBPTTPacker(
SequenceEnumeratorPtr sequenceEnumerator,
const std::vector<StreamDescriptionPtr>& streams);
const std::vector<StreamDescriptionPtr>& streams,
size_t numberOfBuffers = 2);
virtual Minibatch ReadMinibatch() override;

View file

@ -557,7 +557,7 @@ BOOST_AUTO_TEST_CASE(CNTKTextFormatReader_duplicate_inputs)
1, // epoch size
1, // mb size
1, // num epochs
1,
2,
0,
0,
1),