Guoli Ye 2018-09-04 18:11:00 -07:00
Parent ac09b402a4
Commit e210f47337
12 changed files: 33847 additions and 6220 deletions

View file

@@ -1,631 +0,0 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#pragma once
#include "IDistGradAggregator.h"
#include "CUDAPageLockedMemAllocator.h"
#include "QuantizedMatrix.h"
#include "MatrixQuantizer.h"
#include "MatrixQuantizerGPU.h"
#include <future>
#include "TimerUtility.h"
namespace Microsoft { namespace MSR { namespace CNTK {
// =======================================================================
// AllReduceDistGradAggregator -- 1-bit SGD.
// This implements
// Frank Seide, Hao Fu, Jasha Droppo, Gang Li, and Dong Yu:
// "1-bit stochastic gradient descent and its application to data-parallel distributed training of speech DNNs"
// In Proc. Interspeech 2014.
// =======================================================================
template <class ElemType>
class AllReduceDistGradAggregator : public IDistGradAggregator<ElemType>
{
struct Stripe
{
size_t m_startCol;
size_t m_numCols;
};
UsingIDistGradAggregatorMembers;
static const int DEBUG_OUTPUT_TRACE_LEVEL = 3;
public:
AllReduceDistGradAggregator(const std::shared_ptr<MPIWrapper>& mpi, int nBits, bool zeroThresholdFor1Bit, bool useQuantizationForSelfStripe, bool useAsyncAggregation, int traceLevel, int syncStatsTrace)
: IDistGradAggregator<ElemType>(mpi), m_numQuantizationBits(nBits), m_zeroThresholdFor1Bit(zeroThresholdFor1Bit), m_useQuantizationForSelfStripe(useQuantizationForSelfStripe),
m_traceLevel(traceLevel), m_initialized(false), m_useAsyncAggregation(useAsyncAggregation), m_bufferedGradHeader(nullptr), m_syncStatsTrace(syncStatsTrace), m_iterationCount(0)
{}
~AllReduceDistGradAggregator()
{
for (size_t i = 0; i < m_recvHeaders.size(); ++i)
DistGradHeader::Destroy(m_recvHeaders[i]);
if (m_bufferedGradHeader != nullptr)
DistGradHeader::Destroy(m_bufferedGradHeader);
}
// Gets the range of columns to be processed by the node with the specified rank
// when parallel processing using 'numNodes' nodes
static Stripe GetStripeForNode(size_t numCols, size_t nodeRank, size_t numNodes)
{
// Determine which stripe of the gradient this node is responsible for
size_t numColsPerNode = numCols / numNodes;
size_t residue = numCols % numNodes;
size_t startColNumofStripe = (numColsPerNode * nodeRank) + min(residue, nodeRank);
size_t numColsinStripe = numColsPerNode + ((nodeRank < residue) ? 1 : 0);
return Stripe({startColNumofStripe, numColsinStripe});
}
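// Illustrative example (not in the original source): with numCols = 10 and numNodes = 3,
// numColsPerNode = 3 and residue = 1, so the stripes come out as
//   rank 0 -> { m_startCol = 0, m_numCols = 4 }   (columns 0..3)
//   rank 1 -> { m_startCol = 4, m_numCols = 3 }   (columns 4..6)
//   rank 2 -> { m_startCol = 7, m_numCols = 3 }   (columns 7..9)
// i.e. the first 'residue' ranks each take one extra column and the stripes cover all columns exactly once.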
void ResetState(const std::vector<Matrix<ElemType>*>& gradients, int numEvalNodes, bool resetState)
{
// When called the first time let's setup the quantizers and matrices for holding quantized values.
// These can live for the lifetime of the aggregator since the gradient matrix dimensions for learnable parameters
// do not change
if (!m_initialized)
{
m_initialized = true;
int deviceId = gradients[0]->GetDeviceId();
if (deviceId != CPUDEVICE)
m_allocator.reset(new CUDAPageLockedMemAllocator(deviceId));
for (size_t i = 0; i < gradients.size(); i++)
{
// Make sure none of the gradient matrices are sparse - we currently do not support aggregation of sparse gradient matrices
if (gradients[i]->GetMatrixType() != DENSE)
RuntimeError("Gradient aggregation for sparse gradient matrices is currently unsupported!");
size_t nRow = gradients[i]->GetNumRows();
size_t nCol = gradients[i]->GetNumCols();
m_preAggGradQuantizers.push_back(std::unique_ptr<MatrixQuantizer<ElemType>>(new MatrixQuantizer<ElemType>(nRow, nCol, deviceId, m_useAsyncAggregation)));
m_gradQuantized.push_back(std::unique_ptr<QuantizedMatrix<ElemType>>(new QuantizedMatrix<ElemType>(nRow, nCol, m_numQuantizationBits, CPUDEVICE, m_allocator.get())));
// Determine which stripe of the gradient this node is responsible for
Stripe stripe = GetStripeForNode(nCol, MyRank(), NumProc());
MatrixQuantizer<ElemType>* currAggGradQuantizer = nullptr;
std::vector<std::unique_ptr<QuantizedMatrix<ElemType>>> currRecvGradStripesQuantized;
if (stripe.m_numCols > 0)
{
currAggGradQuantizer = new MatrixQuantizer<ElemType>(nRow, stripe.m_numCols, deviceId, m_useAsyncAggregation);
for (size_t j = 0; j < NumProc() - 1; ++j)
currRecvGradStripesQuantized.push_back(std::unique_ptr<QuantizedMatrix<ElemType>>(new QuantizedMatrix<ElemType>(nRow, stripe.m_numCols, m_numQuantizationBits, CPUDEVICE, m_allocator.get())));
}
m_aggGradStripeQuantizers.push_back(std::unique_ptr<MatrixQuantizer<ElemType>>(currAggGradQuantizer));
m_recvGradStripesQuantized.push_back(std::move(currRecvGradStripesQuantized));
if (m_useAsyncAggregation)
m_bufferedGradients[gradients[i]].reset(new Matrix<ElemType>(gradients[i]->GetNumRows(), gradients[i]->GetNumCols(), deviceId));
}
if (m_useAsyncAggregation)
{
m_bufferedGradHeader = DistGradHeader::Create(numEvalNodes);
m_bufferedGradHeader->Clear();
}
if (m_mpi->IsMainNode())
{
for (size_t i = 0; i < NumProc() - 1; ++i)
m_recvHeaders.push_back(DistGradHeader::Create(numEvalNodes));
}
}
else if (resetState)
{
// If we are resetting state, let's clear previous quantization residues
// Make sure there is no pending async aggregation
if (m_useAsyncAggregation && m_pendingAsyncAggregation.valid())
LogicError("Unexpected pending async gradient aggregation found when resetting aggregator state!");
for (size_t i = 0; i < m_preAggGradQuantizers.size(); ++i)
m_preAggGradQuantizers[i]->ResetResidue();
for (size_t i = 0; i < m_aggGradStripeQuantizers.size(); ++i)
{
if (m_aggGradStripeQuantizers[i] != nullptr)
m_aggGradStripeQuantizers[i]->ResetResidue();
}
// Zero out the buffered gradients if resetting state
if (m_useAsyncAggregation)
{
for (size_t i = 0; i < gradients.size(); i++)
m_bufferedGradients[gradients[i]]->SetValue(0);
m_bufferedGradHeader->Clear();
}
}
}
// Aggregate the gradient matrices across all nodes
bool AggregateGradients(const std::vector<Matrix<ElemType>*>& gradients, DistGradHeader* headerCPU, bool resetState) override
{
ResetState(gradients, headerCPU->numEvalNode, resetState);
bool showSyncPerfStats = (m_syncStatsTrace > 0) && ((m_iterationCount % m_syncStatsTrace) == 0);
m_iterationCount++;
if (m_useAsyncAggregation)
{
// If we are performing async gradient aggregation, let's wait for the pending gradient aggregation to finish
// then swap the contents of the buffered gradients and the new gradient matrices and fire an async aggregation
// of the new gradient matrices
if (m_pendingAsyncAggregation.valid())
{
Timer aggregationTimer;
if (showSyncPerfStats)
aggregationTimer.Start();
m_pendingAsyncAggregation.get();
if (showSyncPerfStats)
{
aggregationTimer.Stop();
double gradientAggregationTime = aggregationTimer.ElapsedSeconds();
fprintf(stderr, "Async gradient aggregation wait time: %.6g\n", gradientAggregationTime);
}
}
std::vector<Matrix<ElemType>*> newGradients;
size_t numGradMatrices = gradients.size();
for (size_t i = 0; i < numGradMatrices; i++)
{
Matrix<ElemType>* bufferedGradientMatrix = m_bufferedGradients[gradients[i]].get();
if ((bufferedGradientMatrix == nullptr) ||
(bufferedGradientMatrix->GetNumCols() != gradients[i]->GetNumCols()) ||
(bufferedGradientMatrix->GetNumRows() != gradients[i]->GetNumRows()) ||
(bufferedGradientMatrix->GetDeviceId() != gradients[i]->GetDeviceId()))
{
LogicError("No buffered gradient matrix found corresponding to a gradient matrix to be aggregated!");
}
// Swap the gradient matrix contents with the buffered matrices
std::swap(*(gradients[i]), *bufferedGradientMatrix);
newGradients.push_back(bufferedGradientMatrix);
}
// Swap the grad header contents with the buffered grad header
swap(*headerCPU, *m_bufferedGradHeader);
// Initiate aggregation only if any samples were processed in previous iteration
if (resetState || (headerCPU->numSamples != 0))
{
int deviceId = gradients[0]->GetDeviceId();
DistGradHeader* newGradHeader = m_bufferedGradHeader;
// Since we will be aggregating the gradients asynchronously, let us
// ensure that the gradient matrices have been computed before starting to aggregate
// them asynchronously on another thread. This essentially means that when we are using
// a GPU device, we will synchronize on the main GPU compute stream before starting
// the gradient aggregation asynchronously on a separate stream
MatrixComputeStreamEvent* mainStreamSyncEvent = MatrixComputeStreamEvent::Create(deviceId);
m_pendingAsyncAggregation = std::async(std::launch::async, [=] {
// We are starting on a new thread. Make sure the new thread is
// setup to use the right device
Matrix<ElemType>::SetDevice(deviceId);
// Synchronize the Quantization compute stream with the completion of
// compute of the gradient matrices on the main compute stream
mainStreamSyncEvent->SynchronizeQuantizationComputeStreamWithEvent<ElemType>();
delete mainStreamSyncEvent;
AggregateGradientsImpl(newGradients, newGradHeader, showSyncPerfStats);
});
return true;
}
return false;
}
else
{
AggregateGradientsImpl(gradients, headerCPU, showSyncPerfStats);
return (headerCPU->numSamples != 0);
}
}
void AggregateGradientsImpl(const std::vector<Matrix<ElemType>*>& gradients, DistGradHeader* headerCPU, bool showSyncPerfStats)
{
Timer aggregationTimer;
int deviceId = gradients[0]->GetDeviceId();
if (showSyncPerfStats)
{
std::unique_ptr<MatrixComputeStreamEvent> mainStreamSyncEvent(MatrixComputeStreamEvent::Create(deviceId));
mainStreamSyncEvent->SynchronizeEvent();
aggregationTimer.Start();
}
size_t numGradMatrices = gradients.size();
if (headerCPU->numSamples == 0)
{
assert(headerCPU->criterion == 0.0);
assert(headerCPU->numSamplesWithLabel == 0);
for (int i = 0; i < headerCPU->numEvalNode; ++i)
assert(headerCPU->evalErrors[i].first == 0 && headerCPU->evalErrors[i].second == 0);
// If the current node did not process any samples, the gradients should be zero'd
for (size_t i = 0; i < numGradMatrices; ++i)
gradients[i]->SetValue(0);
if (m_useAsyncAggregation)
{
std::unique_ptr<MatrixComputeStreamEvent> mainStreamSyncEvent(MatrixComputeStreamEvent::Create(deviceId));
mainStreamSyncEvent->SynchronizeQuantizationComputeStreamWithEvent<ElemType>();
}
}
std::vector<std::unique_ptr<Matrix<ElemType>>> aggGradStripes;
std::vector<std::unique_ptr<QuantizedMatrix<ElemType>>> aggGradStripesQuantized;
for (size_t i = 0; i < gradients.size(); i++)
{
size_t nCol = gradients[i]->GetNumCols();
// Determine which stripe of the gradient this node is responsible for
Stripe stripe = GetStripeForNode(nCol, MyRank(), NumProc());
Matrix<ElemType>* currAggGradStripe = nullptr;
QuantizedMatrix<ElemType>* currAggGradStripeQuantized = nullptr;
if (stripe.m_numCols > 0)
{
currAggGradStripe = new Matrix<ElemType>(gradients[i]->ColumnSlice(stripe.m_startCol, stripe.m_numCols));
currAggGradStripeQuantized = new QuantizedMatrix<ElemType>(m_gradQuantized[i]->ColumnSlice(stripe.m_startCol, stripe.m_numCols));
}
aggGradStripes.push_back(std::unique_ptr<Matrix<ElemType>>(currAggGradStripe));
aggGradStripesQuantized.push_back(std::unique_ptr<QuantizedMatrix<ElemType>>(currAggGradStripeQuantized));
}
// Initiate quantization of the gradient matrices
for (size_t i = 0; i < numGradMatrices; ++i)
{
if (m_traceLevel >= DEBUG_OUTPUT_TRACE_LEVEL)
{
char printHeaderBuf[1024];
sprintf(printHeaderBuf, "MPI Rank: %d, Original Gradient Matrix No. %d", (int) MyRank(), (int) i);
PrintMatrix(printHeaderBuf, gradients[i]);
}
m_preAggGradQuantizers[i]->QuantizeAsync(*(gradients[i]), *(m_gradQuantized[i]), m_zeroThresholdFor1Bit);
}
// Initiate receive of the stripe to be aggregated by the current node, from all other nodes
std::vector<MPI_Request> recvGradStripesQuantizedRequests;
std::vector<int> recvRequestIdxToGradientMatrixIdxMap;
for (size_t i = 0; i < numGradMatrices; ++i)
{
Stripe stripe = GetStripeForNode(gradients[i]->GetNumCols(), MyRank(), NumProc());
if (stripe.m_numCols > 0)
{
recvRequestIdxToGradientMatrixIdxMap.push_back(i);
for (size_t j = 0; j < NumProc() - 1; ++j)
{
int source = (j >= MyRank()) ? (j + 1) : j;
recvGradStripesQuantizedRequests.push_back(MPI_Request());
int recvRequestIdx = recvGradStripesQuantizedRequests.size() - 1;
m_mpi->Irecv(m_recvGradStripesQuantized[i][j]->Buffer(), m_recvGradStripesQuantized[i][j]->GetSize(), MPI_CHAR, source, i, &(recvGradStripesQuantizedRequests[recvRequestIdx])) || MpiFail("MPI_Irecv");
}
}
}
// Initiate receive of the header on the main node
std::vector<MPI_Request> recvHeaderRequests(NumProc() - 1);
if (m_mpi->IsMainNode())
{
for (size_t j = 0; j < NumProc() - 1; ++j)
{
int source = (j >= MyRank()) ? (j + 1) : j;
// We use a tag of 'numGradMatrices' for the pre-aggregation header
m_mpi->Irecv(m_recvHeaders[j], m_recvHeaders[j]->Size(), MPI_CHAR, source, numGradMatrices, &(recvHeaderRequests[j])) || MpiFail("MPI_Irecv");
}
}
// Asynchronously send stripes of the quantized gradient matrices to the respective nodes that own aggregation of that stripe
std::vector<std::vector<MPI_Request>> sendGradStripesQuantizedRequests(numGradMatrices);
for (size_t i = 0; i < numGradMatrices; ++i)
{
m_preAggGradQuantizers[i]->WaitQuantizeAsyncDone();
size_t sendRequestIdx = 0;
for (size_t j = 0; j < NumProc(); ++j)
{
Stripe stripe = GetStripeForNode(gradients[i]->GetNumCols(), j, NumProc());
if (stripe.m_numCols > 0)
{
// Do not send stripe for self
if (j != MyRank())
{
sendGradStripesQuantizedRequests[i].push_back(MPI_Request());
QuantizedMatrix<ElemType> quantizedStripe = m_gradQuantized[i]->ColumnSlice(stripe.m_startCol, stripe.m_numCols);
if (m_traceLevel >= DEBUG_OUTPUT_TRACE_LEVEL)
{
char printHeaderBuf[1024];
sprintf(printHeaderBuf, "MPI Rank: %d, Sending Gradient Matrix No. %d slice", (int) MyRank(), (int) i);
const size_t numRowsToPeek = 3;
const size_t numColsToPeek = 3;
size_t numRowsToPrint = (std::min)(numRowsToPeek, quantizedStripe.GetNumRows());
size_t numColsToPrint = (std::min)(numColsToPeek, quantizedStripe.GetNumCols());
quantizedStripe.Print(printHeaderBuf, 0, numRowsToPrint - 1, 0, numColsToPrint - 1);
}
m_mpi->Isend(quantizedStripe.Buffer(), quantizedStripe.GetSize(), MPI_CHAR, j, i, &(sendGradStripesQuantizedRequests[i][sendRequestIdx])) || MpiFail("MPI_Isend");
sendRequestIdx++;
}
else
{
// Initialize the aggregate for the stripe with the quantized gradients instead of the original
// gradients themselves, if so desired
if (m_useQuantizationForSelfStripe)
{
QuantizedMatrix<ElemType> preAggGradSelfStripeQuantized = m_gradQuantized[i]->ColumnSlice(stripe.m_startCol, stripe.m_numCols);
m_aggGradStripeQuantizers[i]->UnquantizeAsync(preAggGradSelfStripeQuantized, *(aggGradStripes[i]), false);
}
}
}
}
}
// Send the headers from all nodes but the main node
MPI_Request sendHeaderRequest;
if (!m_mpi->IsMainNode())
m_mpi->Isend(headerCPU, headerCPU->Size(), MPI_CHAR, m_mpi->MainNodeRank(), numGradMatrices, &sendHeaderRequest) || MpiFail("MPI_Isend");
// Wait for the stripes to arrive from each node and unquantize and aggregate
size_t numReceivesExpected = recvGradStripesQuantizedRequests.size();
size_t numActualReceives = 0;
std::vector<int> perGradMatrixReceiveCount(recvRequestIdxToGradientMatrixIdxMap.size(), 0);
while (numActualReceives < numReceivesExpected)
{
int idx = MPI_UNDEFINED;
m_mpi->Waitany(recvGradStripesQuantizedRequests.size(), recvGradStripesQuantizedRequests.data(), &idx, MPI_STATUS_IGNORE) || MpiFail("MPI_Waitany");
if (idx == MPI_UNDEFINED)
{
break;
}
numActualReceives++;
int gradMatrixIdxPosition = idx / (NumProc() - 1);
int recvBufferSubIndex = idx % (NumProc() - 1);
// Map idx back to the actual gradient matrix index
int gradMatrixIdx = recvRequestIdxToGradientMatrixIdxMap[gradMatrixIdxPosition];
// Wait for the previous Unquantize to finish before issuing a new one
if (m_useQuantizationForSelfStripe || (perGradMatrixReceiveCount[gradMatrixIdxPosition] > 0))
m_aggGradStripeQuantizers[gradMatrixIdx]->WaitUnquantizeAsyncDone();
if (m_traceLevel >= DEBUG_OUTPUT_TRACE_LEVEL)
{
char printHeaderBuf[1024];
sprintf(printHeaderBuf, "MPI Rank: %d, Received Gradient Matrix No. %d slice", (int) MyRank(), gradMatrixIdx);
const size_t numRowsToPeek = 3;
const size_t numColsToPeek = 3;
size_t numRowsToPrint = (std::min)(numRowsToPeek, m_recvGradStripesQuantized[gradMatrixIdx][recvBufferSubIndex]->GetNumRows());
size_t numColsToPrint = (std::min)(numColsToPeek, m_recvGradStripesQuantized[gradMatrixIdx][recvBufferSubIndex]->GetNumCols());
m_recvGradStripesQuantized[gradMatrixIdx][recvBufferSubIndex]->Print(printHeaderBuf, 0, numRowsToPrint - 1, 0, numColsToPrint - 1);
}
m_aggGradStripeQuantizers[gradMatrixIdx]->UnquantizeAsync(*(m_recvGradStripesQuantized[gradMatrixIdx][recvBufferSubIndex]), *(aggGradStripes[gradMatrixIdx]), true);
perGradMatrixReceiveCount[gradMatrixIdxPosition]++;
// Also issue the quantization if this stripe was the last one expected for this matrix
// Note: We issue the quantization without waiting for the unquantization since the same stream
// is used for both and they are implicitly sequenced
// We reuse the buffer that we used for quantizing and sending out the pre-aggregation gradient
if (perGradMatrixReceiveCount[gradMatrixIdxPosition] == (NumProc() - 1))
{
Stripe stripe = GetStripeForNode(gradients[gradMatrixIdx]->GetNumCols(), MyRank(), NumProc());
UNUSED(stripe);
assert(stripe.m_numCols > 0);
m_aggGradStripeQuantizers[gradMatrixIdx]->QuantizeAsync(*(aggGradStripes[gradMatrixIdx]), *(aggGradStripesQuantized[gradMatrixIdx]), m_zeroThresholdFor1Bit);
}
}
assert(numActualReceives == numReceivesExpected);
// On the main node wait for the headers to arrive and aggregate
if (m_mpi->IsMainNode())
{
size_t numNodesHeadersReceivedFrom = 0;
while (numNodesHeadersReceivedFrom < (NumProc() - 1))
{
int idx = MPI_UNDEFINED;
m_mpi->Waitany(recvHeaderRequests.size(), recvHeaderRequests.data(), &idx, MPI_STATUS_IGNORE) || MpiFail("MPI_Waitany");
if (idx == MPI_UNDEFINED)
break;
numNodesHeadersReceivedFrom++;
headerCPU->Aggregate(m_recvHeaders[idx], true);
}
assert(numNodesHeadersReceivedFrom == (NumProc() - 1));
}
std::vector<std::vector<MPI_Request>> recvAggGradStripesQuantizedRequests(numGradMatrices);
// Initiate receive of stripes of quantized aggregated gradients from different nodes
for (size_t i = 0; i < numGradMatrices; ++i)
{
size_t recvRequestIdx = 0;
for (size_t j = 0; j < NumProc(); ++j)
{
// Do not recv stripe for self
if (j != MyRank())
{
Stripe stripe = GetStripeForNode(gradients[i]->GetNumCols(), j, NumProc());
if (stripe.m_numCols > 0)
{
recvAggGradStripesQuantizedRequests[i].push_back(MPI_Request());
QuantizedMatrix<ElemType> quantizedStripe = m_gradQuantized[i]->ColumnSlice(stripe.m_startCol, stripe.m_numCols);
m_mpi->Irecv(quantizedStripe.Buffer(), quantizedStripe.GetSize(), MPI_CHAR, j, numGradMatrices + 1 + i, &(recvAggGradStripesQuantizedRequests[i][recvRequestIdx])) || MpiFail("MPI_Irecv");
recvRequestIdx++;
}
}
}
}
MPI_Request recvAggHeaderRequest;
// Initiate receive of the aggregate header
if (!m_mpi->IsMainNode())
m_mpi->Irecv(headerCPU, headerCPU->Size(), MPI_CHAR, m_mpi->MainNodeRank(), numGradMatrices + 1 + numGradMatrices, &recvAggHeaderRequest) || MpiFail("MPI_Irecv");
// Initiate broadcast of quantized aggregated gradient stripes to all other nodes
std::vector<std::vector<MPI_Request>> sendAggGradStripeQuantizedRequests(numGradMatrices);
for (size_t i = 0; i < numGradMatrices; ++i)
{
Stripe stripe = GetStripeForNode(gradients[i]->GetNumCols(), MyRank(), NumProc());
if (stripe.m_numCols > 0)
{
sendAggGradStripeQuantizedRequests[i] = std::vector<MPI_Request>(NumProc() - 1);
m_aggGradStripeQuantizers[i]->WaitQuantizeAsyncDone();
for (size_t j = 0; j < NumProc() - 1; ++j)
{
int dest = (j >= MyRank()) ? (j + 1) : j;
// TODO: Should we use MPI_Bcast instead for better performance?
m_mpi->Isend(aggGradStripesQuantized[i]->Buffer(), aggGradStripesQuantized[i]->GetSize(), MPI_CHAR, dest, numGradMatrices + 1 + i, &(sendAggGradStripeQuantizedRequests[i][j])) || MpiFail("MPI_Isend");
}
}
}
// Initiate send of the aggregate header from main node
std::vector<MPI_Request> sendAggHeaderRequests(NumProc() - 1);
if (m_mpi->IsMainNode())
{
for (size_t j = 0; j < NumProc() - 1; ++j)
{
int dest = (j >= MyRank()) ? (j + 1) : j;
// TODO: Should we use MPI_Bcast instead for better performance?
m_mpi->Isend(headerCPU, headerCPU->Size(), MPI_CHAR, dest, numGradMatrices + 1 + numGradMatrices, &(sendAggHeaderRequests[j])) || MpiFail("MPI_Isend");
}
}
// Wait to receive all aggregated stripes and unquantize
for (size_t i = 0; i < numGradMatrices; ++i)
{
m_mpi->Waitall(recvAggGradStripesQuantizedRequests[i].size(), recvAggGradStripesQuantizedRequests[i].data(), MPI_STATUSES_IGNORE) || MpiFail("MPI_Waitall");
m_preAggGradQuantizers[i]->UnquantizeAsync(*(m_gradQuantized[i]), *(gradients[i]), false);
}
// Wait to receive aggregate header
if (!m_mpi->IsMainNode())
m_mpi->Wait(&recvAggHeaderRequest, MPI_STATUSES_IGNORE) || MpiFail("MPI_Wait");
// Wait for all the unquantizations to finish
for (size_t i = 0; i < numGradMatrices; ++i)
{
m_preAggGradQuantizers[i]->WaitUnquantizeAsyncDone();
if (m_traceLevel >= DEBUG_OUTPUT_TRACE_LEVEL)
{
char printHeaderBuf[1024];
sprintf(printHeaderBuf, "MPI Rank: %d, Aggregated Gradient Matrix No. %d", (int) MyRank(), (int) i);
PrintMatrix(printHeaderBuf, gradients[i]);
}
}
// Wait for completion of the async send requests
for (int i = 0; i < sendGradStripesQuantizedRequests.size(); ++i)
{
if (sendGradStripesQuantizedRequests[i].size() > 0)
m_mpi->Waitall(sendGradStripesQuantizedRequests[i].size(), sendGradStripesQuantizedRequests[i].data(), MPI_STATUSES_IGNORE) || MpiFail("MPI_Waitall");
}
if (!m_mpi->IsMainNode())
m_mpi->Wait(&sendHeaderRequest, MPI_STATUSES_IGNORE) || MpiFail("MPI_Wait");
for (int i = 0; i < sendAggGradStripeQuantizedRequests.size(); ++i)
{
if (sendAggGradStripeQuantizedRequests[i].size() > 0)
m_mpi->Waitall(sendAggGradStripeQuantizedRequests[i].size(), sendAggGradStripeQuantizedRequests[i].data(), MPI_STATUSES_IGNORE) || MpiFail("MPI_Waitall");
}
if (m_mpi->IsMainNode())
m_mpi->Waitall(sendAggHeaderRequests.size(), sendAggHeaderRequests.data(), MPI_STATUSES_IGNORE) || MpiFail("MPI_Waitall");
if (showSyncPerfStats)
{
aggregationTimer.Stop();
double gradientAggregationTime = aggregationTimer.ElapsedSeconds();
fprintf(stderr, "Actual gradient aggregation time: %.6g\n", gradientAggregationTime);
}
}
// Debug helper to print matrix contents
static void PrintMatrix(const char* printHeader, Matrix<ElemType>* matrixToPrint, bool peek = true)
{
if (peek)
{
const size_t numRowsToPeek = 3;
const size_t numColsToPeek = 3;
size_t numRowsToPrint = (std::min)(numRowsToPeek, matrixToPrint->GetNumRows());
size_t numColsToPrint = (std::min)(numColsToPeek, matrixToPrint->GetNumCols());
matrixToPrint->Print(printHeader, 0, numRowsToPrint - 1, 0, numColsToPrint - 1);
}
else
{
matrixToPrint->Print(printHeader);
}
fflush(stderr);
}
private:
std::unique_ptr<CUDAPageLockedMemAllocator> m_allocator;
std::vector<std::unique_ptr<MatrixQuantizer<ElemType>>> m_preAggGradQuantizers;
std::vector<std::unique_ptr<QuantizedMatrix<ElemType>>> m_gradQuantized;
std::vector<std::unique_ptr<MatrixQuantizer<ElemType>>> m_aggGradStripeQuantizers;
std::vector<std::vector<std::unique_ptr<QuantizedMatrix<ElemType>>>> m_recvGradStripesQuantized;
std::vector<DistGradHeader*> m_recvHeaders;
// Number of bits that each gradient value is quantized to before communication
// with other nodes
int m_numQuantizationBits;
// option for handling the mean for 1-bit quantization
// force 1-bit quant to threshold against 0 rather than the midpoint between lower and upper
bool m_zeroThresholdFor1Bit;
// Since the self-stripe in an all-reduce is not communicated, there is really no reason to
// quantize it for reduced communication. However, we add this as an option for consistency
// across all stripes if desired
bool m_useQuantizationForSelfStripe;
// Perform asynchronous gradient aggregation using double buffering of the gradient matrices
bool m_useAsyncAggregation;
// Future corresponding to the current in-flight async gradient aggregation
std::future<void> m_pendingAsyncAggregation;
// Buffered gradients that we asynchronously aggregate
std::unordered_map<Matrix<ElemType>*, std::unique_ptr<Matrix<ElemType>>> m_bufferedGradients;
DistGradHeader* m_bufferedGradHeader;
int m_traceLevel;
int m_syncStatsTrace;
// Only used for controlling frequency of measuring/showing gradient aggregation perf stats
size_t m_iterationCount;
bool m_initialized;
};
} } }

View file

@@ -1,660 +0,0 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#pragma once
#include <vector>
#include "CNTKLibrary.h"
#include "DistributedLearnerBase.h"
#include <numeric>
#include <iostream>
#include <sstream>
namespace CNTK
{
///
/// Block Momentum Trainer.
///
class BlockMomentumDistributedLearner : public DistributedLearnerBase
{
private:
enum class Action;
friend std::ostream& operator<<(std::ostream& out, const Action action)
{
static std::map<Action, std::string> actionStr;
if (actionStr.size() == 0)
{
actionStr[Action::Aggregate] = "Aggregate";
actionStr[Action::AggregateMetrics] = "AggregateMetrics";
actionStr[Action::Checkpoint] = "Checkpoint";
actionStr[Action::Shutdown] = "Shutdown";
actionStr[Action::Wait] = "Wait";
}
return out << actionStr[action];
}
// Print debug info about synchronization action requested and granted
void DebugPrintSynchronizeInfo(Action requestedAction, Action grantedAction)
{
if (GetTraceLevel() >= TraceLevel::Info)
{
std::ostringstream outString;
outString << "BMUF Rank " << m_communicator->CurrentWorker().m_globalRank << " Action requested " << requestedAction << " Action returned " << grantedAction << std::endl;
std::cerr << outString.str(); //stderr output
}
}
template<class T> using Matrix = Microsoft::MSR::CNTK::Matrix<T>;
public:
BlockMomentumDistributedLearner(
DistributedCommunicatorPtr communicator,
LearnerPtr learner,
size_t distributedAfterSamples,
size_t globalModelAggregationBlockSize,
bool useNesterovMomentum,
bool resetSGDMomentumAfterAggregation,
double blockLearningRate)
: BlockMomentumDistributedLearner(
communicator,
learner,
distributedAfterSamples,
globalModelAggregationBlockSize,
useNesterovMomentum,
resetSGDMomentumAfterAggregation,
blockLearningRate,
Momentum2TimeConstant(1.0 - 1.0 / (double)communicator->Workers().size(), globalModelAggregationBlockSize))
{}
BlockMomentumDistributedLearner(
DistributedCommunicatorPtr communicator,
LearnerPtr learner,
size_t distributedAfterSamples,
size_t globalModelAggregationBlockSize,
bool useNesterovMomentum,
bool resetSGDMomentumAfterAggregation,
double blockLearningRate,
double blockMomentumAsTimeConstant)
: DistributedLearnerBase(communicator, learner, distributedAfterSamples),
m_useNesterovMomentum(useNesterovMomentum),
m_resetSGDMomentumAfterAggregation(resetSGDMomentumAfterAggregation),
m_blockLearningRate(blockLearningRate),
m_blockMomentumAsTimeConstantPerWorker(blockMomentumAsTimeConstant / communicator->Workers().size()),
m_globalModelAggregationBlockSize(globalModelAggregationBlockSize),
m_numSamplesSeenInCurrentBlock(0),
m_endOfDataReached(false),
m_localTotalNumSamplesSeen(0),
m_syncPeriodPerWorker(globalModelAggregationBlockSize / communicator->Workers().size())
{
if (m_syncPeriodPerWorker == 0)
InvalidArgument("Sync period is too small.");
// Need to allocate memory here to make sure we do not hit OOM later
std::vector<NDArrayViewPtr> parameterValues;
GetParameterValues(learner->Parameters(), parameterValues);
m_blockLevelSmoothedGradient.resize(parameterValues.size());
m_prevParameters.resize(parameterValues.size());
m_tempBlockGradient.resize(parameterValues.size());
Reset(parameterValues);
}
size_t MinibatchSizeScaleFactor() override
{
return m_communicator->Workers().size();
}
bool Update(std::unordered_map<Parameter, NDArrayViewPtr>& gradientValues, MinibatchInfo& info) override
{
// mark start of block before local update
std::vector<NDArrayViewPtr> values;
GetParameterValues(m_learner->Parameters(), values);
// note this is only for the first update, after that SyncBlock handles the bookkeeping
if (!m_prevParamInitialized)
{
Reset(values);
m_prevParamInitialized = true;
}
// Do the local update first, then the block update. The local update uses a different gradient on each worker,
// and this ordering ensures that all workers have the same model after the block update.
if (!info.IsEmpty())
{
// For block momentum the number of aggregate/checkpoint calls should match, so for now we ignore the return value of the local learners.
auto profWeights = Microsoft::MSR::CNTK::ScopeProfile(Microsoft::MSR::CNTK::profilerEvtMainWeights);
m_learner->Update(gradientValues, info.numberOfSamples, info.atEndOfSweep);
// after local update, use the latest model for block update
values.clear();
GetParameterValues(m_learner->Parameters(), values);
}
auto profGradientAgg = Microsoft::MSR::CNTK::ProfilerTimeBegin();
bool updated = PerformDistributedUpdateIfNeeded(values, info);
Microsoft::MSR::CNTK::ProfilerTimeEnd(profGradientAgg, Microsoft::MSR::CNTK::profilerEvtMainGradient);
return updated;
}
// Optionally overridable method to get checkpoint state associated with this Distributed train method
Dictionary CreateCheckpoint() override
{
std::vector<NDArrayViewPtr> values;
GetParameterValues(m_learner->Parameters(), values);
// During checkpoint, other workers could be in aggregation state. Let's allow them to finish aggregation.
Action action;
while ((action = SynchronizeAction(Action::Checkpoint)) != Action::Checkpoint)
{
DebugPrintSynchronizeInfo(Action::Checkpoint, action);
if (action == Action::Wait)
continue;
if (action == Action::Aggregate)
AggregateImpl(values);
else
RuntimeError("Unexpected action received.");
}
DebugPrintSynchronizeInfo(Action::Checkpoint, action);
// Always aggregate before the checkpoint, so prevParameter and m_numSamplesSeenInCurrentBlock don't need to be saved
SynchronizeAction(Action::Aggregate);
AggregateImpl(values);
std::vector<DictionaryValue> serializedSmoothedGradients;
for (auto sg : m_blockLevelSmoothedGradient)
{
serializedSmoothedGradients.push_back(*sg);
}
Dictionary result;
result[L"base"] = DistributedLearnerBase::CreateCheckpoint();
result[L"localTotalNumSamplesSeen"] = m_localTotalNumSamplesSeen;
result[L"blockLevelSmoothedGradient"] = serializedSmoothedGradients;
return result;
}
void RestoreFromCheckpoint(const Dictionary& checkpoint) override
{
DistributedLearnerBase::RestoreFromCheckpoint(checkpoint[L"base"].Value<Dictionary>());
m_localTotalNumSamplesSeen = checkpoint[L"localTotalNumSamplesSeen"].Value<size_t>();
const auto& smoothedGradients = checkpoint[L"blockLevelSmoothedGradient"].Value<std::vector<DictionaryValue>>();
if (m_blockLevelSmoothedGradient.size() != smoothedGradients.size())
RuntimeError("Inconsistent parameter size between learner and checkpoint");
for (size_t i = 0; i < m_blockLevelSmoothedGradient.size(); i++)
{
m_blockLevelSmoothedGradient[i]->CopyFrom(smoothedGradients[i].Value<NDArrayView>());
}
m_prevParamInitialized = false;
}
private:
// Block momentum needs to do aggregation of loss and eval across workers.
virtual void DoAggregateMetricsIfNeeded(NDArrayViewPtr& localTrainingLoss, NDArrayViewPtr& localEvalCriterion) override
{
m_shutDownSeenBefore = false;
// If shutdown has been agreed upon before, then return from metrics aggregation. Other shutdown workers won't be able to sync now.
if (m_communicator->Workers().size() == 1 || m_shutDownSeenBefore)
{
return;
}
Action action;
while ((action = SynchronizeAction(Action::AggregateMetrics)) != Action::AggregateMetrics)
{
DebugPrintSynchronizeInfo(Action::AggregateMetrics, action);
std::vector<NDArrayViewPtr> paramValues;
GetParameterValues(m_learner->Parameters(), paramValues);
switch (action)
{
// Aggregate params first and try for aggregate metrics again
case Action::Aggregate:
AggregateImpl(paramValues);
break;
// Can't do checkpointing here since not called from checkpointing code, so return. Checkpointing will be called again eventually.
case Action::Checkpoint:
return;
// Can't aggregate metrics since others are going into shutdown.
case Action::Shutdown:
m_shutDownSeenBefore = true;
return; // Can't aggregate if another worker is in shutdown mode
}
}
DebugPrintSynchronizeInfo(Action::AggregateMetrics, action);
// Synchronization complete - Start the loss and eval aggregation
float averageTrainingLoss = 0;
if (localTrainingLoss)
{
averageTrainingLoss = localTrainingLoss->AsScalar<float>();
}
float averageEvalCriterion = 0;
if (localEvalCriterion)
{
averageEvalCriterion = localEvalCriterion->AsScalar<float>();
}
NDArrayViewPtr inPlaceAggregateTrainingLoss = std::make_shared<NDArrayView>(averageTrainingLoss, NDShape{}, DeviceDescriptor::CPUDevice());
NDArrayViewPtr inPlaceAggregateEvalCriterion = std::make_shared<NDArrayView>(averageEvalCriterion, NDShape{}, DeviceDescriptor::CPUDevice());
vector<NDArrayViewPtr> inPlaceAggregateVector = { inPlaceAggregateTrainingLoss, inPlaceAggregateEvalCriterion };
m_communicator->AggregateInPlace(inPlaceAggregateVector, m_communicator->Workers());
if (localTrainingLoss)
{
inPlaceAggregateTrainingLoss->SetValue(inPlaceAggregateTrainingLoss->AsScalar<float>() / m_communicator->Workers().size());
localTrainingLoss->CopyFrom(*inPlaceAggregateTrainingLoss);
}
if (localEvalCriterion)
{
inPlaceAggregateEvalCriterion->SetValue(inPlaceAggregateEvalCriterion->AsScalar<float>() / m_communicator->Workers().size());
localEvalCriterion->CopyFrom(*inPlaceAggregateEvalCriterion);
}
}
// Optional override that gets called per minibatch after finishing gradient computation but before updating model parameters
bool PerformDistributedUpdateIfNeeded(std::vector<NDArrayViewPtr>& parameterValues, MinibatchInfo& info)
{
// If the last minibatch, set the end of data state.
if (info.atEndOfData)
m_endOfDataReached = true;
m_localTotalNumSamplesSeen += info.numberOfSamples;
m_sampleCount += info.numberOfSamples;
if (m_distributeAfterSamples > m_sampleCount)
{
if (m_endOfDataReached)
{
// We have not even reached distributed state,
// simply stop processing by returning false.
return false;
}
return true;
}
if (!m_endOfDataReached)
{
m_numSamplesSeenInCurrentBlock += info.numberOfSamples;
if (m_numSamplesSeenInCurrentBlock < m_syncPeriodPerWorker)
return true;
Aggregate(parameterValues);
return true;
}
return Shutdown(parameterValues);
}
// Before doing any work, the distributed learner synchronizes with other learners to
// decide what to do next.
// The priority of actions is:
// 1) If any worker wants to aggregate - aggregation is done.
// 2) If any worker wants to checkpoint and nobody wants to aggregate - checkpointing is done. If anyone wants to aggregate metrics, wait to allow it to reach the checkpoint state.
// 3) If all want to shut down - we have reached the end of the data and shutdown can be done. If anyone wants to aggregate metrics, wait to allow it to reach the shutdown state.
// 4) If all workers want to aggregate metrics - metrics aggregation is done. Otherwise aggregate, checkpoint or shutdown is returned if any other worker wants it.
// This priority ordering resolves situations where some of the workers run out of data
// while other workers still require checkpointing or aggregation.
// A worked example is given after the Action enum below.
enum class Action
{
Wait, // Waits in the current state without doing anything.
Aggregate,
AggregateMetrics, // Used to allow aggregation of loss and eval metrics.
Checkpoint,
Shutdown
};
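// Illustrative example (not in the original source): if worker 0 requests Checkpoint, worker 1 requests
// Shutdown and worker 2 requests Aggregate, the Aggregate request takes priority and SynchronizeAction
// returns Aggregate on all three workers; workers 0 and 1 then run AggregateImpl and re-request their own action.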
void GetParameterValues(const std::vector<Parameter>& parameters, std::vector<NDArrayViewPtr>& result)
{
for (auto p : parameters)
result.push_back(p.Value());
}
void Aggregate(std::vector<NDArrayViewPtr>& parameters)
{
// Synchronization action. Aggregate has the highest priority, so the expected result is aggregate.
Action action = SynchronizeAction(Action::Aggregate);
if (action != Action::Aggregate)
LogicError("Unexpected action during aggregation.");
AggregateImpl(parameters);
}
bool Shutdown(std::vector<NDArrayViewPtr>& parameters)
{
// During shutdown, other workers could be in checkpointing or aggregation state.
// Finished workers should properly behave in this case.
Action action;
while ((action = SynchronizeAction(Action::Shutdown)) != Action::Shutdown)
{
DebugPrintSynchronizeInfo(Action::Shutdown, action);
switch (action)
{
case Action::Aggregate:
AggregateImpl(parameters);
break;
case Action::Checkpoint:
// Somebody still has to call the checkpoint from the outside.
return true;
case Action::Wait:
// Someone is in aggregate metrics. Wait for it to come to shutdown.
continue;
default:
RuntimeError("Unexpected action received.");
}
}
DebugPrintSynchronizeInfo(Action::Shutdown, action);
// Last synchronization
AggregateImpl(parameters);
return false; // Make compiler happy.
}
// Synchronize (agree) on the action before doing it. This is needed to prevent deadlocks in MPI.
// Aggregate has the highest priority, so AggregateImpl can be called right after calling SynchronizeAction(Action::Aggregate).
// Others need to ask for permission in a loop.
Action SynchronizeAction(Action self)
{
assert(self == Action::Checkpoint || self == Action::Aggregate || self == Action::Shutdown || self == Action::AggregateMetrics);
double data[2] = { static_cast<double>(self), static_cast<double>(m_localTotalNumSamplesSeen) };
auto a = std::make_shared<NDArrayView>(DataType::Double, NDShape{ 2 }, &data, sizeof(double) * 2, DeviceDescriptor::CPUDevice());
m_communicator->Concatenate(std::vector<NDArrayViewPtr> { a }, m_actionBuffer, m_communicator->Workers());
assert(m_actionBuffer.size() == 1);
auto buffer = m_actionBuffer.front()->DataBuffer<double>();
auto bufferSize = m_actionBuffer.front()->Shape().TotalSize();
auto bufferEnd = buffer + bufferSize;
std::vector<Action> actions;
actions.reserve(m_communicator->Workers().size());
std::vector<size_t> localNumberOfSamples;
localNumberOfSamples.reserve(m_communicator->Workers().size());
for (const double* start = buffer; start != bufferEnd; start +=2)
{
actions.push_back(static_cast<Action>((int)*start));
localNumberOfSamples.push_back(static_cast<size_t>(*(start + 1)));
}
m_sampleCount = std::accumulate(localNumberOfSamples.begin(), localNumberOfSamples.end(), (size_t)0);
// If all want to aggregate metrics, only then we aggregate metrics.
if (std::all_of(actions.begin(), actions.end(), [](Action c) { return c == Action::AggregateMetrics; }))
return Action::AggregateMetrics;
// If all want to shutdown - we shutdown.
if (std::all_of(actions.begin(), actions.end(), [](Action c) { return c == Action::Shutdown; }))
return Action::Shutdown;
// If all want to checkpoint - we checkpoint.
if (std::all_of(actions.begin(), actions.end(), [](Action c) { return c == Action::Checkpoint; }))
return Action::Checkpoint;
// If all are either in Checkpoint, Shutdown or AggregateMetrics,
// then the AggregateMetrics state has the lowest priority: workers in it return without doing anything, while the other workers wait for the AggregateMetrics workers to reach their state.
// Between Checkpoint and Shutdown, Shutdown has the lower priority: the shutdown worker returns and the checkpoint worker waits for the others to reach the checkpoint state.
if (std::all_of(actions.begin(), actions.end(), [](Action c) { return c == Action::Checkpoint || c == Action::Shutdown || c == Action::AggregateMetrics; }))
{
bool isAnyCheckpoint = std::any_of(actions.begin(), actions.end(), [](Action c) { return c == Action::Checkpoint; });
bool isAnyShutdown = std::any_of(actions.begin(), actions.end(), [](Action c) { return c == Action::Shutdown; });
bool isAnyAggregateMetrics = std::any_of(actions.begin(), actions.end(), [](Action c) { return c == Action::AggregateMetrics; });
if (self == Action::Shutdown)
{
// Do checkpoint first if any other requests checkpoint. Then come back to shutdown.
if (isAnyCheckpoint)
{
return Action::Checkpoint;
}
// Allow the aggregate metrics to come in shutdown state and request again.
if (isAnyAggregateMetrics)
{
return Action::Wait;
}
return Action::Shutdown;
}
else if (self == Action::Checkpoint)
{
// Wait for others in the shutdown or aggregate-metrics state to come to the checkpoint state
if (isAnyShutdown || isAnyAggregateMetrics)
{
return Action::Wait;
}
return Action::Checkpoint;
}
else if (self == Action::AggregateMetrics)
{
// AggregateMetrics can't do aggregate metrics if anyone is in shutdown
if (isAnyShutdown)
{
return Action::Shutdown;
}
// If all others are either metrics aggregate or checkpoint then state returned is checkpoint and we don't do metrics aggregation
return Action::Checkpoint;
}
}
// Otherwise we aggregate. This is given priority by all other workers in checkpoint, shutdown or aggregate metrics states.
return Action::Aggregate;
}
void AggregateImpl(std::vector<NDArrayViewPtr>& parameters)
{
// Let's update the weights.
if (parameters.front()->GetDataType() == DataType::Double)
SynchronizeModel<double>(parameters);
else if (parameters.front()->GetDataType() == DataType::Float)
SynchronizeModel<float>(parameters);
else if (parameters.front()->GetDataType() == DataType::Float16)
SynchronizeModel<half>(parameters);
else
RuntimeError("Unsupported type.");
m_numSamplesSeenInCurrentBlock = 0;
if (m_resetSGDMomentumAfterAggregation)
m_learner->ResetSmoothedGradients();
}
Dictionary CreateCheckpointImpl(std::vector<NDArrayViewPtr>& parameters)
{
// During checkpoint, other workers could be in aggregation state. Let's allow them to finish aggregation.
Action action;
while ((action = SynchronizeAction(Action::Checkpoint)) != Action::Checkpoint)
{
DebugPrintSynchronizeInfo(Action::Checkpoint, action);
if (action == Action::Wait)
continue;
if (action == Action::Aggregate)
AggregateImpl(parameters);
else
RuntimeError("Unexpected action received.");
}
DebugPrintSynchronizeInfo(Action::Checkpoint, action);
return DistributedLearnerBase::CreateCheckpoint();
}
bool IsResetRequired(std::vector<NDArrayViewPtr>& parameters) const
{
if (m_prevParameters.size() != parameters.size() ||
m_blockLevelSmoothedGradient.size() != parameters.size())
return true;
for (size_t i = 0; i < parameters.size(); ++i)
{
if (m_prevParameters[i]->Shape() != parameters[i]->Shape() ||
m_prevParameters[i]->Device() != parameters[i]->Device() ||
m_blockLevelSmoothedGradient[i]->Shape() != parameters[i]->Shape() ||
m_blockLevelSmoothedGradient[i]->Device() != parameters[i]->Device())
{
return true;
}
}
return false;
}
void Reset(const std::vector<NDArrayViewPtr>& parameters)
{
for (size_t i = 0; i < parameters.size(); ++i)
{
auto& p = parameters[i];
if (p->GetDataType() == DataType::Double)
ResetBuffer<double>(i, p);
else if (p->GetDataType() == DataType::Float)
ResetBuffer<float>(i, p);
else
RuntimeError("Unsupported type.");
}
}
template<class ElemType>
void ResetBuffer(size_t index, const NDArrayViewPtr& p)
{
auto data = p->GetMatrix<ElemType>();
if (!m_blockLevelSmoothedGradient[index])
{
// has not been initialized yet
auto pSmoothedGrad = std::make_shared<NDArrayView>(AsDataType<ElemType>(), p->Shape(), AsDeviceDescriptor(data->GetDeviceId()));
pSmoothedGrad->SetValue(static_cast<ElemType>(0));
m_blockLevelSmoothedGradient[index] = pSmoothedGrad;
}
if (!m_prevParameters[index])
{
NDArrayViewPtr newValue = std::make_shared<NDArrayView>(AsDataType<ElemType>(), p->Shape(), AsDeviceDescriptor(data->GetDeviceId()));
std::shared_ptr<Matrix<ElemType>> newData = newValue->GetWritableMatrix<ElemType>();
newData->SetValue(*data);
m_prevParameters[index] = newValue;
}
else
{
m_prevParameters[index]->GetWritableMatrix<ElemType>()->SetValue(*data);
}
if (!m_tempBlockGradient[index])
{
m_tempBlockGradient[index] = std::make_shared<NDArrayView>(AsDataType<ElemType>(), p->Shape(), AsDeviceDescriptor(data->GetDeviceId()));
}
}
template<class ElemType>
void SynchronizeModel(const std::vector<NDArrayViewPtr>& parameterValues)
{
ElemType blockMomentum = (ElemType)TimeConstant2Momentum(m_blockMomentumAsTimeConstantPerWorker, m_numSamplesSeenInCurrentBlock);
// 1. Let's aggregate weights
for (size_t i = 0; i < parameterValues.size(); ++i)
{
// Get current model
Matrix<ElemType>& previousWeight = *m_prevParameters[i]->GetWritableMatrix<ElemType>(); // prev model value
Matrix<ElemType>& currentWeight = *parameterValues[i]->GetWritableMatrix<ElemType>();
Matrix<ElemType>& blockGrad = *m_tempBlockGradient[i]->GetWritableMatrix<ElemType>();
// Subtract it from the previous model
blockGrad = previousWeight - currentWeight; // blockGrad becomes the local block gradient (of one worker)
}
// Send block gradient over MPI nodes.
m_communicator->AggregateInPlace(m_tempBlockGradient, m_communicator->Workers());
// 2. Let's update the model
for (size_t i = 0; i < parameterValues.size(); ++i)
{
// 2 block gradient aggregation
// 2.1. get current model
Matrix<ElemType>& previousWeight = *m_prevParameters[i]->GetWritableMatrix<ElemType>(); // prev model value
Matrix<ElemType>& currentWeight = *parameterValues[i]->GetWritableMatrix<ElemType>();
Matrix<ElemType>& blockGrad = *m_tempBlockGradient[i]->GetWritableMatrix<ElemType>();
// 2.2. model update
{
Matrix<ElemType>& sg = *m_blockLevelSmoothedGradient[i]->GetWritableMatrix<ElemType>(); // smoothed gradient
// 2.2.1 update block level smoothed gradient;
// This is essentially a first-order infinite impulse response (IIR) filter with the gain (1 - blockMomentum)*m_blockLearningRate:
// smoothedGradient(t)=blockMomentum * smoothedGradients(t-1) + (1 - blockMomentum)*m_blockLearningRate*blockGrad(t)
Matrix<ElemType>::ScaleAndAdd((ElemType)((1 - blockMomentum)*m_blockLearningRate), blockGrad, (ElemType)blockMomentum, sg);
// 2.2.2 update parameters;
currentWeight.SetValue(previousWeight);
currentWeight -= sg;
// 2.2.3 Nesterov Momentum
// Nesterov momentum here means doing a partial weight update before calculating the gradient, i.e.,
// (step 1) w(t) <-- w(t) - \eta* v(t)
// (step 2) g(t+1) <-- forwardbackward on minibatches with initial model as w(t)
// (step 3) v(t+1) <-- \eta*v(t) + (1-\eta)*learningRate*g(t+1)
// (step 4) w(t+1) <-- w(t)-v(t)
// (step 5) t <-- t+1
// without step 1, this becomes standard momentum
if (m_useNesterovMomentum)
{
Matrix<ElemType>::ScaleAndAdd((ElemType)-blockMomentum, sg, currentWeight);
}
// 2.2.4 update bookkeeping
previousWeight.SetValue(currentWeight);
}
}
}
static double TimeConstant2Momentum(double timeConstant, size_t syncPeriod)
{
if (timeConstant == 0)
return 0;
else
return exp(-((double)syncPeriod) / timeConstant);
}
static double Momentum2TimeConstant(double bm, size_t syncPeriod)
{
if (bm >= 1.0 || bm < 0.0)
{
InvalidArgument("Unexpected block momentum (%.2f). Block momentum should be in the range of [0,1)\n", bm);
}
return -(double)syncPeriod / log(bm);
}
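// Illustrative note (not in the original source): the two helpers above are inverses of each other, related by
// blockMomentum = exp(-syncPeriod / timeConstant). For example, a per-worker time constant equal to 4x the
// per-worker sync period gives blockMomentum = exp(-0.25) ~= 0.78, while the default constructor choice of
// blockMomentum = 1 - 1/numWorkers (e.g. 0.75 for 4 workers) corresponds to a time constant of
// -syncPeriod / log(0.75) ~= 3.48 * syncPeriod.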
const bool m_resetSGDMomentumAfterAggregation;
const bool m_useNesterovMomentum;
const double m_blockLearningRate;
const double m_blockMomentumAsTimeConstantPerWorker;
const size_t m_syncPeriodPerWorker;
const size_t m_globalModelAggregationBlockSize;
size_t m_numSamplesSeenInCurrentBlock;
size_t m_localTotalNumSamplesSeen;
// parameters at the last model aggregation point
std::vector<NDArrayViewPtr> m_prevParameters;
std::vector<NDArrayViewPtr> m_blockLevelSmoothedGradient;
std::vector<NDArrayViewPtr> m_tempBlockGradient;
// temp storage for MPI
std::vector<NDArrayViewPtr> m_actionBuffer;
bool m_prevParamInitialized = false;
bool m_endOfDataReached;
bool m_shutDownSeenBefore = false;
DISABLE_COPY_AND_MOVE(BlockMomentumDistributedLearner);
};
}

View file

@@ -1,298 +0,0 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#pragma once
#include "../SGDLib/MASGD.h"
namespace Microsoft { namespace MSR { namespace CNTK {
// Implementation of Blockwise Model Update and Filtering (BMUF, a.k.a. block momentum)
// For detail, see the following paper
// Kai Chen and Qiang Huo, "Scalable training of deep learning machines by incremental block training
// with intra-block parallel optimization and blockwise model-update filtering",
// in International Conference on Acoustics, Speech and Signal Processing , March 2016, Shanghai, China.
template<typename ElemType>
class BlockMomentumSGD : public IMASGD<ElemType>
{
typedef IMASGD<ElemType> Base;
using Base::m_pMPI;
using Base::m_deviceId;
using Base::DownCast;
protected:
bool m_resetSGDMomentumAfterAggregation;
bool m_useNesterovMomentum;
double m_blockLearningRate;
double m_blockMomentumAsTimeConstantPerWorker;
size_t m_syncPeriodPerWorker;
map < wstring, shared_ptr<Matrix<ElemType>>> m_prevParameters; // parameters at the last model aggregation point
map < wstring, shared_ptr<Matrix<ElemType>>> m_blockLevelSmoothedGradient;
public:
BlockMomentumSGD(const MPIWrapperPtr& pMPI, size_t reportFreq, DEVICEID_TYPE devID,
bool useNesterovMomentum, bool resetSGDM,
double blockLearningRate,
double blockMomentumAsTimeConstant, size_t syncPeriod)
:IMASGD<ElemType>(pMPI, reportFreq, devID)
{
m_syncPeriodPerWorker = syncPeriod / pMPI->NumNodesInUse();
m_blockMomentumAsTimeConstantPerWorker = blockMomentumAsTimeConstant / pMPI->NumNodesInUse();
m_useNesterovMomentum = useNesterovMomentum;
m_resetSGDMomentumAfterAggregation = resetSGDM;
m_blockLearningRate = blockLearningRate;
}
/*virtual*/ void OnEpochStart(const std::list<ComputationNodeBasePtr>& LearnableNodes) override
{
Base::OnEpochStart(LearnableNodes);
for (auto& pNode : LearnableNodes)
{
auto pnode = DownCast(pNode);
wstring name = pNode->NodeName();
Matrix<ElemType>& NodeValue = pnode->Value();
if (m_blockLevelSmoothedGradient.find(name) == m_blockLevelSmoothedGradient.end())
{
// has not been initialized yet
auto pSmoothedGrad = make_shared<Matrix<ElemType>> (NodeValue.GetDeviceId());
pSmoothedGrad->Resize(NodeValue.GetNumRows(), NodeValue.GetNumCols());
pSmoothedGrad->SetValue((ElemType)0);
m_blockLevelSmoothedGradient[name] = pSmoothedGrad;
}
if (m_prevParameters.find(name) == m_prevParameters.end())
{
auto pValue = make_shared<Matrix<ElemType>> (NodeValue.GetDeviceId());
pValue->SetValue(NodeValue);
m_prevParameters[name] = pValue;
}
else
{
m_prevParameters[name]->SetValue(NodeValue);
}
}
fprintf(stderr, "Parallel training (%d workers) using BlockMomentumSGD with "
"block momentum = %6.4f, "
"block momentum time constant (per worker) = %6.4f, "
"block learning rate = %6.4f, "
"block size per worker = %d samples, "
"%s"
"%s"
"\n",
(int)m_pMPI->NumNodesInUse(),
BlockMomentumSGD<double>::TimeConstant2Momentum(m_blockMomentumAsTimeConstantPerWorker, m_syncPeriodPerWorker),
m_blockMomentumAsTimeConstantPerWorker,
m_blockLearningRate,
(int)m_syncPeriodPerWorker,
m_useNesterovMomentum ? "using Nesterov-style block momentum, " : "" ,
m_resetSGDMomentumAfterAggregation ? "resetting SGD momentum after sync." : "."
);
}
/*virtual*/ void OnEpochEnd(const std::list<ComputationNodeBasePtr>& LearnableNodes,
std::list<Matrix<ElemType>>& smoothedGradient,
size_t samplesSinceLastSync) override
{
Base::OnEpochEnd(LearnableNodes, smoothedGradient, samplesSinceLastSync);
}
/*virtual*/ void ModelAggregationProcessing(
size_t samplesSinceLastSync,
const std::list<ComputationNodeBasePtr>& learnableNodes,
std::list<Matrix<ElemType>>& smoothedGradient,
size_t& totalSamplesProcessed,
float& secondsOnCommunication
) override
{
//----------------------------------------
// 1. communicate with other nodes to negotiate contribution weights
//----------------------------------------
int nTotalSamples = samplesSinceLastSync;
ElemType blockMomentum = (ElemType)BlockMomentumSGD<double>::TimeConstant2Momentum(m_blockMomentumAsTimeConstantPerWorker, m_syncPeriodPerWorker);
Timer commTimer;
secondsOnCommunication = 0.0f;
commTimer.Start();
m_pMPI->AllReduce(&nTotalSamples, 1);
commTimer.Stop();
secondsOnCommunication += (float)commTimer.ElapsedSeconds();
totalSamplesProcessed = nTotalSamples;
for (auto& pBaseNode : learnableNodes)
{
if (!pBaseNode->IsParameterUpdateRequired())
{
continue;
}
wstring name = pBaseNode->NodeName();
// 2 block gradient aggregation
auto pNode = DownCast(pBaseNode);
// 2.1. get current model
Matrix<ElemType>& prevWeight = *m_prevParameters[name]; // prev model value
Matrix<ElemType>& currentWeight = pNode->Value(); // current model
// 2.1.2. subtract it from the previous model
Matrix<ElemType> blockGrad(prevWeight.DeepClone());
blockGrad -= currentWeight; // blockGrad becomes the local block gradient (of one worker)
// 2.1.3. send block gradient over MPI nodes;
unique_ptr<ElemType[]> px(blockGrad.CopyToArray());
size_t nx = blockGrad.GetNumElements();
// 2.1.4. inplace sum
commTimer.Restart();
m_pMPI->AllReduce(px.get(), nx);
commTimer.Stop();
secondsOnCommunication += (float)commTimer.ElapsedSeconds();
// 2.1.5. global block gradient
blockGrad.SetValue(blockGrad.GetNumRows(),
blockGrad.GetNumCols(),
blockGrad.GetDeviceId(),
px.get()
);
// 2.2. model update
{
// alias for better readability
Matrix<ElemType>& smoothedGradientUpdate = *m_blockLevelSmoothedGradient[name]; // smoothed gradient
// 2.2.1 update block level smoothed gradient;
// This is essentially a first-order infinite impulse response (IIR) filter with the gain (1 - blockMomentum)*m_blockLearningRate:
// smoothedGradientUpdate(t)=blockMomentum * smoothedGradients(t-1) + (1 - blockMomentum)*m_blockLearningRate*blockGrad(t)
Matrix<ElemType>::ScaleAndAdd((ElemType)((1 - blockMomentum)*m_blockLearningRate), blockGrad, (ElemType)blockMomentum, smoothedGradientUpdate);
// 2.2.2 update parameters;
currentWeight.SetValue(prevWeight);
currentWeight -= smoothedGradientUpdate;
// 2.2.3 Nesterov Momentum
// Nesterov momentum here means doing a partial weight update before calculating the gradient, i.e.,
// (step 1) w(t) <-- w(t) - \eta* v(t)
// (step 2) g(t+1) <-- forwardbackward on minibatches with initial model as w(t)
// (step 3) v(t+1) <-- \eta*v(t) + (1-\eta)*learningRate*g(t+1)
// (step 4) w(t+1) <-- w(t)-v(t)
// (step 5) t <-- t+1
// without step 1, this becomes standard momentum
if (m_useNesterovMomentum)
{
Matrix<ElemType>::ScaleAndAdd((ElemType)-blockMomentum, smoothedGradientUpdate, currentWeight);
}
// 2.2.4 update bookkeeping
prevWeight.SetValue(currentWeight);
}
}
//----------------------------------------
// 3. reset SGD momentum if necessary
//----------------------------------------
if (m_resetSGDMomentumAfterAggregation)
{
for (Matrix<ElemType>& x : smoothedGradient)
{
x.SetValue((ElemType)0);
}
}
}
/*virtual*/ void SaveToCheckPoint(File& fstream) override
{
if (m_pMPI->IsMainNode())
{
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"BMACKP");
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"BOptions");
fstream << m_resetSGDMomentumAfterAggregation;
fstream.PutMarker(FileMarker::fileMarkerEndSection, L"EOptions");
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"BMomentumAsTimeConstant");
fstream << m_blockMomentumAsTimeConstantPerWorker;
fstream.PutMarker(FileMarker::fileMarkerEndSection, L"EMomentumAsTimeConstant");
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"BSyncPeriodInSamples");
fstream << m_syncPeriodPerWorker;
fstream.PutMarker(FileMarker::fileMarkerEndSection, L"ESyncPeriodInSamples");
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"BParam");
SaveParameters(fstream, m_prevParameters);
SaveParameters(fstream, m_blockLevelSmoothedGradient);
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"EParam");
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"EMACKP");
}
}
/*virtual*/ void LoadFromCheckPoint(File& fstream) override
{
if (fstream.TryGetMarker(FileMarker::fileMarkerBeginSection, L"BMACKP"))
{
fstream.GetMarker(FileMarker::fileMarkerBeginSection, L"BOptions");
fstream >> m_resetSGDMomentumAfterAggregation;
fstream.GetMarker(FileMarker::fileMarkerEndSection, L"EOptions");
fstream.GetMarker(FileMarker::fileMarkerBeginSection, L"BMomentumAsTimeConstant");
fstream >> m_blockMomentumAsTimeConstantPerWorker;
fstream.GetMarker(FileMarker::fileMarkerEndSection, L"EMomentumAsTimeConstant");
fstream.GetMarker(FileMarker::fileMarkerBeginSection, L"BSyncPeriodInSamples");
fstream >> m_syncPeriodPerWorker;
fstream.GetMarker(FileMarker::fileMarkerEndSection, L"ESyncPeriodInSamples");
fstream.GetMarker(FileMarker::fileMarkerBeginSection, L"BParam");
LoadParameters(fstream, m_prevParameters, m_deviceId);
LoadParameters(fstream, m_blockLevelSmoothedGradient, m_deviceId);
fstream.GetMarker(FileMarker::fileMarkerBeginSection, L"EParam");
fstream.GetMarker(FileMarker::fileMarkerEndSection, L"EMACKP");
}
}
private:
// helper function to save/load the map<wstring, shared_ptr<Matrix<ElemType>>> structure
void SaveParameters(File& f, const map<wstring, shared_ptr<Matrix<ElemType>>>& parameters) const
{
// save sizeof(ElemType)
unsigned int size = sizeof(ElemType);
f << size;
// save number of pairs
unsigned int numPairs = parameters.size();
f << numPairs;
for (auto& x : parameters)
{
f << x.first;
f << *x.second;
}
f.Flush();
return;
}
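// Resulting stream layout: [uint32: sizeof(ElemType)] [uint32: number of pairs] followed by that many
// (name, matrix) records - exactly the layout that LoadParameters below expects to read back.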
void LoadParameters(File& f, map<wstring, shared_ptr<Matrix<ElemType>>>& parameters, DEVICEID_TYPE deviceID)
{
unsigned int size = 0;
unsigned int pair = 0;
f >> size;
f >> pair;
if (size != sizeof(ElemType))
{
LogicError("Mismatched ElemType in loading BlockMomentumSGD checkpoint. Expecting %s, while loading element size=%d\n",
sizeof(ElemType) == 4 ? "float" : "double",
size
);
}
parameters.clear();
for (size_t i = 0; i < pair; i++)
{
wstring name;
f >> name;
shared_ptr<Matrix<ElemType>> mat = make_shared<Matrix<ElemType>>(deviceID);
f >> *mat;
parameters[name] = mat;
}
}
public:
static double TimeConstant2Momentum(double timeConstant, size_t syncPeriod)
{
return exp(-((double)syncPeriod) / timeConstant);
}
static double Momentum2TimeConstant(double bm, size_t syncPeriod)
{
if (bm >= 1.0 || bm < 0.0)
{
InvalidArgument("Unexpected block momentum (%.2f). Block momentum should be in the range of [0,1)\n", bm);
}
return -(double)syncPeriod / log(bm);
}
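// Worked example (illustrative numbers only): with a per-worker sync period of 120000 samples and a
// time constant of 480000 samples,
// TimeConstant2Momentum(480000, 120000) = exp(-120000/480000) = exp(-0.25) ~= 0.7788,
// and the inverse mapping recovers the time constant:
// Momentum2TimeConstant(0.7788, 120000) = -120000 / log(0.7788) ~= 480000.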
};
} } }

Просмотреть файл

@ -1,87 +0,0 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#pragma once
#include "ColumnQuantizer.h"
#include "QuantizedMatrix.h"
#include "MatrixQuantizerImpl.h"
namespace Microsoft { namespace MSR { namespace CNTK {
// This type performs quantization on a matrix.
// Quantization is a technique to reduce the cost of communicating
// the gradient matrices during aggregation across all nodes in
// data-parallel SGD training, at the end of each minibatch.
// Refer to this paper http://research.microsoft.com/apps/pubs/?id=230137
// for details.
class MatrixQuantizerBase
{};
template <class ElemType>
class MatrixQuantizer final : public MatrixQuantizerBase
{
public:
MatrixQuantizer(size_t numRows, size_t numCols, int deviceId, bool useAsync) : MatrixQuantizer(deviceId, useAsync)
{
m_residual = std::make_shared<Matrix<ElemType>>(numRows, numCols, deviceId, DENSE);
}
MatrixQuantizer(int deviceId, bool useAsync) : m_residual(nullptr)
{
m_quantizerImpl.reset(MatrixQuantizerImpl<ElemType>::Create(deviceId, useAsync));
}
// Disallow copy and move construction and assignment
DISABLE_COPY_AND_MOVE(MatrixQuantizer);
void QuantizeAsync(const Matrix<ElemType>& inMatrix, QuantizedMatrix<ElemType>& outQMatrix, bool zeroThresholdFor1Bit)
{
m_quantizerImpl->QuantizeAsync(inMatrix, *m_residual, outQMatrix, *m_residual, zeroThresholdFor1Bit);
}
void QuantizeAsync(const Matrix<ElemType>& inMatrix, const Matrix<ElemType>& inResidual, QuantizedMatrix<ElemType>& outQMatrix, Matrix<ElemType>& outResidual, bool zeroThresholdFor1Bit)
{
m_quantizerImpl->QuantizeAsync(inMatrix, inResidual, outQMatrix, outResidual, zeroThresholdFor1Bit);
}
void WaitQuantizeAsyncDone()
{
m_quantizerImpl->WaitQuantizeAsyncDone();
}
void UnquantizeAsync(QuantizedMatrix<ElemType>& inQMatrix, Matrix<ElemType>& outMatrix, bool add = false)
{
m_quantizerImpl->UnquantizeAsync(inQMatrix, outMatrix, add);
}
void WaitUnquantizeAsyncDone()
{
m_quantizerImpl->WaitUnquantizeAsyncDone();
}
int GetDeviceId() const
{
return m_quantizerImpl->GetDeviceId();
}
void ResetResidue()
{
m_residual->SetValue(0.0);
}
const Matrix<ElemType>& GetResidualMatrix() const
{
return *m_residual;
}
private:
std::unique_ptr<MatrixQuantizerImpl<ElemType>> m_quantizerImpl;
// the residual matrix
std::shared_ptr<Matrix<ElemType>> m_residual;
};
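// A minimal, hedged usage sketch (not part of the original file): quantize a dense gradient, pretend to
// exchange it, and unquantize it back. 'grad', 'qGrad' and 'deviceId' are assumptions supplied by the caller;
// a real data-parallel setup would transmit qGrad's buffer over MPI between the two waits.
template <class ElemType>
void ExampleQuantizeRoundTrip(Matrix<ElemType>& grad, QuantizedMatrix<ElemType>& qGrad, int deviceId)
{
    MatrixQuantizer<ElemType> quantizer(grad.GetNumRows(), grad.GetNumCols(), deviceId, /*useAsync=*/true);
    quantizer.QuantizeAsync(grad, qGrad, /*zeroThresholdFor1Bit=*/true); // residual is kept inside the quantizer
    quantizer.WaitQuantizeAsyncDone();
    // ... a real implementation would exchange qGrad's buffer with the other workers here ...
    quantizer.UnquantizeAsync(qGrad, grad, /*add=*/false);
    quantizer.WaitUnquantizeAsyncDone();
}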
} } }

Просмотреть файл

@ -1,99 +0,0 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#pragma once
#include <vector>
#include "CNTKLibrary.h"
#include "DistributedLearnerBase.h"
#include "PerformanceProfiler.h"
namespace CNTK
{
///
/// Quantized Distributed Trainer.
///
class QuantizedDataParallelDistributedLearner : public DistributedLearnerBase
{
public:
QuantizedDataParallelDistributedLearner(QuantizedDistributedCommunicatorPtr communicator, LearnerPtr learner, size_t distributeAfterSamples, bool useAsyncBufferedParameterUpdate)
: DistributedLearnerBase(communicator, learner, distributeAfterSamples)
{
if (useAsyncBufferedParameterUpdate)
LogicError("Asynchronous parameter update is not yet supported.");
}
// Optional override that gets called per minibatch after finishing gradient computation but before updating model parameters
bool Update(std::unordered_map<Parameter, NDArrayViewPtr>& gradientValues, MinibatchInfo& info) override
{
if (m_sampleCount >= m_distributeAfterSamples)
{
auto profGradientAgg = Microsoft::MSR::CNTK::ScopeProfile(Microsoft::MSR::CNTK::profilerEvtMainGradient);
if (info.IsEmpty())
PrepaireZeroGradients(gradientValues);
ConvertToOrdered(gradientValues, m_gradientBuffer);
std::vector<NDArrayViewPtr> headerToAggregate;
headerToAggregate.push_back(info.evalCriterionValue);
headerToAggregate.push_back(info.trainingLossValue);
auto value = MakeSharedObject<NDArrayView>(static_cast<double>(info.numberOfSamples), NDShape{ 1 }, DeviceDescriptor::CPUDevice());
headerToAggregate.push_back(value);
m_communicator->AggregateInPlace(headerToAggregate, m_communicator->Workers());
info.numberOfSamples = static_cast<size_t>(*headerToAggregate.back()->DataBuffer<double>());
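// At this point the eval criterion, training loss and sample count have been summed across all workers
// by the in-place all-reduce above; the aggregated sample count is read back from the last header element.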
std::vector<NDArrayViewPtr> gradients;
for (const auto& i : m_gradientBuffer)
gradients.push_back(i.second);
m_gradientBuffer.clear();
dynamic_cast<QuantizedDistributedCommunicator*>(m_communicator.get())->QuantizedAggregateInPlace(
gradients,
m_residuals,
m_stripeResiduals,
m_communicator->Workers());
}
auto profWeights = Microsoft::MSR::CNTK::ScopeProfile(Microsoft::MSR::CNTK::profilerEvtMainWeights);
m_sampleCount += info.numberOfSamples;
if (info.IsEmpty())
return false;
return m_learner->Update(gradientValues, info.numberOfSamples, info.atEndOfSweep);
}
// Optionally overridable method to get the checkpoint state associated with this distributed training method
Dictionary CreateCheckpoint() override
{
// Resetting the residuals.
// We do this to make sure that the returned checkpoint state is consistent with the in-memory state, since we do not checkpoint the residues.
for (size_t i = 0; i < m_residuals.size(); ++i)
if (m_residuals[i]->GetDataType() == DataType::Double)
m_residuals[i]->SetValue(0.0);
else
m_residuals[i]->SetValue(0.0f);
for (size_t i = 0; i < m_stripeResiduals.size(); ++i)
if (m_stripeResiduals[i])
if (m_stripeResiduals[i]->GetDataType() == DataType::Double)
m_stripeResiduals[i]->SetValue(0.0);
else
m_stripeResiduals[i]->SetValue(0.0f);
return DistributedLearnerBase::CreateCheckpoint();
}
private:
// Residuals of quantized gradients.
std::vector<NDArrayViewPtr> m_residuals;
// Residuals of quantized aggregated stripes this node is responsible for.
std::vector<NDArrayViewPtr> m_stripeResiduals;
};
}

Просмотреть файл

@ -1,567 +0,0 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#pragma once
#include "Basics.h"
#include "MPIWrapper.h"
#include "CNTKLibrary.h"
#include "MatrixQuantizerImpl.h"
#include "MatrixQuantizer.h"
#include "CUDAPageLockedMemAllocator.h"
#include "Utils.h"
#include "DistributedCommunicator.h"
namespace Microsoft { namespace MSR { namespace CNTK {
class MatrixQuantizerBase;
class QuantizedMatrixBase;
typedef std::shared_ptr<QuantizedMatrixBase> QuantizedMatrixBasePtr;
class CUDAPageLockedMemAllocator;
} } }
namespace CNTK
{
class QuantizedMPICommunicatorImpl final : public MPICommunicatorImpl, public QuantizedDistributedCommunicator
{
using Base = MPICommunicatorImpl;
template<class T> using vector = std::vector<T>;
template<class T> using shared_ptr = std::shared_ptr<T>;
template<class T> using unordered_set = std::unordered_set<T>;
using MpiFail = Microsoft::MSR::CNTK::MpiFail;
using QuantizedMatrixBase = Microsoft::MSR::CNTK::QuantizedMatrixBase;
using QuantizedMatrixBasePtr = shared_ptr<QuantizedMatrixBase>;
using MatrixQuantizerBase = Microsoft::MSR::CNTK::MatrixQuantizerBase;
using CUDAPageLockedMemAllocator = Microsoft::MSR::CNTK::CUDAPageLockedMemAllocator;
template<class T> using MatrixQuantizer = Microsoft::MSR::CNTK::MatrixQuantizer<T>;
template<class T> using QuantizedMatrix = Microsoft::MSR::CNTK::QuantizedMatrix<T>;
template<class T> using Matrix = Microsoft::MSR::CNTK::Matrix<T>;
public:
QuantizedMPICommunicatorImpl(bool zeroThresholdFor1Bit, bool useQuantizationForSelfStripe, size_t numQuantizationBits)
: m_zeroThresholdFor1Bit(zeroThresholdFor1Bit), m_useQuantizationForSelfStripe(useQuantizationForSelfStripe), m_numQuantizationBits(numQuantizationBits)
{}
void QuantizedAggregateInPlace(
std::vector<NDArrayViewPtr>& inValues,
std::vector<NDArrayViewPtr>& valueQuantizationResidues,
std::vector<NDArrayViewPtr>& stripeQuantizationResidues,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override
{
QuantizedAggregate(
inValues, valueQuantizationResidues, stripeQuantizationResidues,
inValues, valueQuantizationResidues, stripeQuantizationResidues,
sendToWorkers);
}
// A collective communication API to perform quantized aggregation of values across all workers of this communicator
void QuantizedAggregate(
const vector<NDArrayViewPtr>& inValues,
const vector<NDArrayViewPtr>& valueQuantizationResidues,
const vector<NDArrayViewPtr>& stripeQuantizationResidues,
vector<NDArrayViewPtr>& aggregatedOutputs,
vector<NDArrayViewPtr>& newQuantizationResidues,
vector<NDArrayViewPtr>& newStripeQuantizationResidues,
const unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override
{
CheckWorkers(sendToWorkers);
if (Workers().size() == 1) // No need to aggregate anything.
{
aggregatedOutputs = inValues;
newQuantizationResidues = valueQuantizationResidues;
newStripeQuantizationResidues = stripeQuantizationResidues;
return;
}
if (inValues.empty())
return;
DataType dataType = inValues.front()->GetDataType();
for (const auto& v : inValues)
{
if (v->GetDataType() != dataType)
RuntimeError("Currently values of different types are not supported for quantize.");
}
if (dataType == DataType::Float)
QuantizedAggregate<float>(inValues, valueQuantizationResidues, stripeQuantizationResidues, aggregatedOutputs, newQuantizationResidues, newStripeQuantizationResidues, sendToWorkers);
else if (dataType == DataType::Double)
QuantizedAggregate<double>(inValues, valueQuantizationResidues, stripeQuantizationResidues, aggregatedOutputs, newQuantizationResidues, newStripeQuantizationResidues, sendToWorkers);
else
LogicError("Unexpected type value.");
}
// Redefining inherited members.
// TODO: Use using and virtual inheritance after switching to VS2015.
const std::unordered_set<DistributedWorkerDescriptor>& Workers() const override { return Base::Workers(); }
const DistributedWorkerDescriptor& CurrentWorker() const override { return Base::CurrentWorker(); }
DistributedCommunicatorPtr SubGroup(const std::unordered_set<DistributedWorkerDescriptor>& g) const override { return Base::SubGroup(g); }
void Concatenate(
const std::vector<ValuePtr>& in,
std::vector<ValuePtr>& out,
const std::unordered_set<DistributedWorkerDescriptor>& w) override
{
Base::Concatenate(in, out, w);
}
void AggregateInPlace(
const std::vector<NDArrayViewPtr>& values,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override
{
Base::AggregateInPlace(values, sendToWorkers);
}
void Aggregate(
const std::vector<NDArrayViewPtr>& values,
std::vector<NDArrayViewPtr>& outputValues,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override
{
Base::Aggregate(values, outputValues, sendToWorkers);
}
void Barrier() override
{
Base::Barrier();
}
virtual void Concatenate(
const std::vector<NDArrayViewPtr>& input,
std::vector<NDArrayViewPtr>& output,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override
{
Base::Concatenate(input, output, sendToWorkers);
}
virtual void Gather(
const Dictionary& input,
std::vector<DictionaryPtr>& output,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override
{
Base::Gather(input, output, sendToWorkers);
}
private:
struct Stripe
{
size_t m_startCol;
size_t m_numCols;
};
// Determine which stripe of the gradient this node is responsible for
Stripe GetStripeForNode(size_t numCols, size_t nodeRank, size_t numNodes)
{
size_t numColsPerNode = numCols / numNodes;
size_t residue = numCols % numNodes;
size_t startColNumofStripe = (numColsPerNode * nodeRank) + min(residue, nodeRank);
size_t numColsinStripe = numColsPerNode + ((nodeRank < residue) ? 1 : 0);
return Stripe{ startColNumofStripe, numColsinStripe };
}
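// Example: numCols = 10, numNodes = 4 yields stripes of 3, 3, 2 and 2 columns starting at columns
// 0, 3, 6 and 8 respectively (the first 'residue' nodes each get one extra column).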
template <typename ElementType>
MatrixQuantizer<ElementType>& GetQuantizer(const shared_ptr<MatrixQuantizerBase>& quantizer)
{
return static_cast<MatrixQuantizer<ElementType>&>(*quantizer);
}
template <typename ElementType>
QuantizedMatrix<ElementType>& GetQuantizedMatrix(QuantizedMatrixBase& matrix)
{
return static_cast<QuantizedMatrix<ElementType>&>(matrix);
}
void InitializeBuffers(
const vector<NDArrayViewPtr>& inValues,
vector<NDArrayViewPtr>& valueQuantizationResidues,
vector<NDArrayViewPtr>& stripeQuantizationResidues,
vector<NDArrayViewPtr>& aggregatedOutputs,
vector<NDArrayViewPtr>& newQuantizationResidues,
vector<NDArrayViewPtr>& newStripeQuantizationResidues)
{
m_preAggregatedGradientQuantizers.resize(std::max(inValues.size(), valueQuantizationResidues.size()));
if (inValues.size() != m_preAggregatedGradientQuantizers.size())
LogicError("Number of aggregated values should be equal number of quantized residuals.");
m_quantizedGradients.resize(inValues.size());
m_aggregatedGradientStripeQuantizers.resize(std::max(inValues.size(), stripeQuantizationResidues.size()));
if (inValues.size() != m_aggregatedGradientStripeQuantizers.size())
LogicError("Number of aggregated values should be equal number of striped quantized residuals.");
m_recvGradientStripesQuantized.resize(inValues.size());
if (valueQuantizationResidues.empty())
valueQuantizationResidues.resize(inValues.size());
if (stripeQuantizationResidues.empty())
stripeQuantizationResidues.resize(inValues.size());
if (newQuantizationResidues.empty())
newQuantizationResidues.resize(inValues.size());
if (newStripeQuantizationResidues.empty())
newStripeQuantizationResidues.resize(inValues.size());
for (auto i = 0; i < inValues.size(); ++i)
{
auto view = inValues[i];
// Make sure none of the values are sparse - we currently do not support aggregation of sparse matrices
if (view->GetStorageFormat() != StorageFormat::Dense)
RuntimeError("Aggregation for sparse matrices is currently not supported!");
// Currently we always use async aggregation. Is this correct?
if (view->GetDataType() == DataType::Float)
InitializeBuffer<float>(inValues, valueQuantizationResidues, stripeQuantizationResidues, aggregatedOutputs, newQuantizationResidues, newStripeQuantizationResidues, i);
else if (view->GetDataType() == DataType::Double)
InitializeBuffer<double>(inValues, valueQuantizationResidues, stripeQuantizationResidues, aggregatedOutputs, newQuantizationResidues, newStripeQuantizationResidues, i);
else
LogicError("Unsupported type");
}
}
template<class ElemType>
void InitializeBuffer(
const vector<NDArrayViewPtr>& inValues,
vector<NDArrayViewPtr>& valueQuantizationResidues,
vector<NDArrayViewPtr>& stripeQuantizationResidues,
vector<NDArrayViewPtr>& /*aggregatedOutputs*/,
vector<NDArrayViewPtr>& newQuantizationResidues,
vector<NDArrayViewPtr>& newStripeQuantizationResidues,
size_t index)
{
int rank = static_cast<int>(CurrentWorker().m_globalRank);
int numWorkers = static_cast<int>(Workers().size());
auto value = inValues[index];
auto v = GetMatrix<ElemType>(value);
size_t nRow = v->GetNumRows();
size_t nCol = v->GetNumCols();
if (!valueQuantizationResidues[index])
{
auto residual = MakeSharedObject<NDArrayView>(AsDataType<ElemType>(), NDShape{ nRow, nCol }, AsDeviceDescriptor(v->GetDeviceId()));
auto outputResidual = MakeSharedObject<NDArrayView>(AsDataType<ElemType>(), NDShape{ nRow, nCol }, AsDeviceDescriptor(v->GetDeviceId()));
valueQuantizationResidues[index] = residual;
newQuantizationResidues[index] = outputResidual;
}
Stripe stripe = GetStripeForNode(v->GetNumCols(), rank, numWorkers);
if (!stripeQuantizationResidues[index] && stripe.m_numCols > 0)
{
auto residual = MakeSharedObject<NDArrayView>(::CNTK::AsDataType<ElemType>(), NDShape{ nRow, stripe.m_numCols }, AsDeviceDescriptor(v->GetDeviceId()));
auto outputResidual = MakeSharedObject<NDArrayView>(::CNTK::AsDataType<ElemType>(), NDShape{ nRow, stripe.m_numCols }, AsDeviceDescriptor(v->GetDeviceId()));
stripeQuantizationResidues[index] = residual;
newStripeQuantizationResidues[index] = outputResidual;
}
auto inResidual = valueQuantizationResidues[index];
// Initialize buffer.
m_quantizedGradients[index] = std::make_shared<QuantizedMatrix<ElemType>>(v->GetNumRows(), v->GetNumCols(), m_numQuantizationBits, CPUDEVICE, m_allocator.get());
// Initialize gradient quantizer.
m_preAggregatedGradientQuantizers[index] = std::make_shared<MatrixQuantizer<ElemType>>(GetMatrix<ElemType>(inResidual)->GetDeviceId(), true);
// Determine which stripe of the gradient this node is responsible for
MatrixQuantizer<ElemType>* aggregatedGradientStripeQuantizers = nullptr;
if (stripe.m_numCols > 0)
{
// Initialize quantizer
aggregatedGradientStripeQuantizers = new MatrixQuantizer<ElemType>(GetMatrix<ElemType>(inResidual)->GetDeviceId(), true);
m_recvGradientStripesQuantized[index].resize(numWorkers - 1);
for (size_t j = 0; j < numWorkers - 1; ++j)
m_recvGradientStripesQuantized[index][j]= std::unique_ptr<QuantizedMatrix<ElemType>>(new QuantizedMatrix<ElemType>(v->GetNumRows(), stripe.m_numCols, m_numQuantizationBits, CPUDEVICE, m_allocator.get()));
}
m_aggregatedGradientStripeQuantizers[index] = std::unique_ptr<MatrixQuantizer<ElemType>>(aggregatedGradientStripeQuantizers);
}
template<class ElemType>
void QuantizedAggregate(
const vector<NDArrayViewPtr>& inValues,
const vector<NDArrayViewPtr>& formalValueQuantizationResidues,
const vector<NDArrayViewPtr>& formalStripeQuantizationResidues,
vector<NDArrayViewPtr>& aggregatedOutputs,
vector<NDArrayViewPtr>& newQuantizationResidues,
vector<NDArrayViewPtr>& newStripeQuantizationResidues,
const unordered_set<DistributedWorkerDescriptor>& sendToWorkers)
{
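// High-level flow of the quantized all-reduce implemented below (summary added for readability):
// 1. Quantize every local gradient matrix, carrying the quantization residual forward.
// 2. Send each column stripe of the quantized gradients to the worker that owns that stripe, and
//    receive this worker's own stripes from all other workers.
// 3. Unquantize the received stripes and accumulate them into this worker's aggregate stripe.
// 4. Re-quantize the aggregated stripe (with its own residual) and send it to all other workers.
// 5. Receive the aggregated stripes owned by the other workers and unquantize them into the outputs.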
CheckWorkers(sendToWorkers);
const int numWorkers = static_cast<int>(Workers().size());
const int rank = static_cast<int>(CurrentWorker().m_globalRank);
auto valueQuantizationResidues = formalValueQuantizationResidues;
auto stripeQuantizationResidues = formalStripeQuantizationResidues;
InitializeBuffers(
inValues,
valueQuantizationResidues,
stripeQuantizationResidues,
aggregatedOutputs,
newQuantizationResidues,
newStripeQuantizationResidues);
vector<shared_ptr<Matrix<ElemType>>> inputValues;
vector<shared_ptr<Matrix<ElemType>>> outputValues;
vector<shared_ptr<Matrix<ElemType>>> inputResiduals;
vector<shared_ptr<Matrix<ElemType>>> outputResiduals;
vector<shared_ptr<Matrix<ElemType>>> inputStripeResiduals;
vector<shared_ptr<Matrix<ElemType>>> outputStripeResiduals;
// Check that input corresponds to output and convert NDArrayViews to the corresponding matrices.
for (size_t i = 0; i < inValues.size(); i++)
{
assert(inValues[i]->Shape().TotalSize() == aggregatedOutputs[i]->Shape().TotalSize());
assert(inValues[i]->GetDataType() == aggregatedOutputs[i]->GetDataType());
assert(inValues[i]->Device() == aggregatedOutputs[i]->Device());
assert(inValues[i] != nullptr);
inputValues.push_back(GetWritableMatrix<ElemType>(inValues[i]));
assert(aggregatedOutputs[i] != nullptr);
outputValues.push_back(GetWritableMatrix<ElemType>(aggregatedOutputs[i]));
assert(valueQuantizationResidues[i] != nullptr);
inputResiduals.push_back(GetWritableMatrix<ElemType>(valueQuantizationResidues[i]));
assert(newQuantizationResidues[i] != nullptr);
outputResiduals.push_back(GetWritableMatrix<ElemType>(newQuantizationResidues[i]));
// Stripe residuals can be null when the stripe does not belong to this node.
inputStripeResiduals.push_back(stripeQuantizationResidues[i] ? GetWritableMatrix<ElemType>(stripeQuantizationResidues[i]) : nullptr);
outputStripeResiduals.push_back(newStripeQuantizationResidues[i]? GetWritableMatrix<ElemType>(newStripeQuantizationResidues[i]) : nullptr);
}
// Prepare receiving buffers.
vector<std::unique_ptr<Matrix<ElemType>>> aggGradStripes;
vector<std::unique_ptr<QuantizedMatrix<ElemType>>> aggGradStripesQuantized;
for (size_t i = 0; i < inputValues.size(); i++)
{
size_t nCol = inputValues[i]->GetNumCols();
// Determine which stripe of the gradient this node is responsible for
Stripe stripe = GetStripeForNode(nCol, rank, numWorkers);
Matrix<ElemType>* currAggGradStripe = nullptr;
QuantizedMatrix<ElemType>* currAggGradStripeQuantized = nullptr;
if (stripe.m_numCols > 0)
{
currAggGradStripe = new Matrix<ElemType>(inputValues[i]->ColumnSlice(stripe.m_startCol, stripe.m_numCols));
currAggGradStripeQuantized = new QuantizedMatrix<ElemType>(GetQuantizedMatrix<ElemType>(*m_quantizedGradients[i]).ColumnSlice(stripe.m_startCol, stripe.m_numCols));
}
aggGradStripes.push_back(std::unique_ptr<Matrix<ElemType>>(currAggGradStripe));
aggGradStripesQuantized.push_back(std::unique_ptr<QuantizedMatrix<ElemType>>(currAggGradStripeQuantized));
}
// Initiate quantization of the gradient matrices
for (size_t i = 0; i < inValues.size(); ++i)
GetQuantizer<ElemType>(m_preAggregatedGradientQuantizers[i]).QuantizeAsync(*(inputValues[i]), *(inputResiduals[i]), GetQuantizedMatrix<ElemType>(*(m_quantizedGradients[i])), *(outputResiduals[i]), m_zeroThresholdFor1Bit);
// Initiate receive of the stripe to be aggregated by the current node, from all other nodes
vector<MPI_Request> recvGradStripesQuantizedRequests;
vector<int> recvRequestIdxToGradientMatrixIdxMap;
for (int i = 0; i < inputValues.size(); ++i)
{
Stripe stripe = GetStripeForNode(inputValues[i]->GetNumCols(), rank, numWorkers);
if (stripe.m_numCols > 0)
{
recvRequestIdxToGradientMatrixIdxMap.push_back(i);
for (int j = 0; j < numWorkers - 1; ++j)
{
int source = (j >= rank) ? (j + 1) : j;
recvGradStripesQuantizedRequests.push_back(MPI_Request());
int recvRequestIdx = (int)recvGradStripesQuantizedRequests.size() - 1;
m_mpi->Irecv(GetQuantizedMatrix<ElemType>(*m_recvGradientStripesQuantized[i][j]).Buffer(), (int)GetQuantizedMatrix<ElemType>(*m_recvGradientStripesQuantized[i][j]).GetSize(), MPI_CHAR, source, i, &(recvGradStripesQuantizedRequests[recvRequestIdx])) || MpiFail("MPI_Irecv");
}
}
}
// Asynchronously send stripes of the quantized gradient matrices to the respective nodes that own aggregation of that stripe
std::vector<std::vector<MPI_Request>> sendGradStripesQuantizedRequests(inValues.size());
for (int i = 0; i < inValues.size(); ++i)
{
GetQuantizer<ElemType>(m_preAggregatedGradientQuantizers[i]).WaitQuantizeAsyncDone();
size_t sendRequestIdx = 0;
for (int j = 0; j < numWorkers; ++j)
{
Stripe stripe = GetStripeForNode(inputValues[i]->GetNumCols(), j, numWorkers);
if (stripe.m_numCols > 0)
{
// Do not send stripe for self
if (j != rank)
{
sendGradStripesQuantizedRequests[i].push_back(MPI_Request());
QuantizedMatrix<ElemType> quantizedStripe = GetQuantizedMatrix<ElemType>(*m_quantizedGradients[i]).ColumnSlice(stripe.m_startCol, stripe.m_numCols);
m_mpi->Isend(quantizedStripe.Buffer(), (int)quantizedStripe.GetSize(), MPI_CHAR, j, i, &(sendGradStripesQuantizedRequests[i][sendRequestIdx])) || MpiFail("MPI_Isend");
sendRequestIdx++;
}
else
{
// Initialize the aggregate for the stripe with the quantized gradients instead of the original
// gradients themselves, if so desired
if (m_useQuantizationForSelfStripe)
{
QuantizedMatrix<ElemType> preAggGradSelfStripeQuantized = GetQuantizedMatrix<ElemType>(*m_quantizedGradients[i]).ColumnSlice(stripe.m_startCol, stripe.m_numCols);
GetQuantizer<ElemType>(m_aggregatedGradientStripeQuantizers[i]).UnquantizeAsync(preAggGradSelfStripeQuantized, *(aggGradStripes[i]), false);
}
}
}
}
}
// Wait for the stripes to arrive from each node and unquantize and aggregate
size_t numReceivesExpected = recvGradStripesQuantizedRequests.size();
size_t numActualReceives = 0;
std::vector<int> perGradMatrixReceiveCount(recvRequestIdxToGradientMatrixIdxMap.size(), 0);
while (numActualReceives < numReceivesExpected)
{
int idx = MPI_UNDEFINED;
m_mpi->Waitany((int)recvGradStripesQuantizedRequests.size(), recvGradStripesQuantizedRequests.data(), &idx, MPI_STATUS_IGNORE) || MpiFail("MPI_Waitany");
if (idx == MPI_UNDEFINED)
{
break;
}
numActualReceives++;
int gradMatrixIdxPosition = idx / (numWorkers - 1);
int recvBufferSubIndex = idx % (numWorkers - 1);
// Map idx back to the actual gradient matrix index
int gradMatrixIdx = recvRequestIdxToGradientMatrixIdxMap[gradMatrixIdxPosition];
// Wait for the previous Unquantize to finish before issuing a new one
if (m_useQuantizationForSelfStripe || (perGradMatrixReceiveCount[gradMatrixIdxPosition] > 0))
GetQuantizer<ElemType>(m_aggregatedGradientStripeQuantizers[gradMatrixIdx]).WaitUnquantizeAsyncDone();
GetQuantizer<ElemType>(m_aggregatedGradientStripeQuantizers[gradMatrixIdx]).UnquantizeAsync(
GetQuantizedMatrix<ElemType>(*m_recvGradientStripesQuantized[gradMatrixIdx][recvBufferSubIndex]),
*(aggGradStripes[gradMatrixIdx]),
true);
perGradMatrixReceiveCount[gradMatrixIdxPosition]++;
// Also issue the quantization if this stripe was the last one expected for this matrix
// Note: We issue the quantization without waiting for the unquantization since the same stream
// is used for both and they are implicitly sequenced
// We reuse the buffer that we used for quantizing and sending out the pre-aggregation gradient
if (perGradMatrixReceiveCount[gradMatrixIdxPosition] == (numWorkers - 1))
{
Stripe stripe = GetStripeForNode(inputValues[gradMatrixIdx]->GetNumCols(), rank, numWorkers);
UNUSED(stripe);
assert(stripe.m_numCols > 0);
GetQuantizer<ElemType>(m_aggregatedGradientStripeQuantizers[gradMatrixIdx]).QuantizeAsync(
*(aggGradStripes[gradMatrixIdx]),
*(inputStripeResiduals[gradMatrixIdx]),
*(aggGradStripesQuantized[gradMatrixIdx]),
*(outputStripeResiduals[gradMatrixIdx]),
m_zeroThresholdFor1Bit);
}
}
assert(numActualReceives == numReceivesExpected);
vector<vector<MPI_Request>> recvAggGradStripesQuantizedRequests(inValues.size());
// Initiate receive of stripes of quantized aggregated gradients from different nodes
for (int i = 0; i < inValues.size(); ++i)
{
int recvRequestIdx = 0;
for (int j = 0; j < numWorkers; ++j)
{
// Do not recv stripe for self
if (j != rank)
{
Stripe stripe = GetStripeForNode(inputValues[i]->GetNumCols(), j, numWorkers);
if (stripe.m_numCols > 0)
{
recvAggGradStripesQuantizedRequests[i].push_back(MPI_Request());
QuantizedMatrix<ElemType> quantizedStripe = GetQuantizedMatrix<ElemType>(*m_quantizedGradients[i]).ColumnSlice(stripe.m_startCol, stripe.m_numCols);
m_mpi->Irecv(quantizedStripe.Buffer(), (int)quantizedStripe.GetSize(), MPI_CHAR, j, (int)inValues.size() + 1 + i, &(recvAggGradStripesQuantizedRequests[i][recvRequestIdx])) || MpiFail("MPI_Irecv");
recvRequestIdx++;
}
}
}
}
// Initiate broadcast of quantized aggregated gradient stripes to all other nodes
vector<vector<MPI_Request>> sendAggGradStripeQuantizedRequests(inValues.size());
for (int i = 0; i < inValues.size(); ++i)
{
Stripe stripe = GetStripeForNode(inputValues[i]->GetNumCols(), rank, numWorkers);
if (stripe.m_numCols > 0)
{
sendAggGradStripeQuantizedRequests[i] = std::vector<MPI_Request>(numWorkers - 1);
GetQuantizer<ElemType>(m_aggregatedGradientStripeQuantizers[i]).WaitQuantizeAsyncDone();
for (int j = 0; j < numWorkers - 1; ++j)
{
int dest = (j >= rank) ? (j + 1) : j;
// TODO: Should we use MPI_Bcast instead for better performance?
m_mpi->Isend(aggGradStripesQuantized[i]->Buffer(), (int)aggGradStripesQuantized[i]->GetSize(), MPI_CHAR, dest, (int)inValues.size() + 1 + i, &(sendAggGradStripeQuantizedRequests[i][j])) || MpiFail("MPI_Isend");
}
}
}
// Wait to receive all aggregated stripes and unquantize
for (size_t i = 0; i < inValues.size(); ++i)
{
m_mpi->Waitall((int)recvAggGradStripesQuantizedRequests[i].size(), recvAggGradStripesQuantizedRequests[i].data(), MPI_STATUSES_IGNORE) || MpiFail("MPI_Waitall");
GetQuantizer<ElemType>(m_preAggregatedGradientQuantizers[i]).UnquantizeAsync(GetQuantizedMatrix<ElemType>(*m_quantizedGradients[i]), *(outputValues[i]), false);
}
// Wait for all the unquantizations to finish
for (size_t i = 0; i < inValues.size(); ++i)
GetQuantizer<ElemType>(m_preAggregatedGradientQuantizers[i]).WaitUnquantizeAsyncDone();
// Wait for completion of the async send requests
for (int i = 0; i < sendGradStripesQuantizedRequests.size(); ++i)
{
if (sendGradStripesQuantizedRequests[i].size() > 0)
m_mpi->Waitall((int)sendGradStripesQuantizedRequests[i].size(), sendGradStripesQuantizedRequests[i].data(), MPI_STATUSES_IGNORE) || MpiFail("MPI_Waitall");
}
for (int i = 0; i < sendAggGradStripeQuantizedRequests.size(); ++i)
{
if (sendAggGradStripeQuantizedRequests[i].size() > 0)
m_mpi->Waitall((int)sendAggGradStripeQuantizedRequests[i].size(), sendAggGradStripeQuantizedRequests[i].data(), MPI_STATUSES_IGNORE) || MpiFail("MPI_Waitall");
}
}
// option for handling the mean for 1-bit quantization
// force 1-bit quant to threshold against 0 rather than the midpoint between lower and upper
const bool m_zeroThresholdFor1Bit;
// Number of bits that each gradient value is quantized to before communication with other nodes.
const size_t m_numQuantizationBits;
// Since the self-stripe in an all-reduce is not communicated, there is really no reason to
// quantize it for reduced communication. However, we add this as an option for consistency
// across all stripes if desired
const bool m_useQuantizationForSelfStripe;
const std::unique_ptr<CUDAPageLockedMemAllocator> m_allocator;
// Buffer for quantized gradients.
vector<QuantizedMatrixBasePtr> m_quantizedGradients;
// Buffer for quantized stripes.
vector<vector<QuantizedMatrixBasePtr>> m_recvGradientStripesQuantized;
// Quantizers to quantize initial gradients.
vector<shared_ptr<MatrixQuantizerBase>> m_preAggregatedGradientQuantizers;
// Quantizers to quantize aggregated stripes.
vector<shared_ptr<MatrixQuantizerBase>> m_aggregatedGradientStripeQuantizers;
};
}

Просмотреть файл

@ -1,299 +0,0 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#pragma once
#undef _SCL_SECURE_NO_WARNINGS
#include "CNTKLibrary.h"
#include "Utils.h"
#include "IDistGradAggregator.h"
#include "CUDAPageLockedMemAllocator.h"
#include "QuantizedMatrix.h"
#include "MatrixQuantizer.h"
#include "MatrixQuantizerGPU.h"
#include <future>
#include "TimerUtility.h"
namespace Microsoft { namespace MSR { namespace CNTK {
// =======================================================================
// AllReduceDistGradAggregator -- 1-bit SGD.
// This implements
// Frank Seide, Hao Fu, Jasha Droppo, Gang Li, and Dong Yu:
// "1-bit stochastic gradient descent and its application to data-parallel distributed training of speech DNNs"
// In Proc. Interspeech 2014.
// =======================================================================
template <class ElemType>
class V2AllReduceDistGradAggregator : public IDistGradAggregator<ElemType>
{
UsingIDistGradAggregatorMembers;
static const int DEBUG_OUTPUT_TRACE_LEVEL = 3;
::CNTK::QuantizedDistributedCommunicatorPtr m_communicator;
public:
V2AllReduceDistGradAggregator(::CNTK::QuantizedDistributedCommunicatorPtr communicator, bool useAsyncAggregation, int traceLevel, int syncStatsTrace)
: IDistGradAggregator<ElemType>(nullptr), m_traceLevel(traceLevel), m_initialized(false), m_useAsyncAggregation(useAsyncAggregation), m_bufferedGradHeader(nullptr), m_syncStatsTrace(syncStatsTrace), m_iterationCount(0),
m_communicator(communicator)
{}
~V2AllReduceDistGradAggregator()
{
if (m_bufferedGradHeader != nullptr)
DistGradHeader::Destroy(m_bufferedGradHeader);
}
void Initialize(const std::vector<Matrix<ElemType>*>& gradients, int numEvalNodes)
{
// When called the first time let's setup the quantizers and matrices for holding quantized values.
// These can live for the lifetime of the aggregator since the gradient matrix dimensions for learnable parameters
// do not change
m_initialized = true;
int deviceId = gradients[0]->GetDeviceId();
for (size_t i = 0; i < gradients.size(); i++)
{
// Make sure none of the gradient matrices are sparse - we currently do not support aggregation of sparse gradient matrices
if (gradients[i]->GetMatrixType() != DENSE)
RuntimeError("Gradient aggregation for sparse gradient matrices is currently unsupported!");
if (m_useAsyncAggregation)
m_bufferedGradients[gradients[i]].reset(new Matrix<ElemType>(gradients[i]->GetNumRows(), gradients[i]->GetNumCols(), deviceId));
}
if (m_useAsyncAggregation)
{
m_bufferedGradHeader = DistGradHeader::Create(numEvalNodes);
m_bufferedGradHeader->Clear();
}
}
void ResetState(const std::vector<Matrix<ElemType>*>& gradients)
{
// If we are resetting state, let's clear previous quantization residues
// Make sure there is no pending async aggregation
if (m_useAsyncAggregation && m_pendingAsyncAggregation.valid())
LogicError("Unexpected pending async gradient aggregation found when resetting aggregator state!");
for (size_t i = 0; i < m_residuals.size(); ++i)
m_residuals[i]->SetValue(static_cast<ElemType>(0.0));
for (size_t i = 0; i < m_stripeResiduals.size(); ++i)
if (m_stripeResiduals[i])
m_stripeResiduals[i]->SetValue(static_cast<ElemType>(0.0));
// Zero out the buffered gradients if resetting state
if (m_useAsyncAggregation)
{
for (size_t i = 0; i < gradients.size(); i++)
m_bufferedGradients[gradients[i]]->SetValue(static_cast<ElemType>(0));
m_bufferedGradHeader->Clear();
}
}
// Aggregate the gradient matrices across all nodes
bool AggregateGradients(const std::vector<Matrix<ElemType>*>& gradients, DistGradHeader* headerCPU, bool resetState) override
{
if (!m_initialized)
Initialize(gradients, headerCPU->numEvalNode);
else if (resetState)
ResetState(gradients);
bool showSyncPerfStats = (m_syncStatsTrace > 0) && ((m_iterationCount % m_syncStatsTrace) == 0);
m_iterationCount++;
if (m_useAsyncAggregation)
{
// If we are performing async gradient aggregation, let's wait for the pending gradient aggregation to finish
// then swap the contents of the buffered gradients and the new gradient matrices and fire an async aggregation
// of the new gradient matrices
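// Note: with this double buffering the gradients applied to the model are the ones computed for the
// previous minibatch, i.e. the update is one minibatch stale by design.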
if (m_pendingAsyncAggregation.valid())
{
Timer aggregationTimer;
if (showSyncPerfStats)
aggregationTimer.Start();
m_pendingAsyncAggregation.get();
if (showSyncPerfStats)
{
aggregationTimer.Stop();
double gradientAggregationTime = aggregationTimer.ElapsedSeconds();
fprintf(stderr, "Async gradient aggregation wait time: %.6g\n", gradientAggregationTime);
}
}
std::vector<Matrix<ElemType>*> newGradients;
size_t numGradMatrices = gradients.size();
for (size_t i = 0; i < numGradMatrices; i++)
{
Matrix<ElemType>* bufferedGradientMatrix = m_bufferedGradients[gradients[i]].get();
if ((bufferedGradientMatrix == nullptr) ||
(bufferedGradientMatrix->GetNumCols() != gradients[i]->GetNumCols()) ||
(bufferedGradientMatrix->GetNumRows() != gradients[i]->GetNumRows()) ||
(bufferedGradientMatrix->GetDeviceId() != gradients[i]->GetDeviceId()))
{
LogicError("No buffered gradient matrix found corresponding to a gradient matrix to be aggregated!");
}
// Swap the gradient matrix contents with the buffered matrices
std::swap(*(gradients[i]), *bufferedGradientMatrix);
newGradients.push_back(bufferedGradientMatrix);
}
// Swap the grad header contents with the buffered grad header
swap(*headerCPU, *m_bufferedGradHeader);
// Initiate aggregation only if any samples were processed in previous iteration
if (resetState || (headerCPU->numSamples != 0))
{
int deviceId = gradients[0]->GetDeviceId();
DistGradHeader* newGradHeader = m_bufferedGradHeader;
// Since we will be aggregating the gradients asynchronously, let us
// ensure that the gradient matrices have been computed before starting to aggregate
// them asynchronously on another thread. This essentially means that when we are using
// a GPU device, we will synchronize on the main GPU compute stream before starting
// the gradient aggregation asynchronously on a separate stream
MatrixComputeStreamEvent* mainStreamSyncEvent = MatrixComputeStreamEvent::Create(deviceId);
m_pendingAsyncAggregation = std::async(std::launch::async, [=] {
// We are starting on a new thread. Make sure the new thread is
// setup to use the right device
Matrix<ElemType>::SetDevice(deviceId);
// Synchronize the Quantization compute stream with the completion of
// compute of the gradient matrices on the main compute stream
mainStreamSyncEvent->SynchronizeQuantizationComputeStreamWithEvent<ElemType>();
delete mainStreamSyncEvent;
AggregateGradientsImpl(newGradients, newGradHeader, showSyncPerfStats);
});
return true;
}
return false;
}
else
{
AggregateGradientsImpl(gradients, headerCPU, showSyncPerfStats);
return (headerCPU->numSamples != 0);
}
}
void AggregateGradientsImpl(const std::vector<Matrix<ElemType>*>& gradients, DistGradHeader* headerCPU, bool showSyncPerfStats)
{
Timer aggregationTimer;
int deviceId = gradients[0]->GetDeviceId();
if (showSyncPerfStats)
{
std::unique_ptr<MatrixComputeStreamEvent> mainStreamSyncEvent(MatrixComputeStreamEvent::Create(deviceId));
mainStreamSyncEvent->SynchronizeEvent();
aggregationTimer.Start();
}
size_t numGradMatrices = gradients.size();
if (headerCPU->numSamples == 0)
{
assert(headerCPU->criterion == 0.0);
assert(headerCPU->numSamplesWithLabel == 0);
for (int i = 0; i < headerCPU->numEvalNode; ++i)
assert(headerCPU->evalErrors[i].first == 0 && headerCPU->evalErrors[i].second == 0);
// If the current node did not process any samples, the gradients should be zero'd
for (size_t i = 0; i < numGradMatrices; ++i)
gradients[i]->SetValue(static_cast<ElemType>(0));
if (m_useAsyncAggregation)
{
std::unique_ptr<MatrixComputeStreamEvent> mainStreamSyncEvent(MatrixComputeStreamEvent::Create(deviceId));
mainStreamSyncEvent->SynchronizeQuantizationComputeStreamWithEvent<ElemType>();
}
}
// Aggregate header.
size_t numberOfElements = 1 + 1 + 1 + headerCPU->numEvalNode * 2;
std::unique_ptr<double[]> headerBuffer(new double[numberOfElements]);
headerBuffer[0] = headerCPU->criterion;
headerBuffer[1] = static_cast<double>(headerCPU->numSamples);
headerBuffer[2] = static_cast<double>(headerCPU->numSamplesWithLabel);
for (size_t i = 0; i < headerCPU->numEvalNode; ++i)
{
headerBuffer[3 + 2 * i] = headerCPU->evalErrors[i].first;
headerBuffer[3 + 2 * i + 1] = static_cast<double>(headerCPU->evalErrors[i].second);
}
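// Header buffer layout: [criterion, numSamples, numSamplesWithLabel,
// evalErrors[0].first, evalErrors[0].second, ..., evalErrors[numEvalNode-1].first, evalErrors[numEvalNode-1].second]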
auto headerData = ::CNTK::MakeSharedObject<::CNTK::NDArrayView>(::CNTK::DataType::Double, ::CNTK::NDShape{ numberOfElements }, headerBuffer.get(), numberOfElements * sizeof(double), ::CNTK::DeviceDescriptor::CPUDevice());
std::vector<::CNTK::NDArrayViewPtr> valuesToAggregate{ headerData };
// TODO: Should be async
m_communicator->AggregateInPlace(valuesToAggregate, m_communicator->Workers());
// Copy data back to the header
headerCPU->criterion = headerBuffer[0];
headerCPU->numSamples = static_cast<size_t>(headerBuffer[1]);
headerCPU->numSamplesWithLabel = static_cast<size_t>(headerBuffer[2]);
for (size_t i = 0; i < headerCPU->numEvalNode; ++i)
{
headerCPU->evalErrors[i].first = headerBuffer[3 + 2 * i];
headerCPU->evalErrors[i].second = static_cast<size_t>(headerBuffer[3 + 2 * i + 1]);
}
// Aggregate gradients.
std::vector<::CNTK::NDArrayViewPtr> gradientValues;
for (size_t i = 0; i < gradients.size(); ++i)
{
assert(gradients[i]->Data() != nullptr);
::CNTK::NDShape shape{ gradients[i]->GetNumRows(), gradients[i]->GetNumCols() };
auto data = ::CNTK::MakeSharedObject<::CNTK::NDArrayView>(::CNTK::AsDataType<ElemType>(), shape, gradients[i]->Data(), gradients[i]->GetNumElements() * sizeof(ElemType), ::CNTK::AsDeviceDescriptor(gradients[i]->GetDeviceId()));
gradientValues.push_back(data);
}
m_communicator->QuantizedAggregateInPlace(
gradientValues,
m_residuals,
m_stripeResiduals,
m_communicator->Workers());
if (showSyncPerfStats)
{
aggregationTimer.Stop();
double gradientAggregationTime = aggregationTimer.ElapsedSeconds();
fprintf(stderr, "Actual gradient aggregation time: %.6g\n", gradientAggregationTime);
}
}
private:
// Perform asynchronous gradient aggregation using double buffering of the gradient matrices
bool m_useAsyncAggregation;
// Future corresponding to the current in-flight async gradient aggregation
std::future<void> m_pendingAsyncAggregation;
// Buffered gradients that we asynchronously aggregate
std::unordered_map<Matrix<ElemType>*, std::unique_ptr<Matrix<ElemType>>> m_bufferedGradients;
DistGradHeader* m_bufferedGradHeader;
int m_traceLevel;
int m_syncStatsTrace;
// Only used for controlling frequency of measuring/showing gradient aggregation perf stats
size_t m_iterationCount;
bool m_initialized;
// Residuals of quantized gradients.
std::vector<::CNTK::NDArrayViewPtr> m_residuals;
// Residuals of quantized aggregated stripes this node is responsible for.
std::vector<::CNTK::NDArrayViewPtr> m_stripeResiduals;
};
} } }

Просмотреть файл

@ -1,361 +0,0 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#pragma once
#include "../SGDLib/MASGD.h"
#include <map>
#include <string>
#include <memory>
namespace Microsoft { namespace MSR { namespace CNTK {
// Implementation of Blockwise Model Update and Filtering (BMUF, a.k.a. block momentum)
// For details, see the following paper
// Kai Chen and Qiang Huo, "Scalable training of deep learning machines by incremental block training
// with intra-block parallel optimization and blockwise model-update filtering",
// in International Conference on Acoustics, Speech and Signal Processing , March 2016, Shanghai, China.
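// Sketch of the update implemented in ModelAggregationProcessing below: each worker k computes a block
// gradient G_k(t) = W(t-1) - W_k(t), the workers' block gradients are summed into G(t), the sum is
// filtered as V(t) = blockMomentum * V(t-1) + (1 - blockMomentum) * blockLearningRate * G(t), and the
// global model becomes W(t) = W(t-1) - V(t) (plus an optional Nesterov-style look-ahead of
// blockMomentum * V(t)).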
template<typename ElemType>
class V2BlockMomentumSGD : public IMASGD<ElemType>
{
typedef IMASGD<ElemType> Base;
using Base::m_deviceId;
using Base::DownCast;
bool m_resetSGDMomentumAfterAggregation;
bool m_useNesterovMomentum;
double m_blockLearningRate;
double m_blockMomentumAsTimeConstantPerWorker;
size_t m_syncPeriodPerWorker;
::CNTK::DistributedCommunicatorPtr m_communicator;
bool m_someWorkerHasFinished;
// parameters at the last model aggregation point
std::map<std::wstring, std::shared_ptr<Matrix<ElemType>>> m_prevParameters;
std::map<std::wstring, std::shared_ptr<Matrix<ElemType>>> m_blockLevelSmoothedGradient;
public:
V2BlockMomentumSGD(const MPIWrapperPtr& pMPI,
::CNTK::DistributedCommunicatorPtr communicator,
size_t reportFrequency,
DEVICEID_TYPE deviceId,
bool useNesterovMomentum,
bool resetSGDM,
double blockLearningRate,
double blockMomentumAsTimeConstant,
size_t syncPeriod)
: IMASGD<ElemType>(pMPI, reportFrequency, deviceId),
m_communicator(communicator),
m_useNesterovMomentum(useNesterovMomentum),
m_resetSGDMomentumAfterAggregation(resetSGDM),
m_blockLearningRate(blockLearningRate),
m_blockMomentumAsTimeConstantPerWorker(blockMomentumAsTimeConstant / communicator->Workers().size())
{
m_syncPeriodPerWorker = syncPeriod / communicator->Workers().size();
if (m_syncPeriodPerWorker == 0)
InvalidArgument("Sync period is too small.");
}
void OnEpochStart(const std::list<ComputationNodeBasePtr>& learnableNodes) override
{
m_someWorkerHasFinished = false;
for (auto& n : learnableNodes)
{
auto node = DownCast(n);
std::wstring name = node->NodeName();
Matrix<ElemType>& value = node->Value();
if (m_blockLevelSmoothedGradient.find(name) == m_blockLevelSmoothedGradient.end())
{
// has not been initialized yet
auto pSmoothedGrad = make_shared<Matrix<ElemType>> (value.GetDeviceId());
pSmoothedGrad->Resize(value.GetNumRows(), value.GetNumCols());
pSmoothedGrad->SetValue((ElemType)0);
m_blockLevelSmoothedGradient[name] = pSmoothedGrad;
}
if (m_prevParameters.find(name) == m_prevParameters.end())
{
auto newValue = make_shared<Matrix<ElemType>>(value.GetDeviceId());
newValue->SetValue(value);
m_prevParameters[name] = newValue;
}
else
{
m_prevParameters[name]->SetValue(value);
}
}
fprintf(stderr, "Parallel training (%d workers) using BlockMomentumSGD with "
"block momentum = %6.4f, "
"block momentum time constant (per worker) = %6.4f, "
"block learning rate = %6.4f, "
"block size per worker = %d samples, "
"%s"
"%s"
"\n",
(int)m_communicator->Workers().size(),
BlockMomentumSGD<double>::TimeConstant2Momentum(m_blockMomentumAsTimeConstantPerWorker, m_syncPeriodPerWorker),
m_blockMomentumAsTimeConstantPerWorker,
m_blockLearningRate,
(int)m_syncPeriodPerWorker,
m_useNesterovMomentum ? "using Nesterov-style block momentum, " : "" ,
m_resetSGDMomentumAfterAggregation ? "resetting SGD momentum after sync." : ".");
}
bool OnArrivingAtSyncPoint(
const std::list<ComputationNodeBasePtr>& learnableNodes, /* input/output: */
std::list<Matrix<ElemType>>& smoothedGradient, /* input/output: under some setup, it will reset to zero*/
size_t samplesSinceLastSync /* input: samples processed since last sync on this worker only */
) override
{
if (m_someWorkerHasFinished)
return false;
// Let's check the status.
double statusValue = 0;
auto status = ::CNTK::MakeSharedObject<::CNTK::NDArrayView>(::CNTK::DataType::Double, ::CNTK::NDShape{ 1 }, &statusValue, sizeof(double), ::CNTK::DeviceDescriptor::CPUDevice());
std::vector<::CNTK::NDArrayViewPtr> aggregatedStatus { status };
m_communicator->AggregateInPlace(aggregatedStatus, m_communicator->Workers());
if (statusValue > 0)
{
m_someWorkerHasFinished = true;
return false;
}
// Otherwise let's update the weights.
float secondsOnCommunication = 0.0f;
size_t totalSamples = 0;
ModelAggregationProcessing(samplesSinceLastSync, learnableNodes, smoothedGradient, totalSamples, secondsOnCommunication);
return true;
}
/*virtual*/ void OnEpochEnd(const std::list<ComputationNodeBasePtr>& learnableNodes,
std::list<Matrix<ElemType>>& smoothedGradient,
size_t samplesSinceLastSync) override
{
if (!m_someWorkerHasFinished)
{
// Let's notify the other workers that we have finished.
m_someWorkerHasFinished = true;
double statusValue = 1;
auto status = ::CNTK::MakeSharedObject<::CNTK::NDArrayView>(::CNTK::DataType::Double, ::CNTK::NDShape{ 1 }, &statusValue, sizeof(double), ::CNTK::DeviceDescriptor::CPUDevice());
std::vector<::CNTK::NDArrayViewPtr> aggregatedStatus{ status };
m_communicator->AggregateInPlace(aggregatedStatus, m_communicator->Workers());
}
// Let's update our weights no matter what.
float secondsOnCommunication = 0.0f;
size_t totalSamples = 0;
ModelAggregationProcessing(samplesSinceLastSync, learnableNodes, smoothedGradient, totalSamples, secondsOnCommunication);
}
/*virtual*/ void ModelAggregationProcessing(
size_t /*samplesSinceLastSync*/,
const std::list<ComputationNodeBasePtr>& learnableNodes,
std::list<Matrix<ElemType>>& smoothedGradient,
size_t& /*totalSamplesProcessed*/, /* out */
float& secondsOnCommunication /* out */
) override
{
ElemType blockMomentum = (ElemType)BlockMomentumSGD<double>::TimeConstant2Momentum(m_blockMomentumAsTimeConstantPerWorker, m_syncPeriodPerWorker);
Timer commTimer;
secondsOnCommunication = 0.0f;
// 1. Let's aggregate weights
std::map<std::wstring, std::shared_ptr<Matrix<ElemType>>> aggregatedWeights;
std::vector<::CNTK::NDArrayViewPtr> aggregatedWeightsPrepared;
for (auto& pBaseNode : learnableNodes)
{
if (!pBaseNode->IsParameterUpdateRequired())
continue;
wstring name = pBaseNode->NodeName();
auto pNode = DownCast(pBaseNode);
// Get current model
Matrix<ElemType>& prevWeight = *m_prevParameters[name]; // prev model value
Matrix<ElemType>& currentWeight = pNode->Value(); // current model
// Subtract it from the previous model
auto blockGrad = std::make_shared<Matrix<ElemType>>(prevWeight, CPUDEVICE);
*blockGrad -= currentWeight; // blockGrad now holds the local block gradient (of one worker)
aggregatedWeights[name] = blockGrad;
::CNTK::NDShape shape{ blockGrad->GetNumElements() };
auto data = ::CNTK::MakeSharedObject<::CNTK::NDArrayView>(::CNTK::AsDataType<ElemType>(), shape, blockGrad->Data(), blockGrad->GetNumElements() * sizeof(ElemType), ::CNTK::AsDeviceDescriptor(blockGrad->GetDeviceId()));
aggregatedWeightsPrepared.push_back(data);
}
// Send block gradient over MPI nodes.
m_communicator->AggregateInPlace(aggregatedWeightsPrepared, m_communicator->Workers());
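// After this all-reduce each blockGrad (the storage behind aggregatedWeightsPrepared) holds the sum of
// the per-worker block gradients W(t-1) - W_k(t); the filtered model update below works on this sum.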
// 2. Let's update the model
for (auto& pBaseNode : learnableNodes)
{
if (!pBaseNode->IsParameterUpdateRequired())
continue;
wstring name = pBaseNode->NodeName();
auto pNode = DownCast(pBaseNode);
// 2 block gradient aggregation
// 2.1. get current model
Matrix<ElemType>& prevWeight = *m_prevParameters[name]; // prev model value
Matrix<ElemType>& currentWeight = pNode->Value(); // current model
auto blockGrad = aggregatedWeights[name];
// 2.2. model update
{
Matrix<ElemType>& sg = *m_blockLevelSmoothedGradient[name]; // smoothed gradient
blockGrad->TransferToDeviceIfNotThere(sg.GetDeviceId());
// 2.2.1 update block level smoothed gradient;
// This is essentially a first-order infinite impulse response (IIR) filter with the gain (1 - blockMomentum)*m_blockLearningRate:
// smoothedGradient(t)=blockMomentum * smoothedGradients(t-1) + (1 - blockMomentum)*m_blockLearningRate*blockGrad(t)
Matrix<ElemType>::ScaleAndAdd((ElemType)((1 - blockMomentum)*m_blockLearningRate), *blockGrad, (ElemType)blockMomentum, sg);
// 2.2.2 update parameters;
currentWeight.SetValue(prevWeight);
currentWeight -= sg;
// 2.2.3 Nesterov Momentum
// Nesterov momentum here performs a partial weight update before calculating the gradient, i.e.,
// (step 1) w(t) <-- w(t) - \eta* v(t)
// (step 2) g(t+1) <-- forwardbackward on minibatches with initial model as w(t)
// (step 3) v(t+1) <-- \eta*v(t) + (1-\eta)*learningRate*g(t+1)
// (step 4) w(t+1) <-- w(t)-v(t)
// (step 5) t <-- t+1
// without step 1, this becomes standard momentum
if (m_useNesterovMomentum)
{
Matrix<ElemType>::ScaleAndAdd((ElemType)-blockMomentum, sg, currentWeight);
}
// 2.2.4 update bookkeeping
prevWeight.SetValue(currentWeight);
}
}
//----------------------------------------
// 3. reset SGD momentum if necessary
//----------------------------------------
if (m_resetSGDMomentumAfterAggregation)
{
for (Matrix<ElemType>& x : smoothedGradient)
{
x.SetValue((ElemType)0);
}
}
}
void SaveToCheckPoint(File& fstream) override
{
if (!m_communicator->CurrentWorker().IsMain())
return;
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"BMACKP");
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"BOptions");
fstream << m_resetSGDMomentumAfterAggregation;
fstream.PutMarker(FileMarker::fileMarkerEndSection, L"EOptions");
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"BMomentumAsTimeConstant");
fstream << m_blockMomentumAsTimeConstantPerWorker;
fstream.PutMarker(FileMarker::fileMarkerEndSection, L"EMomentumAsTimeConstant");
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"BSyncPeriodInSamples");
fstream << m_syncPeriodPerWorker;
fstream.PutMarker(FileMarker::fileMarkerEndSection, L"ESyncPeriodInSamples");
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"BParam");
SaveParameters(fstream, m_prevParameters);
SaveParameters(fstream, m_blockLevelSmoothedGradient);
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"EParam");
fstream.PutMarker(FileMarker::fileMarkerBeginSection, L"EMACKP");
}
void LoadFromCheckPoint(File& fstream) override
{
if (!fstream.TryGetMarker(FileMarker::fileMarkerBeginSection, L"BMACKP"))
return;
fstream.GetMarker(FileMarker::fileMarkerBeginSection, L"BOptions");
fstream >> m_resetSGDMomentumAfterAggregation;
fstream.GetMarker(FileMarker::fileMarkerEndSection, L"EOptions");
fstream.GetMarker(FileMarker::fileMarkerBeginSection, L"BMomentumAsTimeConstant");
fstream >> m_blockMomentumAsTimeConstantPerWorker;
fstream.GetMarker(FileMarker::fileMarkerEndSection, L"EMomentumAsTimeConstant");
fstream.GetMarker(FileMarker::fileMarkerBeginSection, L"BSyncPeriodInSamples");
fstream >> m_syncPeriodPerWorker;
fstream.GetMarker(FileMarker::fileMarkerEndSection, L"ESyncPeriodInSamples");
fstream.GetMarker(FileMarker::fileMarkerBeginSection, L"BParam");
LoadParameters(fstream, m_prevParameters, m_deviceId);
LoadParameters(fstream, m_blockLevelSmoothedGradient, m_deviceId);
fstream.GetMarker(FileMarker::fileMarkerBeginSection, L"EParam");
fstream.GetMarker(FileMarker::fileMarkerEndSection, L"EMACKP");
}
private:
// helper functions to save/load the map<wstring, shared_ptr<Matrix<ElemType>>> structure
void SaveParameters(File& f, const map<wstring, shared_ptr<Matrix<ElemType>>>& parameters) const
{
// save sizeof(ElemType)
unsigned int size = sizeof(ElemType);
f << size;
// save number of pairs
unsigned int numPairs = parameters.size();
f << numPairs;
for (auto& x : parameters)
{
f << x.first;
f << *x.second;
}
f.Flush();
return;
}
void LoadParameters(File& f, map<wstring, shared_ptr<Matrix<ElemType>>>& parameters, DEVICEID_TYPE deviceID)
{
unsigned int size = 0;
unsigned int pair = 0;
f >> size;
f >> pair;
if (size != sizeof(ElemType))
{
LogicError("Mismatched ElemType in loading BlockMomentumSGD checkpoint. Expecting %s, while loading element size=%d\n",
sizeof(ElemType) == 4 ? "float" : "double",
size
);
}
parameters.clear();
for (size_t i = 0; i < pair; i++)
{
wstring name;
f >> name;
shared_ptr<Matrix<ElemType>> mat = make_shared<Matrix<ElemType>>(deviceID);
f >> *mat;
parameters[name] = mat;
}
}
public:
static double TimeConstant2Momentum(double timeConstant, size_t syncPeriod)
{
return exp(-((double)syncPeriod) / timeConstant);
}
static double Momentum2TimeConstant(double bm, size_t syncPeriod)
{
if (bm >= 1.0 || bm < 0.0)
{
InvalidArgument("Unexpected block momentum (%.2f). Block momentum should be in the range of [0,1)\n", bm);
}
return -(double)syncPeriod / log(bm);
}
};
} } }

Просмотреть файл

@ -1,938 +0,0 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
// fileutil.h - file I/O with error checking
//
#pragma once
#ifndef _FILEUTIL_
#define _FILEUTIL_
#include "Basics.h"
#ifdef __WINDOWS__
#ifndef NOMINMAX
#define NOMINMAX
#endif // NOMINMAX
#include "Windows.h" // for mmreg.h and FILETIME
#include <mmreg.h>
#endif
#ifdef __unix__
#include <sys/types.h>
#include <sys/stat.h>
#endif
#include <algorithm> // for std::find
#include <vector>
#include <string>
#include <map>
#include <functional>
#include <cctype>
#include <errno.h>
#include <stdint.h>
#include <assert.h>
#include <string.h> // for strerror()
#include <stdexcept> // for exception
#include <fcntl.h>
#define FCLOSE_SUCCESS 0
/* guoye: start */
/*
#include "basetypes.h" //for attemp()
#include "ProgressTracing.h"
#include <unistd.h>
#include <glob.h>
#include <dirent.h>
#include <sys/sendfile.h>
#include <stdio.h>
#include <ctype.h>
#include <limits.h>
#include <memory>
#include <cwctype>
*/
// using namespace Microsoft::MSR::CNTK;
/* guoye: end */
// ----------------------------------------------------------------------------
// fopenOrDie(): like fopen() but terminate with err msg in case of error.
// A pathname of "-" returns stdout or stdin, depending on mode, and it will
// change the binary mode if 'b' or 't' are given. If you use this, make sure
// not to fclose() such a handle.
// ----------------------------------------------------------------------------
FILE* fopenOrDie(const std::string& pathname, const char* mode);
FILE* fopenOrDie(const std::wstring& pathname, const wchar_t* mode);
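// A minimal usage sketch (illustrative only; "model.bin" and 'numWeights' are made-up placeholders):
//   FILE* f = fopenOrDie("model.bin", "rb"); // terminates with an error message if the file cannot be opened
//   std::vector<float> weights;
//   freadOrDie(weights, numWeights, f);      // resizes the vector and reads numWeights elements
//   fcloseOrDie(f);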
#ifndef __unix__
// ----------------------------------------------------------------------------
// fsetmode(): set mode to binary or text
// ----------------------------------------------------------------------------
void fsetmode(FILE* f, char type);
#endif
// ----------------------------------------------------------------------------
// freadOrDie(): like fread() but terminate with err msg in case of error
// ----------------------------------------------------------------------------
void freadOrDie(void* ptr, size_t size, size_t count, FILE* f);
#ifdef _WIN32
void freadOrDie(void* ptr, size_t size, size_t count, const HANDLE f);
#endif
template <class _T>
void freadOrDie(_T& data, int num, FILE* f) // template for std::vector<>
{
data.resize(num);
if (data.size() > 0)
freadOrDie(&data[0], sizeof(data[0]), data.size(), f);
}
template <class _T>
void freadOrDie(_T& data, size_t num, FILE* f) // template for std::vector<>
{
data.resize(num);
if (data.size() > 0)
freadOrDie(&data[0], sizeof(data[0]), data.size(), f);
}
#ifdef _WIN32
template <class _T>
void freadOrDie(_T& data, int num, const HANDLE f) // template for std::vector<>
{
data.resize(num);
if (data.size() > 0)
freadOrDie(&data[0], sizeof(data[0]), data.size(), f);
}
template <class _T>
void freadOrDie(_T& data, size_t num, const HANDLE f) // template for std::vector<>
{
data.resize(num);
if (data.size() > 0)
freadOrDie(&data[0], sizeof(data[0]), data.size(), f);
}
#endif
// ----------------------------------------------------------------------------
// fwriteOrDie(): like fwrite() but terminate with err msg in case of error
// ----------------------------------------------------------------------------
void fwriteOrDie(const void* ptr, size_t size, size_t count, FILE* f);
#ifdef _WIN32
void fwriteOrDie(const void* ptr, size_t size, size_t count, const HANDLE f);
#endif
template <class _T>
void fwriteOrDie(const _T& data, FILE* f) // template for std::vector<>
{
if (data.size() > 0)
fwriteOrDie(&data[0], sizeof(data[0]), data.size(), f);
}
#ifdef _WIN32
template <class _T>
void fwriteOrDie(const _T& data, const HANDLE f) // template for std::vector<>
{
if (data.size() > 0)
fwriteOrDie(&data[0], sizeof(data[0]), data.size(), f);
}
#endif
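// Example (illustrative sketch, not part of the original header): round-tripping a
// std::vector<float> through the vector overloads above; "weights.bin" is a made-up
// path and the caller is assumed to have included <vector>.
//
//     std::vector<float> w(256, 0.0f);
//     FILE* fw = fopenOrDie("weights.bin", "wb");
//     fwriteOrDie(w, fw);                  // writes w.size() elements
//     fcloseOrDie(fw);
//
//     std::vector<float> r;
//     FILE* fr = fopenOrDie("weights.bin", "rb");
//     freadOrDie(r, (size_t) 256, fr);     // resizes r and reads 256 elements
//     fcloseOrDie(fr);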
// ----------------------------------------------------------------------------
// fprintfOrDie(): like fprintf() but terminate with err msg in case of error
// ----------------------------------------------------------------------------
void fprintfOrDie(FILE* f, const char* format, ...);
// ----------------------------------------------------------------------------
// fcloseOrDie(): like fclose() but terminate with err msg in case of error
// not yet implemented, but we should; for now it simply maps to fclose()
// ----------------------------------------------------------------------------
#define fcloseOrDie fclose
// ----------------------------------------------------------------------------
// fflushOrDie(): like fflush() but terminate with err msg in case of error
// ----------------------------------------------------------------------------
void fflushOrDie(FILE* f);
// ----------------------------------------------------------------------------
// filesize(): determine size of the file in bytes
// ----------------------------------------------------------------------------
size_t filesize(const wchar_t* pathname);
size_t filesize(FILE* f);
int64_t filesize64(const wchar_t* pathname);
// ----------------------------------------------------------------------------
// fseekOrDie(),ftellOrDie(), fget/setpos(): seek functions with error handling
// ----------------------------------------------------------------------------
// 32-bit offsets only
long fseekOrDie(FILE* f, long offset, int mode = SEEK_SET);
#define ftellOrDie ftell
// ----------------------------------------------------------------------------
// fget/setpos(): seek functions with error handling
// ----------------------------------------------------------------------------
uint64_t fgetpos(FILE* f);
void fsetpos(FILE* f, uint64_t pos);
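// Example (illustrative sketch, not from the original header): remembering a stream
// position with the 64-bit overloads above and seeking back to it later.
//
//     uint64_t afterHeader = fgetpos(f);   // f is an already-open FILE*
//     // ... read some records ...
//     fsetpos(f, afterHeader);             // rewind to just after the header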
// ----------------------------------------------------------------------------
// unlinkOrDie(): unlink() with error handling
// ----------------------------------------------------------------------------
void unlinkOrDie(const std::string& pathname);
void unlinkOrDie(const std::wstring& pathname);
// ----------------------------------------------------------------------------
// renameOrDie(): rename() with error handling
// ----------------------------------------------------------------------------
void renameOrDie(const std::string& from, const std::string& to);
void renameOrDie(const std::wstring& from, const std::wstring& to);
// ----------------------------------------------------------------------------
// copyOrDie(): copy file with error handling.
// ----------------------------------------------------------------------------
void copyOrDie(const std::string& from, const std::string& to);
void copyOrDie(const std::wstring& from, const std::wstring& to);
// ----------------------------------------------------------------------------
// fexists(): test if a file exists
// ----------------------------------------------------------------------------
bool fexists(const char* pathname);
bool fexists(const wchar_t* pathname);
inline bool fexists(const std::string& pathname)
{
return fexists(pathname.c_str());
}
inline bool fexists(const std::wstring& pathname)
{
return fexists(pathname.c_str());
}
// ----------------------------------------------------------------------------
// funicode(): test if a file uses unicode
// ----------------------------------------------------------------------------
bool funicode(FILE* f);
// ----------------------------------------------------------------------------
// fskipspace(): skip space characters
// ----------------------------------------------------------------------------
bool fskipspace(FILE* F);
bool fskipwspace(FILE* F);
// ----------------------------------------------------------------------------
// fgetline(): like fgets() but terminate with err msg in case of error;
// removes the newline character at the end (like gets()), returned buffer is
// always 0-terminated; has second version that returns an STL std::string instead
// fgetstring(): read a 0-terminated std::string (terminate if error)
// fgetword(): read a space-terminated token (terminate if error)
// fskipNewLine(): skip all white space until end of line incl. the newline
// ----------------------------------------------------------------------------
template <class CHAR>
CHAR* fgetline(FILE* f, CHAR* buf, int size);
template <class CHAR, size_t n>
CHAR* fgetline(FILE* f, CHAR(&buf)[n])
{
/* guoye: start */
// fprintf(stderr, "\n fileutil.h: fgetline(FILE* f, CHAR(&buf)[n]): debug 0\n");
return fgetline(f, buf, n);
/* guoye: end */
}
std::string fgetline(FILE* f);
std::wstring fgetlinew(FILE* f);
void fgetline(FILE* f, std::string& s, std::vector<char>& buf);
void fgetline(FILE* f, std::wstring& s, std::vector<char>& buf);
void fgetline(FILE* f, std::vector<char>& buf);
void fgetline(FILE* f, std::vector<wchar_t>& buf);
const char* fgetstring(FILE* f, char* buf, int size);
template <size_t n>
const char* fgetstring(FILE* f, char(&buf)[n])
{
return fgetstring(f, buf, n);
}
const char* fgetstring(const HANDLE f, char* buf, int size);
template <size_t n>
const char* fgetstring(const HANDLE f, char(&buf)[n])
{
return fgetstring(f, buf, n);
}
const wchar_t* fgetstring(FILE* f, wchar_t* buf, int size);
std::wstring fgetwstring(FILE* f);
std::string fgetstring(FILE* f);
const char* fgettoken(FILE* f, char* buf, int size);
template <size_t n>
const char* fgettoken(FILE* f, char(&buf)[n])
{
return fgettoken(f, buf, n);
}
std::string fgettoken(FILE* f);
const wchar_t* fgettoken(FILE* f, wchar_t* buf, int size);
std::wstring fgetwtoken(FILE* f);
int fskipNewline(FILE* f, bool skip = true);
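// Example (illustrative sketch, not from the original header; the path is
// hypothetical): reading a text file line by line with the std::string overload of
// fgetline().
//
//     FILE* f = fopenOrDie("transcripts.txt", "rt");
//     while (!feof(f))
//     {
//         std::string line = fgetline(f);
//         // ... process line ...
//     }
//     fcloseOrDie(f);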
// ----------------------------------------------------------------------------
// fputstring(): write a 0-terminated std::string (terminate if error)
// ----------------------------------------------------------------------------
void fputstring(FILE* f, const char*);
#ifdef _WIN32
void fputstring(const HANDLE f, const char* str);
#endif
void fputstring(FILE* f, const std::string&);
void fputstring(FILE* f, const wchar_t*);
void fputstring(FILE* f, const std::wstring&);
// ----------------------------------------------------------------------------
// fgetTag(): read a 4-byte tag & return as a std::string
// ----------------------------------------------------------------------------
std::string fgetTag(FILE* f);
// ----------------------------------------------------------------------------
// fcheckTag(): read a 4-byte tag & verify it; terminate if wrong tag
// ----------------------------------------------------------------------------
void fcheckTag(FILE* f, const char* expectedTag);
#ifdef _WIN32
void fcheckTag(const HANDLE f, const char* expectedTag);
#endif
void fcheckTag_ascii(FILE* f, const std::string& expectedTag);
// ----------------------------------------------------------------------------
// fcompareTag(): compare two tags; terminate if wrong tag
// ----------------------------------------------------------------------------
void fcompareTag(const std::string& readTag, const std::string& expectedTag);
// ----------------------------------------------------------------------------
// fputTag(): write a 4-byte tag
// ----------------------------------------------------------------------------
void fputTag(FILE* f, const char* tag);
#ifdef _WIN32
void fputTag(const HANDLE f, const char* tag);
#endif
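// Example (illustrative sketch, not from the original header): bracketing a binary
// section with a 4-byte tag; "BVAL" is a made-up tag and f is an already-open FILE*.
//
//     // writer side
//     fputTag(f, "BVAL");
//     fputint(f, 42);
//     // reader side
//     fcheckTag(f, "BVAL");    // terminates if the tag does not match
//     int v = fgetint(f);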
// ----------------------------------------------------------------------------
// fskipstring(): skip a 0-terminated std::string, such as a pad std::string
// ----------------------------------------------------------------------------
void fskipstring(FILE* f);
// ----------------------------------------------------------------------------
// fpad(): write a 0-terminated std::string to pad file to an n-byte boundary
// ----------------------------------------------------------------------------
void fpad(FILE* f, int n);
// ----------------------------------------------------------------------------
// fgetbyte(): read a byte value
// ----------------------------------------------------------------------------
char fgetbyte(FILE* f);
// ----------------------------------------------------------------------------
// fgetshort(): read a short value
// ----------------------------------------------------------------------------
short fgetshort(FILE* f);
short fgetshort_bigendian(FILE* f);
// ----------------------------------------------------------------------------
// fgetint24(): read a 3-byte (24-bit) int value
// ----------------------------------------------------------------------------
int fgetint24(FILE* f);
// ----------------------------------------------------------------------------
// fgetint(): read an int value
// ----------------------------------------------------------------------------
int fgetint(FILE* f);
#ifdef _WIN32
int fgetint(const HANDLE f);
#endif
int fgetint_bigendian(FILE* f);
int fgetint_ascii(FILE* f);
// ----------------------------------------------------------------------------
// fgetlong(): read a long value
// ----------------------------------------------------------------------------
long fgetlong(FILE* f);
// ----------------------------------------------------------------------------
// fgetfloat(): read a float value
// ----------------------------------------------------------------------------
float fgetfloat(FILE* f);
float fgetfloat_bigendian(FILE* f);
float fgetfloat_ascii(FILE* f);
// ----------------------------------------------------------------------------
// fgetdouble(): read a double value
// ----------------------------------------------------------------------------
double fgetdouble(FILE* f);
#ifdef _WIN32
// ----------------------------------------------------------------------------
// fgetwav(): read an entire .wav file
// ----------------------------------------------------------------------------
void fgetwav(FILE* f, std::vector<short>& wav, int& sampleRate);
void fgetwav(const std::wstring& fn, std::vector<short>& wav, int& sampleRate);
// ----------------------------------------------------------------------------
// fputwav(): save data into a .wav file
// ----------------------------------------------------------------------------
void fputwav(FILE* f, const std::vector<short>& wav, int sampleRate, int nChannels = 1);
void fputwav(const std::wstring& fn, const std::vector<short>& wav, int sampleRate, int nChannels = 1);
#endif
// ----------------------------------------------------------------------------
// fputbyte(): write a byte value
// ----------------------------------------------------------------------------
void fputbyte(FILE* f, char val);
// ----------------------------------------------------------------------------
// fputshort(): write a short value
// ----------------------------------------------------------------------------
void fputshort(FILE* f, short val);
// ----------------------------------------------------------------------------
// fputint24(): write a 3-byte (24-bit) int value
// ----------------------------------------------------------------------------
void fputint24(FILE* f, int v);
// ----------------------------------------------------------------------------
// fputint(): write an int value
// ----------------------------------------------------------------------------
void fputint(FILE* f, int val);
// ----------------------------------------------------------------------------
// fputlong(): write a long value
// ----------------------------------------------------------------------------
void fputlong(FILE* f, long val);
#ifdef _WIN32
void fputint(const HANDLE f, int v);
#endif
// ----------------------------------------------------------------------------
// fputfloat(): write a float value
// ----------------------------------------------------------------------------
void fputfloat(FILE* f, float val);
// ----------------------------------------------------------------------------
// fputdouble(): write a double value
// ----------------------------------------------------------------------------
void fputdouble(FILE* f, double val);
// template versions of put/get functions for binary files
template <typename T>
void fput(FILE* f, T v)
{
fwriteOrDie(&v, sizeof(v), 1, f);
}
// template versions of put/get functions for binary files
template <typename T>
void fget(FILE* f, T& v)
{
freadOrDie((void*) &v, sizeof(v), 1, f);
}
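// Example (illustrative sketch, not from the original header): the templates above
// write and read raw fixed-size values, so the same type must be used on both sides.
//
//     double d = 0.5;
//     fput(f, d);      // writes sizeof(double) bytes; f is an already-open FILE*
//     ...
//     double d2;
//     fget(f, d2);     // reads them back into d2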
// GetFormatString - get the format std::string for a particular type
template <typename T>
const wchar_t* GetFormatString(T /*t*/)
{
// if this assert fires it means that you are using a type that doesn't have
// a read and/or write routine.
// If the type is a user-defined class, you need to create some global functions that handle file in/out.
// for example:
// File& operator>>(File& stream, MyClass& test);
// File& operator<<(File& stream, MyClass& test);
//
// in your class you will probably want to add these functions as friends so you can access any private members
// friend File& operator>>(File& stream, MyClass& test);
// friend File& operator<<(File& stream, MyClass& test);
//
// if you are using wchar_t* or char* types, these use other methods because they require buffers to be passed
// either use std::string and std::wstring, or use the WriteString() and ReadString() methods
assert(false); // need a specialization
return NULL;
}
// GetFormatString - specializations to get the format std::string for a particular type
template <>
const wchar_t* GetFormatString(char);
template <>
const wchar_t* GetFormatString(wchar_t);
template <>
const wchar_t* GetFormatString(short);
template <>
const wchar_t* GetFormatString(int);
template <>
const wchar_t* GetFormatString(long);
template <>
const wchar_t* GetFormatString(unsigned short);
template <>
const wchar_t* GetFormatString(unsigned int);
template <>
const wchar_t* GetFormatString(unsigned long);
template <>
const wchar_t* GetFormatString(float);
template <>
const wchar_t* GetFormatString(double);
template <>
const wchar_t* GetFormatString(unsigned long long);
template <>
const wchar_t* GetFormatString(long long);
template <>
const wchar_t* GetFormatString(const char*);
template <>
const wchar_t* GetFormatString(const wchar_t*);
// GetScanFormatString - get the format std::string for a particular type
template <typename T>
const wchar_t* GetScanFormatString(T)
{
assert(false); // need a specialization
return NULL;
}
// GetScanFormatString - specializations to get the format std::string for a particular type
template <>
const wchar_t* GetScanFormatString(char);
template <>
const wchar_t* GetScanFormatString(wchar_t);
template <>
const wchar_t* GetScanFormatString(short);
template <>
const wchar_t* GetScanFormatString(int);
template <>
const wchar_t* GetScanFormatString(long);
template <>
const wchar_t* GetScanFormatString(unsigned short);
template <>
const wchar_t* GetScanFormatString(unsigned int);
template <>
const wchar_t* GetScanFormatString(unsigned long);
template <>
const wchar_t* GetScanFormatString(float);
template <>
const wchar_t* GetScanFormatString(double);
template <>
const wchar_t* GetScanFormatString(unsigned long long);
template <>
const wchar_t* GetScanFormatString(long long);
// ----------------------------------------------------------------------------
// fgetText(): get a value from a text file
// ----------------------------------------------------------------------------
template <typename T>
void fgetText(FILE* f, T& v)
{
int rc = ftrygetText(f, v);
if (rc == 0)
Microsoft::MSR::CNTK::RuntimeError("error reading value from file (invalid format)");
else if (rc == EOF)
Microsoft::MSR::CNTK::RuntimeError("error reading from file: %s", strerror(errno));
assert(rc == 1);
}
// version that tries to read a value and does not throw if the contents don't match the expected format
template <typename T>
int ftrygetText(FILE* f, T& v)
{
const wchar_t* formatString = GetScanFormatString<T>(v);
int rc = fwscanf(f, formatString, &v);
assert(rc == 1 || rc == 0);
return rc;
}
template <>
int ftrygetText<bool>(FILE* f, bool& v);
// ----------------------------------------------------------------------------
// fgetText() specializations for fwscanf_s differences: get a value from a text file
// ----------------------------------------------------------------------------
void fgetText(FILE* f, char& v);
void fgetText(FILE* f, wchar_t& v);
// ----------------------------------------------------------------------------
// fputText(): write a value out as text
// ----------------------------------------------------------------------------
template <typename T>
void fputText(FILE* f, T v)
{
const wchar_t* formatString = GetFormatString(v);
int rc = fwprintf(f, formatString, v);
if (rc == 0)
Microsoft::MSR::CNTK::RuntimeError("error writing value to file, no values written");
else if (rc < 0)
Microsoft::MSR::CNTK::RuntimeError("error writing to file: %s", strerror(errno));
}
// ----------------------------------------------------------------------------
// fputText(): write a bool out as character
// ----------------------------------------------------------------------------
template <>
void fputText<bool>(FILE* f, bool v);
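// Example (illustrative sketch, not from the original header): writing a float as
// text and scanning it back; f is an already-open FILE* used for both steps.
//
//     fputText(f, 0.5f);   // uses the float format std::string
//     fputText(f, '\n');
//     ...
//     float v;
//     fgetText(f, v);      // terminates if the text cannot be parsed as a float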
// ----------------------------------------------------------------------------
// fputfile(): write a binary block or a std::string as a file
// ----------------------------------------------------------------------------
void fputfile(const std::wstring& pathname, const std::vector<char>& buffer);
void fputfile(const std::wstring& pathname, const std::wstring&);
void fputfile(const std::wstring& pathname, const std::string&);
// ----------------------------------------------------------------------------
// fgetfile(): load a file as a binary block
// ----------------------------------------------------------------------------
void fgetfile(const std::wstring& pathname, std::vector<char>& buffer);
void fgetfile(FILE* f, std::vector<char>& buffer);
namespace msra { namespace files {
void fgetfilelines(const std::wstring& pathname, std::vector<char>& readbuffer, std::vector<std::string>& lines, int numberOfTries = 1);
static inline std::vector<std::string> fgetfilelines(const std::wstring& pathname)
{
std::vector<char> buffer;
std::vector<std::string> lines;
fgetfilelines(pathname, buffer, lines);
return lines;
}
std::vector<char*> fgetfilelines(const std::wstring& pathname, std::vector<char>& readbuffer, int numberOfTries = 1);
}}
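// Example (illustrative sketch, not from the original header; the path and the
// ProcessLine() helper are hypothetical): loading a whole list file into memory.
//
//     std::vector<std::string> lines = msra::files::fgetfilelines(L"filelist.txt");
//     for (const auto& line : lines)
//         ProcessLine(line);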
#ifdef _WIN32
// ----------------------------------------------------------------------------
// getfiletime(), setfiletime(): access modification time
// ----------------------------------------------------------------------------
bool getfiletime(const std::wstring& path, FILETIME& time);
void setfiletime(const std::wstring& path, const FILETIME& time);
#endif
// ----------------------------------------------------------------------------
// expand_wildcards() -- expand a path with wildcards (also intermediate ones)
// ----------------------------------------------------------------------------
void expand_wildcards(const std::wstring& path, std::vector<std::wstring>& paths);
// ----------------------------------------------------------------------------
// make_intermediate_dirs() -- make all intermediate dirs on a path
// ----------------------------------------------------------------------------
namespace msra { namespace files {
void make_intermediate_dirs(const std::wstring& filepath);
std::vector<std::wstring> get_all_files_from_directory(const std::wstring& directory);
}}
// ----------------------------------------------------------------------------
// fuptodate() -- test whether an output file is at least as new as an input file
// ----------------------------------------------------------------------------
namespace msra { namespace files {
bool fuptodate(const std::wstring& target, const std::wstring& input, bool inputrequired = true);
};
};
#ifdef _WIN32
// ----------------------------------------------------------------------------
// simple support for WAV file I/O
// ----------------------------------------------------------------------------
typedef struct wavehder
{
char riffchar[4];
unsigned int RiffLength;
char wavechar[8];
unsigned int FmtLength;
signed short wFormatTag;
signed short nChannels;
unsigned int nSamplesPerSec;
unsigned int nAvgBytesPerSec;
signed short nBlockAlign;
signed short wBitsPerSample;
char datachar[4];
unsigned int DataLength;
private:
void prepareRest(int SampleCount);
public:
void prepare(unsigned int Fs, int Bits, int Channels, int SampleCount);
void prepare(const WAVEFORMATEX& wfx, int SampleCount);
unsigned int read(FILE* f, signed short& wRealFormatTag, int& bytesPerSample);
void write(FILE* f);
static void update(FILE* f);
} WAVEHEADER;
// ----------------------------------------------------------------------------
// fgetwfx(), fputwfx(): I/O of wave file headers only
// ----------------------------------------------------------------------------
unsigned int fgetwfx(FILE* f, WAVEFORMATEX& wfx);
void fputwfx(FILE* f, const WAVEFORMATEX& wfx, unsigned int numSamples);
// ----------------------------------------------------------------------------
// fgetraw(): read data of .wav file, and separate data of multiple channels.
// For example, data[i][j]: i is channel index, 0 means the first
// channel. j is sample index.
// ----------------------------------------------------------------------------
void fgetraw(FILE* f, std::vector<std::vector<short>>& data, const WAVEHEADER& wavhd);
#endif
// ----------------------------------------------------------------------------
// auto_file_ptr -- FILE* with auto-close; use auto_file_ptr instead of FILE*.
// Warning: do not pass an auto_file_ptr to a function that calls fclose(),
// except for fclose() itself.
// ----------------------------------------------------------------------------
class auto_file_ptr
{
FILE* f;
FILE* operator=(auto_file_ptr&); // can't ref-count: no assignment
auto_file_ptr(auto_file_ptr&);
void close()
{
if (f && f != stdin && f != stdout && f != stderr)
{
int rc = ::fclose(f);
if ((rc != FCLOSE_SUCCESS) && !std::uncaught_exception())
RuntimeError("auto_file_ptr: failed to close file: %s", strerror(errno));
f = NULL;
}
}
#pragma warning(push)
#pragma warning(disable : 4996)
void openfailed(const std::string& path)
{
Microsoft::MSR::CNTK::RuntimeError("auto_file_ptr: error opening file '%s': %s", path.c_str(), strerror(errno));
}
#pragma warning(pop)
protected:
friend int fclose(auto_file_ptr&); // explicit close (note: may fail)
int fclose()
{
int rc = ::fclose(f);
if (rc == 0)
f = NULL;
return rc;
}
public:
auto_file_ptr()
: f(NULL)
{
}
~auto_file_ptr()
{
close();
}
#pragma warning(push)
#pragma warning(disable : 4996)
auto_file_ptr(const char* path, const char* mode)
{
f = fopen(path, mode);
if (f == NULL)
openfailed(path);
}
auto_file_ptr(const wchar_t* wpath, const char* mode)
{
f = _wfopen(wpath, msra::strfun::utf16(mode).c_str());
if (f == NULL)
openfailed(msra::strfun::utf8(wpath));
}
#pragma warning(pop)
FILE* operator=(FILE* other)
{
close();
f = other;
return f;
}
auto_file_ptr(FILE* other)
: f(other)
{
}
operator FILE*() const
{
return f;
}
FILE* operator->() const
{
return f;
}
void swap(auto_file_ptr& other) throw()
{
std::swap(f, other.f);
}
};
inline int fclose(auto_file_ptr& af)
{
return af.fclose();
}
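// Example (illustrative sketch, not from the original header; the path is
// hypothetical): auto_file_ptr converts to FILE*, so it can be passed to the
// fXXXOrDie() helpers, and the file is closed when the object goes out of scope.
//
//     {
//         auto_file_ptr f("scores.txt", "wb");   // terminates if the open fails
//         fputstring(f, "done\n");
//     } // closed automatically here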
namespace msra { namespace files {
// ----------------------------------------------------------------------------
// textreader -- simple reader for text files --we need this all the time!
// Currently reads 8-bit files, but can return as wstring, in which case
// they are interpreted as UTF-8 (without BOM).
// Note: Not suitable for pipes or typed input due to readahead (fixable if needed).
// ----------------------------------------------------------------------------
class textreader
{
auto_file_ptr f;
std::vector<char> buf; // read buffer (will only grow, never shrink)
int ch; // next character (we need to read ahead by one...)
char getch()
{
char prevch = (char) ch;
ch = fgetc(f);
return prevch;
}
public:
textreader(const std::wstring& path)
: f(path.c_str(), "rb")
{
buf.reserve(10000);
ch = fgetc(f);
}
operator bool() const
{
return ch != EOF;
} // true if still a line to read
std::string getline() // get and consume the next line
{
if (ch == EOF)
LogicError("textreader: attempted to read beyond EOF");
assert(buf.empty());
// get all of the line's characters --we recognize the UNIX (LF), DOS (CRLF), and Mac (CR) conventions
while (ch != EOF && ch != '\n' && ch != '\r')
buf.push_back(getch());
if (ch != EOF && getch() == '\r' && ch == '\n')
getch(); // consume EOLN char
std::string line(buf.begin(), buf.end());
buf.clear();
return line;
}
std::wstring wgetline()
{
return msra::strfun::utf16(getline());
}
};
}
}
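// Example (illustrative sketch, not from the original header; the path and the
// Consume() helper are hypothetical): iterating over the lines of a text file.
//
//     msra::files::textreader reader(L"notes.txt");
//     while (reader)                // true while another line is available
//     {
//         std::string line = reader.getline();
//         Consume(line);
//     }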
// ----------------------------------------------------------------------------
// temp functions -- clean these up
// ----------------------------------------------------------------------------
// split a pathname into directory and filename
static inline void splitpath(const std::wstring& path, std::wstring& dir, std::wstring& file)
{
size_t pos = path.find_last_of(L"\\:/"); // DOS drives, UNIX, Windows
if (pos == path.npos) // no directory found
{
dir.clear();
file = path;
}
else
{
dir = path.substr(0, pos);
file = path.substr(pos + 1);
}
}
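// Example (illustrative values, not from the original header):
//
//     std::wstring dir, file;
//     splitpath(L"models/am/final.dnn", dir, file);   // hypothetical path
//     // dir == L"models/am", file == L"final.dnn"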
// test if a pathname is a relative path
// A relative path is one that can be appended to a directory.
// Drive-relative paths, such as D:file, are considered non-relative.
static inline bool relpath(const wchar_t* path)
{ // this is a wild collection of pathname conventions in Windows
if (path[0] == '/' || path[0] == '\\') // e.g. \WINDOWS
return false;
if (path[0] && path[1] == ':') // drive syntax
return false;
// ... TODO: handle long NT paths
return true; // all others
}
template <class Char>
static inline bool relpath(const std::basic_string<Char>& s)
{
return relpath(s.c_str());
}
// trim from start
template<class String>
static inline String& ltrim(String& s)
{
s.erase(s.begin(), std::find_if(s.begin(), s.end(), [](typename String::value_type c){ return !iscspace(c); }));
return s;
}
// trim from end
template<class String>
static inline String& rtrim(String& s)
{
s.erase(std::find_if(s.rbegin(), s.rend(), [](typename String::value_type c){ return !iscspace(c); }).base(), s.end());
return s;
}
// trim from both ends
template<class String>
static inline String& trim(String& s)
{
return ltrim(rtrim(s));
}
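// Example (illustrative values, not from the original header): trim() modifies the
// string in place and also returns it; iscspace() is assumed to be provided elsewhere
// in the code base.
//
//     std::string s = "  0.75  ";
//     trim(s);   // s == "0.75"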
template<class String>
std::vector<String> SplitString(const String& str, const String& sep);
template<class String, class Char>
std::vector<String> SplitString(const String& str, const Char* sep) { return SplitString(str, String(sep)); }
std::wstring s2ws(const std::string& str);
std::string ws2s(const std::wstring& wstr);
/* guoye: start */
#include "../fileutil.cpp"
/* guoye: end */
#endif // _FILEUTIL_

The diff is not shown because of its large size.

33846
k Normal file

The diff is not shown because of its large size.

1
readme_guoye.txt Normal file

@ -0,0 +1 @@
the corresponding builds are 78746 and 79012