Fixing synchronization of block momentum and distributed checkpointing

This commit is contained in:
Eldar Akchurin 2016-11-07 13:56:34 +01:00
Parent 1a44dac712
Commit 1be9e30e3a
24 changed files with 647 additions and 401 deletions

View file

@@ -418,6 +418,7 @@ CNTKLIBRARY_COMMON_SRC =\
$(SOURCEDIR)/CNTKv2LibraryDll/Learner.cpp \
$(SOURCEDIR)/CNTKv2LibraryDll/Serialization.cpp \
$(SOURCEDIR)/CNTKv2LibraryDll/DistributedCommunicator.cpp \
$(SOURCEDIR)/CNTKv2LibraryDll/DistributedTrainerBase.cpp \
$(SOURCEDIR)/CNTKv2LibraryDll/DataParallelDistributedTrainer.cpp \
$(SOURCEDIR)/CNTKv2LibraryDll/proto/CNTK.pb.cc \
@@ -492,6 +493,7 @@ $(CNTKLIBRARY_TESTS): $(CNTKLIBRARY_TESTS_OBJ) | $(CNTKLIBRARY_LIB)
CNTKLIBRARY_DISTRIBUTION_TESTS_SRC =\
$(CNTKLIBRARY_TESTS_SRC_PATH)/Common.cpp \
Tests/UnitTests/V2LibraryDistributionTests/Main.cpp \
Tests/UnitTests/V2LibraryDistributionTests/FrameModeTests.cpp \
CNTKLIBRARY_DISTRIBUTION_TESTS:=$(BINDIR)/v2librarydistributiontests
CNTKLIBRARY_DISTRIBUTION_TESTS_OBJ := $(patsubst %.cu, $(OBJDIR)/%.o, $(patsubst %.cpp, $(OBJDIR)/%.o, $(CNTKLIBRARY_DISTRIBUTION_TESTS_SRC)))

@@ -1 +1 @@
Subproject commit 6535b08760744c890a88e4c934352ae7fb6b6e30
Subproject commit 8f4337ebe14798c9c27b6f1e984435b6d7dd124c

View file

@@ -3365,6 +3365,8 @@ namespace CNTK
const std::vector<LearnerPtr>& ParameterLearners() const { return m_parameterLearners; }
private:
void Save(const std::wstring& modelFilePath, bool usingLegacyModelFormat, const Dictionary& state);
FunctionPtr m_combinedTrainingFunction;
FunctionPtr m_model;
FunctionPtr m_lossFunction;
@@ -3599,12 +3601,21 @@ namespace CNTK
CNTK_API virtual DistributedCommunicatorPtr SubGroup(const std::unordered_set<DistributedWorkerDescriptor>& subGroupWorkers) const = 0;
// A collective communication API to concatenate values across each worker of this communicator. The concatenated values are only sent to the specified workers; for all others the returned Values are null
// TODO: Add an async variant of the Concatenate method
CNTK_API virtual void Concatenate(
const std::vector<ValuePtr>& values,
std::vector<ValuePtr>& outputValues,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) = 0;
CNTK_API virtual void Concatenate(
const std::vector<NDArrayViewPtr>& input,
std::vector<NDArrayViewPtr>& output,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) = 0;
CNTK_API virtual void Gather(
const Dictionary& input,
std::vector<DictionaryPtr>& output,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) = 0;
// A collective communication API to aggregate values across each worker of this communicator.
// The aggregated values are only sent to the specified workers; for all others the returned Values are null
CNTK_API virtual void AggregateInPlace(
@@ -3668,6 +3679,7 @@ namespace CNTK
/// A collection of additional information needed for the distributed trainer to aggregate the gradients
struct MinibatchInfo
{
bool atEndOfData;
size_t numberOfSamples;
NDArrayViewPtr trainingLossValue;
NDArrayViewPtr evalCriterionValue;
@@ -3682,14 +3694,15 @@ namespace CNTK
// Optional override that gets called before each minibatch during training
CNTK_API virtual void PreMinibatchCallback(const Trainer& trainer) = 0;
// Optional override that gets called per minibatch after finishing gradient computation but before updating model parameters
CNTK_API virtual void PreParameterUpdateCallback(const Trainer& trainer, std::vector<std::pair<Parameter, NDArrayViewPtr>>& gradientValues, MinibatchInfo& info) = 0;
// Optional override that gets called per minibatch after finishing gradient computation but before updating model parameters.
CNTK_API virtual bool PreParameterUpdateCallback(const Trainer& trainer, std::vector<std::pair<Parameter, NDArrayViewPtr>>& gradientValues, MinibatchInfo& info) = 0;
// Optionally overridable method to get the checkpoint state associated with this distributed training method
CNTK_API virtual Dictionary GetCheckpointState() const = 0;
CNTK_API virtual Dictionary CreateCheckpoint(const Trainer& trainer, const Dictionary& localStateToShare) = 0;
// Optionally overridable method to restore state pertaining to this distributed training method from a previous checkpoint
CNTK_API virtual void RestoreFromCheckpoint(const Dictionary& checkpoint) = 0;
// Returns local state that corresponds to this worker.
CNTK_API virtual Dictionary RestoreFromCheckpoint(const Dictionary& checkpoint) = 0;
// Return the distributed communicator used in the distributed trainer
CNTK_API virtual DistributedCommunicatorPtr GetCommunicator() = 0;
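The interface change is easiest to see from the implementer's side: PreParameterUpdateCallback now returns a bool (true once the globally aggregated minibatch is empty, i.e. every worker has run out of data), GetCheckpointState is replaced by CreateCheckpoint, which receives the worker's local state to share, and RestoreFromCheckpoint now hands back the local state for this worker. A minimal sketch of a custom trainer against the revised contract; the rank-keyed checkpoint layout mirrors DistributedTrainerBase below, while the class name and elided aggregation are illustrative only:

// Sketch only: a custom trainer against the revised DistributedTrainer contract.
class SketchDistributedTrainer final : public CNTK::DistributedTrainer
{
public:
    explicit SketchDistributedTrainer(CNTK::DistributedCommunicatorPtr comm) : m_comm(comm) {}

    void PreMinibatchCallback(const CNTK::Trainer&) override {}

    bool PreParameterUpdateCallback(const CNTK::Trainer&,
        std::vector<std::pair<CNTK::Parameter, CNTK::NDArrayViewPtr>>& /*gradientValues*/,
        CNTK::MinibatchInfo& info) override
    {
        // Aggregate gradients and the sample count here (as DataParallelDistributedTrainer does).
        // Returning true tells the Trainer that all workers have reached the end of the data.
        return info.numberOfSamples == 0;
    }

    CNTK::Dictionary CreateCheckpoint(const CNTK::Trainer&, const CNTK::Dictionary& localStateToShare) override
    {
        // Gather every worker's local state and key the result by global rank.
        std::vector<CNTK::DictionaryPtr> all;
        m_comm->Gather(localStateToShare, all, m_comm->Workers());
        CNTK::Dictionary checkpoint;
        for (size_t i = 0; i < all.size(); ++i)
            checkpoint[std::to_wstring(i)] = *all[i];
        return checkpoint;
    }

    CNTK::Dictionary RestoreFromCheckpoint(const CNTK::Dictionary& checkpoint) override
    {
        // Hand back the state that was recorded for this worker's rank.
        return checkpoint[std::to_wstring(m_comm->CurrentWorker().m_globalRank)].Value<CNTK::Dictionary>();
    }

    CNTK::DistributedCommunicatorPtr GetCommunicator() override { return m_comm; }

private:
    CNTK::DistributedCommunicatorPtr m_comm;
};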

View file

@@ -186,6 +186,7 @@ namespace CNTK
typedef std::shared_ptr<Learner> LearnerPtr;
class Dictionary;
typedef std::shared_ptr<Dictionary> DictionaryPtr;
class MinibatchSource;
typedef std::shared_ptr<MinibatchSource> MinibatchSourcePtr;

View file

@@ -136,6 +136,7 @@
<ClInclude Include="API\CNTKLibraryInternals.h" />
<ClInclude Include="DataParallelDistributedTrainer.h" />
<ClInclude Include="DistributedCommunicator.h" />
<ClInclude Include="DistributedTrainerBase.h" />
<ClInclude Include="Function.h" />
<ClInclude Include="Learner.h" />
<ClInclude Include="MinibatchSource.h" />
@@ -152,6 +153,7 @@
<ClCompile Include="ComputeInputStatistics.cpp" />
<ClCompile Include="DataParallelDistributedTrainer.cpp" />
<ClCompile Include="DistributedCommunicator.cpp" />
<ClCompile Include="DistributedTrainerBase.cpp" />
<ClCompile Include="dllmain.cpp">
<CompileAsManaged>false</CompileAsManaged>
<PrecompiledHeader>

View file

@@ -21,6 +21,7 @@
</ClCompile>
<ClCompile Include="DistributedCommunicator.cpp" />
<ClCompile Include="DataParallelDistributedTrainer.cpp" />
<ClCompile Include="DistributedTrainerBase.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="stdafx.h" />
@@ -43,6 +44,7 @@
<ClInclude Include="PrimitiveOpType.h" />
<ClInclude Include="DistributedCommunicator.h" />
<ClInclude Include="DataParallelDistributedTrainer.h" />
<ClInclude Include="DistributedTrainerBase.h" />
</ItemGroup>
<ItemGroup>
<Filter Include="API">
@@ -58,4 +60,4 @@
<Filter>proto</Filter>
</None>
</ItemGroup>
</Project>
</Project>

View file

@@ -97,15 +97,14 @@ namespace CNTK
}
DataParallelDistributedTrainer::DataParallelDistributedTrainer(DistributedCommunicatorPtr communicator, bool useAsyncBufferedParameterUpdate)
: m_communicator(communicator),
m_useAsyncBufferedParameterUpdate(useAsyncBufferedParameterUpdate)
: DistributedTrainerBase(communicator)
{
if (useAsyncBufferedParameterUpdate)
LogicError("Asynchronous parameter update is not yet supported.");
}
// Optional override that gets called per minibatch after finishing gradient computation but before updating model parameters
void DataParallelDistributedTrainer::PreParameterUpdateCallback(const Trainer& /*trainer*/, std::vector<std::pair<Parameter, NDArrayViewPtr>>& gradientValues, MinibatchInfo& info)
bool DataParallelDistributedTrainer::PreParameterUpdateCallback(const Trainer& /*trainer*/, std::vector<std::pair<Parameter, NDArrayViewPtr>>& gradientValues, MinibatchInfo& info)
{
std::vector<NDArrayViewPtr> valuesToAggregate;
for (const auto& i : gradientValues)
@@ -118,24 +117,7 @@ namespace CNTK
m_communicator->AggregateInPlace(valuesToAggregate, m_communicator->Workers());
info.numberOfSamples = static_cast<size_t>(*valuesToAggregate.back()->DataBuffer<double>());
}
// Optional override that gets called before each minibatch during training
void DataParallelDistributedTrainer::PreMinibatchCallback(const Trainer& /*trainer*/)
{
}
// Optionally overridable method to get the checkpoint state associated with this distributed training method
Dictionary DataParallelDistributedTrainer::GetCheckpointState() const
{
// Currently we do not save the state of the distributed trainer.
return Dictionary();
}
// Optionally overridable method to restore state pertaining to this distributed training method from a previous checkpoint
void DataParallelDistributedTrainer::RestoreFromCheckpoint(const Dictionary& /*checkpoint*/)
{
// Currently we do not save the state of the distributed trainer.
info.numberOfSamples = static_cast<size_t>(*valuesToAggregate.back()->WritableDataBuffer<double>());
return info.numberOfSamples == 0;
}
}
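The end-of-data signal above comes from aggregating the local sample count together with the gradients: each worker appends its count as one extra one-element CPU tensor, a single AggregateInPlace sums gradients and counts alike, and a global count of zero means every worker is out of data. A sketch of that pattern, assuming a localSampleCount variable and a gradient-filled valuesToAggregate:

// Append the local sample count as a 1-element double tensor so the same
// collective that sums the gradients also sums the counts.
auto countView = std::make_shared<CNTK::NDArrayView>(
    CNTK::DataType::Double, CNTK::NDShape{ 1 }, CNTK::DeviceDescriptor::CPUDevice());
*countView->WritableDataBuffer<double>() = static_cast<double>(localSampleCount);
valuesToAggregate.push_back(countView);

communicator->AggregateInPlace(valuesToAggregate, communicator->Workers());

// Read back the summed count; zero means all workers hit the end of the data.
size_t globalSamples = static_cast<size_t>(*valuesToAggregate.back()->WritableDataBuffer<double>());
bool allWorkersDone = (globalSamples == 0);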

View file

@@ -6,36 +6,19 @@
#pragma once
#include "CNTKLibrary.h"
#include "DistributedTrainerBase.h"
namespace CNTK
{
///
/// Distributed Trainer.
///
class DataParallelDistributedTrainer : public DistributedTrainer
class DataParallelDistributedTrainer : public DistributedTrainerBase
{
public:
DataParallelDistributedTrainer(DistributedCommunicatorPtr communicator, bool useAsyncBufferedParameterUpdate);
// Optional override that gets called before each minibatch during training
void PreMinibatchCallback(const Trainer& trainer) override;
// Optional override that gets called per minibatch after finishing gradient computation but before updating model parameters
void PreParameterUpdateCallback(const Trainer& trainer, std::vector<std::pair<Parameter, NDArrayViewPtr>>& gradientValues, MinibatchInfo& info) override;
// Optionally overridable method to get the checkpoint state associated with this distributed training method
Dictionary GetCheckpointState() const override;
// Optionally overridable method to restore state pertaining to this distributed training method from a previous checkpoint
void RestoreFromCheckpoint(const Dictionary& checkpoint) override;
private:
DistributedCommunicatorPtr GetCommunicator() override
{
return m_communicator;
}
DistributedCommunicatorPtr m_communicator;
bool m_useAsyncBufferedParameterUpdate;
bool PreParameterUpdateCallback(const Trainer& trainer, std::vector<std::pair<Parameter, NDArrayViewPtr>>& gradientValues, MinibatchInfo& info) override;
};
}

View file

@@ -12,6 +12,7 @@
#include "CUDAPageLockedMemAllocator.h"
#include "MatrixQuantizerImpl.h"
#include "GPUDataTransferer.h"
#include <numeric>
using namespace Microsoft::MSR::CNTK;
@@ -172,6 +173,89 @@ namespace CNTK
NOT_IMPLEMENTED;
}
void MPICommunicatorImpl::Gather(
const Dictionary& input,
std::vector<std::shared_ptr<Dictionary>>& output,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers)
{
CheckWorkers(sendToWorkers);
std::stringstream dict;
dict << input;
std::string encoded = dict.str();
// Exchange sizes.
int encodedSizeInBytes = (int)encoded.size();
std::vector<int> othersSize;
othersSize.resize(m_mpi->NumNodesInUse());
m_mpi->Gather(&encodedSizeInBytes, 1, &othersSize[0], 1, 0);
output.resize(m_mpi->NumNodesInUse());
int totalSizeInBytes = std::accumulate(othersSize.begin(), othersSize.end(), 0);
// Exchange actual data.
std::vector<char> gathered;
gathered.resize(totalSizeInBytes);
std::vector<int> offsets;
offsets.resize(m_mpi->NumNodesInUse());
// Compute each rank's receive offset as the prefix sum of the gathered sizes.
std::partial_sum(othersSize.begin(), othersSize.end() - 1, offsets.begin() + 1);
m_mpi->Gatherv(&encoded[0], encoded.size(), &gathered[0], &othersSize[0], &offsets[0], 0);
offsets.push_back(totalSizeInBytes);
output.resize(m_workers.size());
for (size_t i = 0; i < offsets.size() - 1; ++i)
{
size_t startOffset = offsets[i];
size_t size = offsets[i + 1] - startOffset;
std::stringstream ss;
ss.write(&gathered[startOffset], size);
output[i] = std::make_shared<Dictionary>();
ss >> *output[i];
}
}
void MPICommunicatorImpl::Concatenate(const std::vector<NDArrayViewPtr>& input, std::vector<NDArrayViewPtr>& output, const std::unordered_set<DistributedWorkerDescriptor>& workers)
{
// TODO: Currently we only support concatenation of inputs of the same size.
CheckWorkers(workers);
// Check inputs, currently we support only CPU
auto nonCpu = std::find_if(input.begin(), input.end(), [](const NDArrayViewPtr& v) { return v->Device() != DeviceDescriptor::CPUDevice(); });
if (nonCpu != input.end())
LogicError("Currently only CPU located buffers are supported for concatenation.");
output.resize(input.size());
// Currently we only support concatenation of inputs of the same size.
// Gathering blocks sequentially.
for (size_t i = 0; i < input.size(); ++i)
{
if (output[i] == nullptr ||
output[i]->Shape().TotalSize() != m_mpi->NumNodesInUse() * input[i]->Shape().TotalSize() ||
output[i]->GetDataType() != input[i]->GetDataType())
{
// Allocating flat array for all ranks.
output[i] = std::make_shared<NDArrayView>(input[i]->GetDataType(), NDShape{ input[i]->Shape().TotalSize() * m_mpi->NumNodesInUse() }, DeviceDescriptor::CPUDevice());
}
}
// Initiate concatenation.
std::vector<MPI_Request> allGatherRequests(input.size());
for (size_t i = 0; i < input.size(); ++i)
{
auto& in = input[i];
auto& out = output[i];
if (input[i]->GetDataType() == DataType::Float)
m_mpi->AllGatherAsync(in->DataBuffer<float>(), in->Shape().TotalSize(), out->WritableDataBuffer<float>(), in->Shape().TotalSize(), &allGatherRequests[i]);
else if (input[i]->GetDataType() == DataType::Double)
m_mpi->AllGatherAsync(in->DataBuffer<double>(), in->Shape().TotalSize(), out->WritableDataBuffer<double>(), in->Shape().TotalSize(), &allGatherRequests[i]);
else
LogicError("Type is not supported.");
}
// Wait till all requests are finished.
m_mpi->WaitAll(allGatherRequests);
}
void MPICommunicatorImpl::AggregateInPlace(
const std::vector<NDArrayViewPtr>& values,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers)
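Gather moves variable-sized payloads in two phases: a fixed-size MPI_Gather of the byte counts, then an MPI_Gatherv of the serialized dictionaries at prefix-sum offsets. The same pattern in plain MPI, as a self-contained sketch (the dictionary is assumed to be serialized to a byte string already; error handling omitted):

#include <mpi.h>
#include <numeric>
#include <string>
#include <vector>

// Gather one variable-sized blob per rank onto rank 0.
void GatherBlobs(MPI_Comm comm, const std::string& payload,
                 std::vector<char>& all, std::vector<int>& sizes, std::vector<int>& offsets)
{
    int mySize = static_cast<int>(payload.size());
    int worldSize = 0;
    MPI_Comm_size(comm, &worldSize);

    // Phase 1: every rank reports its payload size to the root.
    sizes.assign(worldSize, 0);
    MPI_Gather(&mySize, 1, MPI_INT, sizes.data(), 1, MPI_INT, /*root*/ 0, comm);

    // Phase 2: prefix sums of the sizes give each rank's offset in the flat buffer.
    offsets.assign(worldSize, 0);
    std::partial_sum(sizes.begin(), sizes.end() - 1, offsets.begin() + 1);
    all.resize(std::accumulate(sizes.begin(), sizes.end(), 0));
    MPI_Gatherv(payload.data(), mySize, MPI_CHAR,
                all.data(), sizes.data(), offsets.data(), MPI_CHAR, /*root*/ 0, comm);
    // On the root, all[offsets[i] .. offsets[i] + sizes[i]) holds rank i's blob.
}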

View file

@@ -30,15 +30,25 @@ namespace CNTK
virtual DistributedCommunicatorPtr SubGroup(const std::unordered_set<DistributedWorkerDescriptor>& subGroupWorkers) const override;
// A collective communication API to concatenate values across each worker of this communicator. The concatenated values are only sent to the specified workers; for all others the returned Values are null
// TODO: Add an async variant of the Concatenate method
virtual void Concatenate(
const std::vector<ValuePtr>& values,
std::vector<ValuePtr>& outValues,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override;
virtual void Concatenate(
const std::vector<NDArrayViewPtr>& input,
std::vector<NDArrayViewPtr>& output,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override;
virtual void Gather(
const Dictionary& input,
std::vector<DictionaryPtr>& output,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override;
// A collective communication API to aggregate values across each worker of this communicator. The aggregated values are only sent to the specified workers; for all others the returned Values are null
virtual void AggregateInPlace(const std::vector<NDArrayViewPtr>& values,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override;
virtual void AggregateInPlace(
const std::vector<NDArrayViewPtr>& values,
const std::unordered_set<DistributedWorkerDescriptor>& sendToWorkers) override;
virtual void Aggregate(
const std::vector<NDArrayViewPtr>& inValues,

View file

@@ -0,0 +1,50 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#include "stdafx.h"
#include "DistributedTrainerBase.h"
#include "DistributedCommunicator.h"
namespace CNTK
{
DistributedTrainerBase::DistributedTrainerBase(DistributedCommunicatorPtr communicator)
: m_communicator(communicator)
{
}
// Optional override that gets called before each minibatch during training
void DistributedTrainerBase::PreMinibatchCallback(const Trainer& /*trainer*/)
{
}
// Get checkpoint state associated with distributed trainer
Dictionary DistributedTrainerBase::CreateCheckpoint(const Trainer&, const Dictionary& localStateToShare)
{
std::vector<DictionaryPtr> remoteState;
m_communicator->Gather(localStateToShare, remoteState, m_communicator->Workers());
Dictionary result;
for (size_t i = 0; i < m_communicator->Workers().size(); ++i)
{
result[std::to_wstring(i)] = *remoteState[i];
}
return result;
}
// Restores the state associated with distributed trainer
Dictionary DistributedTrainerBase::RestoreFromCheckpoint(const Dictionary& checkpoint)
{
auto key = std::to_wstring(m_communicator->CurrentWorker().m_globalRank);
if (checkpoint.Contains(key))
return checkpoint[key].Value<Dictionary>();
// Return 0 rank if possible.
key = std::to_wstring(0);
if (!checkpoint.Contains(key))
RuntimeError("Cannot restore from the checkpoint, 0 rank is missing.");
return checkpoint[key].Value<Dictionary>();
}
}
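The checkpoint produced here is a flat dictionary keyed by global rank, written as a decimal wide string; on restore, a worker whose rank has no entry (for example, when restarting with more workers than the checkpoint had) falls back to rank 0's state. A small usage sketch with hypothetical state values:

// Layout written by CreateCheckpoint: { L"0": state0, L"1": state1, ... }
CNTK::Dictionary checkpoint;
checkpoint[L"0"] = rankZeroState; // hypothetical per-worker Dictionary values
checkpoint[L"1"] = rankOneState;

// Restore on a worker with global rank r, using the rank-0 fallback.
auto key = std::to_wstring(r);
CNTK::Dictionary mine = checkpoint.Contains(key)
    ? checkpoint[key].Value<CNTK::Dictionary>()
    : checkpoint[L"0"].Value<CNTK::Dictionary>();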

View file

@@ -0,0 +1,38 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#pragma once
#include "CNTKLibrary.h"
namespace CNTK
{
///
/// Base class for distributed trainers.
/// TODO: will be switched to Distributed Learner soon.
///
class DistributedTrainerBase : public DistributedTrainer
{
public:
// Callback that gets called before each minibatch during training
void PreMinibatchCallback(const Trainer& trainer) override;
// Gets checkpoint state associated with this distributed trainer
Dictionary CreateCheckpoint(const Trainer& trainer, const Dictionary& localStateToShare) override;
// Restores the trainer from the state.
Dictionary RestoreFromCheckpoint(const Dictionary& checkpoint) override;
DistributedCommunicatorPtr GetCommunicator() override
{
return m_communicator;
}
protected:
explicit DistributedTrainerBase(DistributedCommunicatorPtr communicator);
DistributedCommunicatorPtr m_communicator;
};
}

View file

@@ -2380,7 +2380,7 @@ namespace CNTK
{
auto& matrix = getGradient ? computationNode->As<ComputationNode<float>>()->Gradient() : computationNode->As<ComputationNode<float>>()->Value();
if (varValue == nullptr)
nodeValue = MakeSharedObject<PackedValue>(var.Shape(), std::make_shared<Matrix<float>>(matrix.AsReference()), layout, /*readOnly =*/ !getGradient);
nodeValue = MakeSharedObject<PackedValue>(var.Shape(), std::make_shared<Matrix<float>>(matrix.AsReference()), layout, /*readOnly =*/ false);
else
nodeValue = GetValueObjectFromCNTKImplMatrixAndMBLayout<float>(var, matrix, layout);
break;
@@ -2389,7 +2389,7 @@ namespace CNTK
{
auto& matrix = getGradient ? computationNode->As<ComputationNode<double>>()->Gradient() : computationNode->As<ComputationNode<double>>()->Value();
if (varValue == nullptr)
nodeValue = MakeSharedObject<PackedValue>(var.Shape(), std::make_shared<Matrix<double>>(matrix.AsReference()), layout, /*readOnly =*/ !getGradient);
nodeValue = MakeSharedObject<PackedValue>(var.Shape(), std::make_shared<Matrix<double>>(matrix.AsReference()), layout, /*readOnly =*/ false);
else
nodeValue = GetValueObjectFromCNTKImplMatrixAndMBLayout<double>(var, matrix, layout);
break;

View file

@@ -263,7 +263,6 @@ namespace CNTK
{
Dictionary checkpointState;
checkpointState[MinibatchSourcePositionAttributeName] = m_shim->GetCurrentSamplePosition();
return checkpointState;
}

View file

@@ -9,6 +9,12 @@
#include "Function.h"
#include "Serialization.h"
namespace
{
const std::wstring learnersPropertyName = L"Learners";
const std::wstring distributedLearnerPropertyName = L"DistributedLearner";
}
namespace CNTK
{
Trainer::Trainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const FunctionPtr& evaluationFunction, const std::vector<LearnerPtr>& parameterLearners, const DistributedTrainerPtr& distributedTrainer)
@@ -177,6 +183,7 @@ namespace CNTK
m_prevMinibatchNumSamples = GetSampleCount(m_trainingSampleCountVar, outputs[m_trainingSampleCountVar]);
bool endOfData = m_prevMinibatchNumSamples == 0;
if (m_distributedTrainer)
{
// Aggregation should happen in the same order; the order of parameters is guaranteed to be the same.
@@ -187,12 +194,13 @@ namespace CNTK
MinibatchInfo info
{
arguments.empty(),
m_prevMinibatchNumSamples,
m_prevMinibatchAggregateTrainingLossValue->Data(),
m_prevMinibatchAggregateEvalCriterionValue->Data()
};
m_distributedTrainer->PreParameterUpdateCallback(*this, gradients, info);
endOfData = m_distributedTrainer->PreParameterUpdateCallback(*this, gradients, info);
m_prevMinibatchNumSamples = info.numberOfSamples;
}
@@ -212,7 +220,7 @@ namespace CNTK
anyUpdatesPerformed |= learner->Update(learnerParameterGradients, m_prevMinibatchNumSamples);
}
return anyUpdatesPerformed;
return anyUpdatesPerformed && !endOfData;
}
static std::wstring GetTrainerStateCheckpointFilePath(const std::wstring& modelFilePath)
@@ -223,43 +231,42 @@ namespace CNTK
void Trainer::SaveCheckpoint(const std::wstring& modelFilePath, bool usinglegacyModelFormat)
{
bool shouldSave = true;
if (m_distributedTrainer != nullptr)
// TODO: Need to pass the current state of the minibatch source here.
if (!m_distributedTrainer)
return Save(modelFilePath, usinglegacyModelFormat, Dictionary());
assert(m_distributedTrainer != nullptr);
// TODO: Make sure checkpoints between the distributed and non-distributed cases are compatible.
// The CreateCheckpoint call synchronizes all workers before performing the checkpoint.
Dictionary state = m_distributedTrainer->CreateCheckpoint(*this, Dictionary());
if (m_distributedTrainer->GetCommunicator()->CurrentWorker().IsMain())
Save(modelFilePath, usinglegacyModelFormat, state);
// all workers need to sync up after saving model to avoid read-after-write hazard
// i.e. one worker is in the middle of write while another tries to read
m_distributedTrainer->GetCommunicator()->Barrier();
}
void Trainer::Save(const std::wstring& modelFilePath, bool usinglegacyModelFormat, const Dictionary& distributedLearnerState)
{
vector<DictionaryValue> learnerStates;
for (const auto& learner : m_parameterLearners)
{
// all workers need to sync up before saving model to avoid write-after-read hazard
// i.e. one worker is in the middle of reading a checkpoint while another overwrites
m_distributedTrainer->GetCommunicator()->Barrier();
// for distributed training, only save checkpoint at worker 0
shouldSave = m_distributedTrainer->GetCommunicator()->CurrentWorker().IsMain();
}
if (shouldSave)
{
m_combinedTrainingFunction->SaveModel(modelFilePath, usinglegacyModelFormat);
vector<DictionaryValue> learnerStates;
for (const auto& learner : m_parameterLearners)
{
// TODO: add DictionaryValue(T&&)
learnerStates.push_back(DictionaryValue(learner->Serialize()));
}
std::wstring trainerStateCheckpointFilePath = GetTrainerStateCheckpointFilePath(modelFilePath);
auto ckpStream = GetFstream(trainerStateCheckpointFilePath, false);
// TODO: this will create an extra copy of all learner states,
// add DictionaryValue ctor that takes an rvalue!
*ckpStream << DictionaryValue(learnerStates);
ckpStream->flush();
// TODO: add DictionaryValue(T&&)
learnerStates.push_back(DictionaryValue(learner->Serialize()));
}
if (m_distributedTrainer != nullptr)
{
// all workers need to sync up after saving model to avoid read-after-write hazard
// i.e. one worker is in the middle of write while another tries to read
m_distributedTrainer->GetCommunicator()->Barrier();
}
// add DictionaryValue ctor that takes an rvalue!
Dictionary state;
state[learnersPropertyName] = learnerStates;
state[distributedLearnerPropertyName] = distributedLearnerState;
m_combinedTrainingFunction->SaveModel(modelFilePath, usinglegacyModelFormat);
std::wstring trainerStateCheckpointFilePath = GetTrainerStateCheckpointFilePath(modelFilePath);
auto ckpStream = GetFstream(trainerStateCheckpointFilePath, false);
*ckpStream << state;
ckpStream->flush();
}
void Trainer::RestoreFromCheckpoint(const std::wstring& modelFilePath)
@@ -269,10 +276,11 @@ namespace CNTK
std::wstring trainerStateCheckpointFilePath = GetTrainerStateCheckpointFilePath(modelFilePath);
auto ckpStream = GetFstream(trainerStateCheckpointFilePath, true);
DictionaryValue checkpoint;
Dictionary checkpoint;
*ckpStream >> checkpoint;
const vector<DictionaryValue>& learnerStates = checkpoint.Value<vector<DictionaryValue>>();
const DictionaryValue& learners = checkpoint[learnersPropertyName];
const vector<DictionaryValue>& learnerStates = learners.Value<vector<DictionaryValue>>();
if (learnerStates.size() != m_parameterLearners.size())
{
@@ -285,6 +293,14 @@ namespace CNTK
{
m_parameterLearners[i]->RestoreFromCheckpoint(learnerStates[i].Value<Dictionary>());
}
// TODO: we should return shared state from this function,
// otherwise how can we be sure the minibatch source is in a consistent state?
if (m_distributedTrainer)
{
const DictionaryValue& distributedLearner = checkpoint[distributedLearnerPropertyName];
m_distributedTrainer->RestoreFromCheckpoint(distributedLearner.Value<Dictionary>());
}
}
double Trainer::PreviousMinibatchLossAverage() const
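With this change the on-disk trainer checkpoint becomes a single Dictionary with two entries, under the property names defined at the top of the file: "Learners", a vector of per-learner states, and "DistributedLearner", the state gathered by the distributed trainer (an empty dictionary for non-distributed runs). A sketch of reading one back, assuming an opened checkpoint stream and a matching learners vector:

// Sketch: parse the new-format trainer checkpoint.
CNTK::Dictionary checkpoint;
*ckpStream >> checkpoint; // assumed: stream opened via GetFstream(path, true)

auto learnerStates = checkpoint[L"Learners"].Value<std::vector<CNTK::DictionaryValue>>();
for (size_t i = 0; i < learnerStates.size(); ++i)
    parameterLearners[i]->RestoreFromCheckpoint(learnerStates[i].Value<CNTK::Dictionary>());

// Meaningful only for distributed runs; empty otherwise.
auto distributedState = checkpoint[L"DistributedLearner"].Value<CNTK::Dictionary>();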

View file

@@ -445,37 +445,46 @@ public:
AllReduce<ElemType>(static_cast<ElemType*>(MPI_IN_PLACE), sendData, numElements, op);
}
template <class ElemType>
void AllReduce(ElemType *sendData, ElemType *receiveData, size_t numElements, MPI_Op op = MPI_SUM) const
{
if ((NumNodesInUse() > 1 && (Communicator() != MPI_COMM_NULL)))
{
MPI_Allreduce(sendData, receiveData, (int) numElements, GetDataType(sendData), op, Communicator()) || MpiFail("AllReduce: MPI_Allreduce");
}
}
template <class ElemType>
void AllReduceAsync(ElemType* sendData, size_t numElements, MPI_Request* request, MPI_Op op = MPI_SUM) const
{
AllReduceAsync<ElemType>(static_cast<ElemType*>(MPI_IN_PLACE), sendData, numElements, request, op);
}
template <class ElemType>
void AllGatherAsync(const ElemType *sendData, size_t numSendElements, ElemType *receiveData, size_t numRecvElements, MPI_Request* request) const
{
MPI_Iallgather(sendData, (int)numSendElements, GetDataType(receiveData), receiveData, (int)numRecvElements, GetDataType(receiveData), Communicator(), request) || MpiFail("AllGatherAsync: MPI_Iallgather");
}
template <class ElemType>
void AllReduceAsync(ElemType *sendData, ElemType *receiveData, size_t numElements, MPI_Request* request, MPI_Op op = MPI_SUM) const
{
if ((NumNodesInUse() > 1 && (Communicator() != MPI_COMM_NULL)))
{
MPI_Iallreduce(sendData, receiveData, (int) numElements, GetDataType(sendData), op, Communicator(), request) || MpiFail("AllReduceAsync: MPI_Iallreduce");
}
MPI_Iallreduce(sendData, receiveData, (int)numElements, GetDataType(sendData), op, Communicator(), request) || MpiFail("AllReduceAsync: MPI_Iallreduce");
}
template <class ElemType>
void AllReduce(ElemType *sendData, ElemType *receiveData, size_t numElements, MPI_Op op = MPI_SUM) const
{
MPI_Allreduce(sendData, receiveData, (int)numElements, GetDataType(sendData), op, Communicator()) || MpiFail("AllReduce: MPI_Allreduce");
}
template <class ElemType>
void Gather(const ElemType *sendData, size_t numSendElements, ElemType *receiveData, size_t numRecvElements, size_t rootRank) const
{
MPI_Gather(sendData, (int)numSendElements, GetDataType(receiveData), receiveData, (int)numRecvElements, GetDataType(receiveData), (int)rootRank, Communicator()) || MpiFail("Gather: MPI_Gather");
}
template <class ElemType>
void Gatherv(const ElemType *sendData, size_t numSendElements, ElemType *receiveData, int recvCounts[], int offsets[], size_t rootRank) const
{
MPI_Gatherv(sendData, (int)numSendElements, GetDataType(receiveData), receiveData, recvCounts, offsets, GetDataType(receiveData), (int)rootRank, Communicator()) || MpiFail("Gatherv: MPI_Gatherv");
}
template <class ElemType>
void Bcast(ElemType *pData, size_t nData, size_t srcRank)
{
if ((NumNodesInUse() > 1) && (Communicator() != MPI_COMM_NULL))
{
MPI_Bcast(pData, (int) nData, GetDataType(pData), (int) srcRank, Communicator()) || MpiFail("Bcast: MPI_Bcast");
}
MPI_Bcast(pData, (int) nData, GetDataType(pData), (int) srcRank, Communicator()) || MpiFail("Bcast: MPI_Bcast");
}
// wait for an async request to finish
@@ -495,8 +504,10 @@ public:
MPI_Barrier(m_currentComm) || MpiFail("waitall: MPI_Barrier");
}
void WaitAll(std::vector<MPI_Request>& requests)
{
MPI_Waitall((int)requests.size(), &requests[0], MPI_STATUSES_IGNORE) || MpiFail("waitall: MPI_Waitall");
}
};
}}}
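Concatenate drives the new AllGatherAsync/WaitAll pair: one non-blocking allgather per tensor, then a single wait over all outstanding requests. The calling pattern, sketched under the assumption that mpi is an initialized MPIWrapper, each send buffer holds count elements, and each receive buffer holds NumNodesInUse() * count:

// Overlap several allgathers, then block once for all of them.
std::vector<MPI_Request> requests(numTensors);
for (size_t i = 0; i < numTensors; ++i)
    mpi->AllGatherAsync(sendBufs[i], count, recvBufs[i], count, &requests[i]);
mpi->WaitAll(requests); // one MPI_Waitall over every outstanding request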

View file

@@ -1,162 +1,174 @@
Minibatch 0: CrossEntropy loss = 0.6954454, Evaluation criterion = 0.52
Minibatch 20: CrossEntropy loss = 0.81229057, Evaluation criterion = 0.44
Minibatch 40: CrossEntropy loss = 0.62668446, Evaluation criterion = 0.44
Minibatch 60: CrossEntropy loss = 0.4867942, Evaluation criterion = 0.08
Minibatch 80: CrossEntropy loss = 0.293085, Evaluation criterion = 0.04
Minibatch 100: CrossEntropy loss = 0.17526035, Evaluation criterion = 0.08
Minibatch 120: CrossEntropy loss = 0.17222725, Evaluation criterion = 0.04
Minibatch 140: CrossEntropy loss = 0.10928572, Evaluation criterion = 0.04
Minibatch 160: CrossEntropy loss = 0.068897753, Evaluation criterion = 0
Minibatch 180: CrossEntropy loss = 0.070378456, Evaluation criterion = 0.04
Minibatch 200: CrossEntropy loss = 0.1981641, Evaluation criterion = 0.16
Minibatch 220: CrossEntropy loss = 0.14033669, Evaluation criterion = 0.08
Minibatch 240: CrossEntropy loss = 0.10872889, Evaluation criterion = 0.04
Minibatch 260: CrossEntropy loss = 0.079692345, Evaluation criterion = 0
Minibatch 280: CrossEntropy loss = 0.50890541, Evaluation criterion = 0.24
Minibatch 300: CrossEntropy loss = 0.16729982, Evaluation criterion = 0.04
Minibatch 320: CrossEntropy loss = 0.096241093, Evaluation criterion = 0.08
Minibatch 340: CrossEntropy loss = 0.24491482, Evaluation criterion = 0.12
Minibatch 360: CrossEntropy loss = 0.16696636, Evaluation criterion = 0.12
Minibatch 380: CrossEntropy loss = 0.083896952, Evaluation criterion = 0.04
Minibatch 400: CrossEntropy loss = 0.1058311, Evaluation criterion = 0.08
Minibatch 420: CrossEntropy loss = 0.1028916, Evaluation criterion = 0.04
Minibatch 440: CrossEntropy loss = 0.16459396, Evaluation criterion = 0.08
Minibatch 460: CrossEntropy loss = 0.14627156, Evaluation criterion = 0.08
Minibatch 480: CrossEntropy loss = 0.10650005, Evaluation criterion = 0.08
Minibatch 500: CrossEntropy loss = 0.1363752, Evaluation criterion = 0.12
Minibatch 520: CrossEntropy loss = 0.086916132, Evaluation criterion = 0.04
Minibatch 540: CrossEntropy loss = 0.13855596, Evaluation criterion = 0.08
Minibatch 560: CrossEntropy loss = 0.088330908, Evaluation criterion = 0.04
Minibatch 580: CrossEntropy loss = 0.21265511, Evaluation criterion = 0.08
Minibatch 600: CrossEntropy loss = 0.17782074, Evaluation criterion = 0.08
Minibatch 620: CrossEntropy loss = 0.090263462, Evaluation criterion = 0.04
Minibatch 640: CrossEntropy loss = 0.25346519, Evaluation criterion = 0.16
Minibatch 660: CrossEntropy loss = 0.17309887, Evaluation criterion = 0.12
Minibatch 680: CrossEntropy loss = 0.11421286, Evaluation criterion = 0.08
Minibatch 700: CrossEntropy loss = 0.090820036, Evaluation criterion = 0.04
Minibatch 720: CrossEntropy loss = 0.1634412, Evaluation criterion = 0.08
Minibatch 740: CrossEntropy loss = 0.13637156, Evaluation criterion = 0.08
Minibatch 760: CrossEntropy loss = 0.031969812, Evaluation criterion = 0
Minibatch 780: CrossEntropy loss = 0.066480079, Evaluation criterion = 0.04
Minibatch 0: CrossEntropy loss = 0.69331757, Evaluation criterion = 0.48
Minibatch 20: CrossEntropy loss = 0.80803825, Evaluation criterion = 0.44
Minibatch 40: CrossEntropy loss = 0.59340229, Evaluation criterion = 0.4
Minibatch 60: CrossEntropy loss = 0.390079, Evaluation criterion = 0.04
Minibatch 80: CrossEntropy loss = 0.24672558, Evaluation criterion = 0.04
Minibatch 100: CrossEntropy loss = 0.15607712, Evaluation criterion = 0.08
Minibatch 120: CrossEntropy loss = 0.16455307, Evaluation criterion = 0.04
Minibatch 140: CrossEntropy loss = 0.10465605, Evaluation criterion = 0.04
Minibatch 160: CrossEntropy loss = 0.066373091, Evaluation criterion = 0
Minibatch 180: CrossEntropy loss = 0.067441063, Evaluation criterion = 0.04
Minibatch 200: CrossEntropy loss = 0.19784435, Evaluation criterion = 0.16
Minibatch 220: CrossEntropy loss = 0.140825, Evaluation criterion = 0.08
Minibatch 240: CrossEntropy loss = 0.10816033, Evaluation criterion = 0.04
Minibatch 260: CrossEntropy loss = 0.078140802, Evaluation criterion = 0
Minibatch 280: CrossEntropy loss = 0.50540421, Evaluation criterion = 0.24
Minibatch 300: CrossEntropy loss = 0.16727772, Evaluation criterion = 0.04
Minibatch 320: CrossEntropy loss = 0.096806526, Evaluation criterion = 0.08
Minibatch 340: CrossEntropy loss = 0.24469637, Evaluation criterion = 0.12
Minibatch 360: CrossEntropy loss = 0.16545345, Evaluation criterion = 0.12
Minibatch 380: CrossEntropy loss = 0.083174438, Evaluation criterion = 0.04
Minibatch 400: CrossEntropy loss = 0.10837061, Evaluation criterion = 0.08
Minibatch 420: CrossEntropy loss = 0.10223916, Evaluation criterion = 0.04
Minibatch 440: CrossEntropy loss = 0.16467476, Evaluation criterion = 0.08
Minibatch 460: CrossEntropy loss = 0.14460797, Evaluation criterion = 0.08
Minibatch 480: CrossEntropy loss = 0.10899806, Evaluation criterion = 0.08
Minibatch 500: CrossEntropy loss = 0.13487343, Evaluation criterion = 0.12
Minibatch 520: CrossEntropy loss = 0.08723834, Evaluation criterion = 0.04
Minibatch 540: CrossEntropy loss = 0.1384371, Evaluation criterion = 0.08
Minibatch 560: CrossEntropy loss = 0.087085314, Evaluation criterion = 0.04
Minibatch 580: CrossEntropy loss = 0.21659525, Evaluation criterion = 0.08
Minibatch 600: CrossEntropy loss = 0.17421137, Evaluation criterion = 0.08
Minibatch 620: CrossEntropy loss = 0.089142971, Evaluation criterion = 0.04
Minibatch 640: CrossEntropy loss = 0.24882771, Evaluation criterion = 0.16
Minibatch 660: CrossEntropy loss = 0.17557541, Evaluation criterion = 0.12
Minibatch 680: CrossEntropy loss = 0.11400983, Evaluation criterion = 0.08
Minibatch 700: CrossEntropy loss = 0.090476017, Evaluation criterion = 0.04
Minibatch 720: CrossEntropy loss = 0.16078716, Evaluation criterion = 0.08
Minibatch 740: CrossEntropy loss = 0.13521654, Evaluation criterion = 0.04
Minibatch 760: CrossEntropy loss = 0.032673693, Evaluation criterion = 0
Minibatch 780: CrossEntropy loss = 0.06836617, Evaluation criterion = 0.04
Minibatch 0: CrossEntropy loss = 0.69867996, Evaluation criterion = 0.52
Minibatch 20: CrossEntropy loss = 0.81037231, Evaluation criterion = 0.44
Minibatch 40: CrossEntropy loss = 0.62135517, Evaluation criterion = 0.44
Minibatch 60: CrossEntropy loss = 0.47872032, Evaluation criterion = 0.16
Minibatch 80: CrossEntropy loss = 0.28000193, Evaluation criterion = 0.04
Minibatch 100: CrossEntropy loss = 0.16768997, Evaluation criterion = 0.04
Minibatch 120: CrossEntropy loss = 0.16949463, Evaluation criterion = 0.04
Minibatch 140: CrossEntropy loss = 0.10876145, Evaluation criterion = 0.04
Minibatch 160: CrossEntropy loss = 0.068323808, Evaluation criterion = 0
Minibatch 180: CrossEntropy loss = 0.069454532, Evaluation criterion = 0.04
Minibatch 200: CrossEntropy loss = 0.19818489, Evaluation criterion = 0.16
Minibatch 220: CrossEntropy loss = 0.14136215, Evaluation criterion = 0.08
Minibatch 240: CrossEntropy loss = 0.10862691, Evaluation criterion = 0.04
Minibatch 260: CrossEntropy loss = 0.078882818, Evaluation criterion = 0
Minibatch 280: CrossEntropy loss = 0.50608982, Evaluation criterion = 0.24
Minibatch 300: CrossEntropy loss = 0.16754644, Evaluation criterion = 0.04
Minibatch 320: CrossEntropy loss = 0.097167368, Evaluation criterion = 0.08
Minibatch 340: CrossEntropy loss = 0.24575237, Evaluation criterion = 0.12
Minibatch 360: CrossEntropy loss = 0.165613, Evaluation criterion = 0.12
Minibatch 380: CrossEntropy loss = 0.083667068, Evaluation criterion = 0.04
Minibatch 400: CrossEntropy loss = 0.10856432, Evaluation criterion = 0.08
Minibatch 420: CrossEntropy loss = 0.10331212, Evaluation criterion = 0.04
Minibatch 440: CrossEntropy loss = 0.16514906, Evaluation criterion = 0.08
Minibatch 460: CrossEntropy loss = 0.14375429, Evaluation criterion = 0.08
Minibatch 480: CrossEntropy loss = 0.11091783, Evaluation criterion = 0.08
Minibatch 500: CrossEntropy loss = 0.13466415, Evaluation criterion = 0.12
Minibatch 520: CrossEntropy loss = 0.08790472, Evaluation criterion = 0.04
Minibatch 540: CrossEntropy loss = 0.13872595, Evaluation criterion = 0.08
Minibatch 560: CrossEntropy loss = 0.08639514, Evaluation criterion = 0.04
Minibatch 580: CrossEntropy loss = 0.21813036, Evaluation criterion = 0.08
Minibatch 600: CrossEntropy loss = 0.1733963, Evaluation criterion = 0.08
Minibatch 620: CrossEntropy loss = 0.089930305, Evaluation criterion = 0.04
Minibatch 640: CrossEntropy loss = 0.2478857, Evaluation criterion = 0.16
Minibatch 660: CrossEntropy loss = 0.17713673, Evaluation criterion = 0.12
Minibatch 680: CrossEntropy loss = 0.11394748, Evaluation criterion = 0.04
Minibatch 700: CrossEntropy loss = 0.090961437, Evaluation criterion = 0.04
Minibatch 720: CrossEntropy loss = 0.15971701, Evaluation criterion = 0.08
Minibatch 740: CrossEntropy loss = 0.13449444, Evaluation criterion = 0.04
Minibatch 760: CrossEntropy loss = 0.032643244, Evaluation criterion = 0
Minibatch 780: CrossEntropy loss = 0.069212947, Evaluation criterion = 0.04
Minibatch 0: CrossEntropy loss = 0.69272743, Evaluation criterion = 0.48
Minibatch 20: CrossEntropy loss = 0.75411827, Evaluation criterion = 0.44
Minibatch 40: CrossEntropy loss = 0.60825462, Evaluation criterion = 0.44
Minibatch 60: CrossEntropy loss = 0.43199036, Evaluation criterion = 0.08
Minibatch 80: CrossEntropy loss = 0.26699848, Evaluation criterion = 0.04
Minibatch 100: CrossEntropy loss = 0.16455112, Evaluation criterion = 0.08
Minibatch 120: CrossEntropy loss = 0.16799458, Evaluation criterion = 0.04
Minibatch 140: CrossEntropy loss = 0.10715288, Evaluation criterion = 0.04
Minibatch 160: CrossEntropy loss = 0.067906699, Evaluation criterion = 0
Minibatch 180: CrossEntropy loss = 0.068846769, Evaluation criterion = 0.04
Minibatch 200: CrossEntropy loss = 0.19838778, Evaluation criterion = 0.16
Minibatch 220: CrossEntropy loss = 0.14056544, Evaluation criterion = 0.08
Minibatch 240: CrossEntropy loss = 0.10901067, Evaluation criterion = 0.04
Minibatch 260: CrossEntropy loss = 0.078888097, Evaluation criterion = 0
Minibatch 280: CrossEntropy loss = 0.5081443, Evaluation criterion = 0.24
Minibatch 300: CrossEntropy loss = 0.16783852, Evaluation criterion = 0.04
Minibatch 320: CrossEntropy loss = 0.097371922, Evaluation criterion = 0.08
Minibatch 340: CrossEntropy loss = 0.24453039, Evaluation criterion = 0.12
Minibatch 360: CrossEntropy loss = 0.16672588, Evaluation criterion = 0.12
Minibatch 380: CrossEntropy loss = 0.083697166, Evaluation criterion = 0.04
Minibatch 400: CrossEntropy loss = 0.10680311, Evaluation criterion = 0.08
Minibatch 420: CrossEntropy loss = 0.10154376, Evaluation criterion = 0.04
Minibatch 440: CrossEntropy loss = 0.1639748, Evaluation criterion = 0.08
Minibatch 460: CrossEntropy loss = 0.14593479, Evaluation criterion = 0.08
Minibatch 480: CrossEntropy loss = 0.1082506, Evaluation criterion = 0.08
Minibatch 500: CrossEntropy loss = 0.13558685, Evaluation criterion = 0.12
Minibatch 520: CrossEntropy loss = 0.086868982, Evaluation criterion = 0.04
Minibatch 540: CrossEntropy loss = 0.13893658, Evaluation criterion = 0.08
Minibatch 560: CrossEntropy loss = 0.08850028, Evaluation criterion = 0.04
Minibatch 580: CrossEntropy loss = 0.21615963, Evaluation criterion = 0.08
Minibatch 600: CrossEntropy loss = 0.17612524, Evaluation criterion = 0.08
Minibatch 620: CrossEntropy loss = 0.088544693, Evaluation criterion = 0.04
Minibatch 640: CrossEntropy loss = 0.25012495, Evaluation criterion = 0.16
Minibatch 660: CrossEntropy loss = 0.17481609, Evaluation criterion = 0.12
Minibatch 680: CrossEntropy loss = 0.1136517, Evaluation criterion = 0.08
Minibatch 700: CrossEntropy loss = 0.090395937, Evaluation criterion = 0.04
Minibatch 720: CrossEntropy loss = 0.1612656, Evaluation criterion = 0.08
Minibatch 740: CrossEntropy loss = 0.13536187, Evaluation criterion = 0.04
Minibatch 760: CrossEntropy loss = 0.032082968, Evaluation criterion = 0
Minibatch 780: CrossEntropy loss = 0.067839103, Evaluation criterion = 0.04
CNTKv2LibraryDistribution tests: Passed
CPU info:
CPU Model Name: Intel(R) Xeon(R) CPU E5-2620 v3 @ 2.40GHz
Hardware threads: 1
Total Memory: 33476764 kB
-------------------------------------------------------------------
/tmp/cntk-test-20161110170715.224686/CNTKv2LibraryDistribution_UnitTests@debug_gpu/TestData /cygdrive/c/repo/cntk_github6/CNTK/Tests/EndToEndTests/CNTKv2LibraryDistribution/UnitTests
=== Running C:\Program Files\Microsoft MPI\Bin\/mpiexec.exe -n 2 -l C:\repo\cntk_github6\CNTK\x64\debug\V2LibraryDistributionTests.exe F:\cygwin64\tmp\cntk-test-20161110170715.224686\CNTKv2LibraryDistribution_UnitTests@debug_gpu/v2library.log
[0]requestnodes [MPIWrapper]: using 2 out of 2 MPI nodes on a single host (2 requested); we (0) are in (participating)
[1]requestnodes [MPIWrapper]: using 2 out of 2 MPI nodes on a single host (2 requested); we (1) are in (participating)
MPI Rank 0: Minibatch 0: CrossEntropy loss = 0.69241676, Evaluation criterion = 0.48
MPI Rank 0: Minibatch 20: CrossEntropy loss = 0.69337639, Evaluation criterion = 0.44
MPI Rank 0: Minibatch 40: CrossEntropy loss = 0.60980656, Evaluation criterion = 0.44
MPI Rank 0: Minibatch 60: CrossEntropy loss = 0.28104153, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 80: CrossEntropy loss = 0.17967899, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 100: CrossEntropy loss = 0.10600471, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 120: CrossEntropy loss = 0.1360253, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 140: CrossEntropy loss = 0.091501751, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 160: CrossEntropy loss = 0.056065564, Evaluation criterion = 0
MPI Rank 0: Minibatch 180: CrossEntropy loss = 0.065461845, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 200: CrossEntropy loss = 0.20466948, Evaluation criterion = 0.16
MPI Rank 0: Minibatch 220: CrossEntropy loss = 0.14153589, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 240: CrossEntropy loss = 0.11171801, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 260: CrossEntropy loss = 0.082311573, Evaluation criterion = 0
MPI Rank 0: Minibatch 280: CrossEntropy loss = 0.51050812, Evaluation criterion = 0.24
MPI Rank 0: Minibatch 300: CrossEntropy loss = 0.16595322, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 320: CrossEntropy loss = 0.097113705, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 340: CrossEntropy loss = 0.23648327, Evaluation criterion = 0.12
MPI Rank 0: Minibatch 360: CrossEntropy loss = 0.16291708, Evaluation criterion = 0.12
MPI Rank 0: Minibatch 380: CrossEntropy loss = 0.078984389, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 400: CrossEntropy loss = 0.12134525, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 420: CrossEntropy loss = 0.098799028, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 440: CrossEntropy loss = 0.16222105, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 460: CrossEntropy loss = 0.14262832, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 480: CrossEntropy loss = 0.10843548, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 500: CrossEntropy loss = 0.13464042, Evaluation criterion = 0.12
MPI Rank 0: Minibatch 520: CrossEntropy loss = 0.081623363, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 540: CrossEntropy loss = 0.1375001, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 560: CrossEntropy loss = 0.085772104, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 580: CrossEntropy loss = 0.20834332, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 600: CrossEntropy loss = 0.16688332, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 620: CrossEntropy loss = 0.084589157, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 640: CrossEntropy loss = 0.23415596, Evaluation criterion = 0.12
MPI Rank 0: Minibatch 660: CrossEntropy loss = 0.17398777, Evaluation criterion = 0.12
MPI Rank 0: Minibatch 680: CrossEntropy loss = 0.11343967, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 700: CrossEntropy loss = 0.090730791, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 720: CrossEntropy loss = 0.15637251, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 740: CrossEntropy loss = 0.13192295, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 760: CrossEntropy loss = 0.0337656, Evaluation criterion = 0
MPI Rank 0: Minibatch 780: CrossEntropy loss = 0.080909252, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 0: CrossEntropy loss = 0.69225273, Evaluation criterion = 0.48
MPI Rank 0: Minibatch 20: CrossEntropy loss = 0.68108383, Evaluation criterion = 0.44
MPI Rank 0: Minibatch 40: CrossEntropy loss = 0.55950417, Evaluation criterion = 0.4
MPI Rank 0: Minibatch 60: CrossEntropy loss = 0.21498363, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 80: CrossEntropy loss = 0.17286421, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 100: CrossEntropy loss = 0.10292575, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 120: CrossEntropy loss = 0.13511962, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 140: CrossEntropy loss = 0.091661301, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 160: CrossEntropy loss = 0.057151217, Evaluation criterion = 0
MPI Rank 0: Minibatch 180: CrossEntropy loss = 0.064163666, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 200: CrossEntropy loss = 0.20556129, Evaluation criterion = 0.16
MPI Rank 0: Minibatch 220: CrossEntropy loss = 0.14280916, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 240: CrossEntropy loss = 0.11125059, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 260: CrossEntropy loss = 0.082986908, Evaluation criterion = 0
MPI Rank 0: Minibatch 280: CrossEntropy loss = 0.51586842, Evaluation criterion = 0.24
MPI Rank 0: Minibatch 300: CrossEntropy loss = 0.16666496, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 320: CrossEntropy loss = 0.096743765, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 340: CrossEntropy loss = 0.23621889, Evaluation criterion = 0.12
MPI Rank 0: Minibatch 360: CrossEntropy loss = 0.16260574, Evaluation criterion = 0.12
MPI Rank 0: Minibatch 380: CrossEntropy loss = 0.077602329, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 400: CrossEntropy loss = 0.1224162, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 420: CrossEntropy loss = 0.097998457, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 440: CrossEntropy loss = 0.1605204, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 460: CrossEntropy loss = 0.14325989, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 480: CrossEntropy loss = 0.10864172, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 500: CrossEntropy loss = 0.13486629, Evaluation criterion = 0.12
MPI Rank 0: Minibatch 520: CrossEntropy loss = 0.08147995, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 540: CrossEntropy loss = 0.1372897, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 560: CrossEntropy loss = 0.085105867, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 580: CrossEntropy loss = 0.20932877, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 600: CrossEntropy loss = 0.16583017, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 620: CrossEntropy loss = 0.084400692, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 640: CrossEntropy loss = 0.23501949, Evaluation criterion = 0.12
MPI Rank 0: Minibatch 660: CrossEntropy loss = 0.17227427, Evaluation criterion = 0.12
MPI Rank 0: Minibatch 680: CrossEntropy loss = 0.11319256, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 700: CrossEntropy loss = 0.089835396, Evaluation criterion = 0.04
MPI Rank 0: Minibatch 720: CrossEntropy loss = 0.15484819, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 740: CrossEntropy loss = 0.13176038, Evaluation criterion = 0.08
MPI Rank 0: Minibatch 760: CrossEntropy loss = 0.034538326, Evaluation criterion = 0
MPI Rank 0: Minibatch 780: CrossEntropy loss = 0.082417784, Evaluation criterion = 0.04
MPI Rank 0:
MPI Rank 0: CNTKv2LibraryDistribution tests: Passed
MPI Rank 1: Minibatch 0: CrossEntropy loss = 0.69241676, Evaluation criterion = 0.48
MPI Rank 1: Minibatch 20: CrossEntropy loss = 0.69337639, Evaluation criterion = 0.44
MPI Rank 1: Minibatch 40: CrossEntropy loss = 0.60980656, Evaluation criterion = 0.44
MPI Rank 1: Minibatch 60: CrossEntropy loss = 0.28104153, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 80: CrossEntropy loss = 0.17967899, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 100: CrossEntropy loss = 0.10600471, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 120: CrossEntropy loss = 0.1360253, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 140: CrossEntropy loss = 0.091501751, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 160: CrossEntropy loss = 0.056065564, Evaluation criterion = 0
MPI Rank 1: Minibatch 180: CrossEntropy loss = 0.065461845, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 200: CrossEntropy loss = 0.20466948, Evaluation criterion = 0.16
MPI Rank 1: Minibatch 220: CrossEntropy loss = 0.14153589, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 240: CrossEntropy loss = 0.11171801, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 260: CrossEntropy loss = 0.082311573, Evaluation criterion = 0
MPI Rank 1: Minibatch 280: CrossEntropy loss = 0.51050812, Evaluation criterion = 0.24
MPI Rank 1: Minibatch 300: CrossEntropy loss = 0.16595322, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 320: CrossEntropy loss = 0.097113705, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 340: CrossEntropy loss = 0.23648327, Evaluation criterion = 0.12
MPI Rank 1: Minibatch 360: CrossEntropy loss = 0.16291708, Evaluation criterion = 0.12
MPI Rank 1: Minibatch 380: CrossEntropy loss = 0.078984389, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 400: CrossEntropy loss = 0.12134525, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 420: CrossEntropy loss = 0.098799028, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 440: CrossEntropy loss = 0.16222105, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 460: CrossEntropy loss = 0.14262832, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 480: CrossEntropy loss = 0.10843548, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 500: CrossEntropy loss = 0.13464042, Evaluation criterion = 0.12
MPI Rank 1: Minibatch 520: CrossEntropy loss = 0.081623363, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 540: CrossEntropy loss = 0.1375001, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 560: CrossEntropy loss = 0.085772104, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 580: CrossEntropy loss = 0.20834332, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 600: CrossEntropy loss = 0.16688332, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 620: CrossEntropy loss = 0.084589157, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 640: CrossEntropy loss = 0.23415596, Evaluation criterion = 0.12
MPI Rank 1: Minibatch 660: CrossEntropy loss = 0.17398777, Evaluation criterion = 0.12
MPI Rank 1: Minibatch 680: CrossEntropy loss = 0.11343967, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 700: CrossEntropy loss = 0.090730791, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 720: CrossEntropy loss = 0.15637251, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 740: CrossEntropy loss = 0.13192295, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 760: CrossEntropy loss = 0.0337656, Evaluation criterion = 0
MPI Rank 1: Minibatch 780: CrossEntropy loss = 0.080909252, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 0: CrossEntropy loss = 0.69225273, Evaluation criterion = 0.48
MPI Rank 1: Minibatch 20: CrossEntropy loss = 0.68108383, Evaluation criterion = 0.44
MPI Rank 1: Minibatch 40: CrossEntropy loss = 0.55950417, Evaluation criterion = 0.4
MPI Rank 1: Minibatch 60: CrossEntropy loss = 0.21498363, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 80: CrossEntropy loss = 0.17286421, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 100: CrossEntropy loss = 0.10292575, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 120: CrossEntropy loss = 0.13511962, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 140: CrossEntropy loss = 0.091661301, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 160: CrossEntropy loss = 0.057151217, Evaluation criterion = 0
MPI Rank 1: Minibatch 180: CrossEntropy loss = 0.064163666, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 200: CrossEntropy loss = 0.20556129, Evaluation criterion = 0.16
MPI Rank 1: Minibatch 220: CrossEntropy loss = 0.14280916, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 240: CrossEntropy loss = 0.11125059, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 260: CrossEntropy loss = 0.082986908, Evaluation criterion = 0
MPI Rank 1: Minibatch 280: CrossEntropy loss = 0.51586842, Evaluation criterion = 0.24
MPI Rank 1: Minibatch 300: CrossEntropy loss = 0.16666496, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 320: CrossEntropy loss = 0.096743765, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 340: CrossEntropy loss = 0.23621889, Evaluation criterion = 0.12
MPI Rank 1: Minibatch 360: CrossEntropy loss = 0.16260574, Evaluation criterion = 0.12
MPI Rank 1: Minibatch 380: CrossEntropy loss = 0.077602329, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 400: CrossEntropy loss = 0.1224162, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 420: CrossEntropy loss = 0.097998457, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 440: CrossEntropy loss = 0.1605204, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 460: CrossEntropy loss = 0.14325989, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 480: CrossEntropy loss = 0.10864172, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 500: CrossEntropy loss = 0.13486629, Evaluation criterion = 0.12
MPI Rank 1: Minibatch 520: CrossEntropy loss = 0.08147995, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 540: CrossEntropy loss = 0.1372897, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 560: CrossEntropy loss = 0.085105867, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 580: CrossEntropy loss = 0.20932877, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 600: CrossEntropy loss = 0.16583017, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 620: CrossEntropy loss = 0.084400692, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 640: CrossEntropy loss = 0.23501949, Evaluation criterion = 0.12
MPI Rank 1: Minibatch 660: CrossEntropy loss = 0.17227427, Evaluation criterion = 0.12
MPI Rank 1: Minibatch 680: CrossEntropy loss = 0.11319256, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 700: CrossEntropy loss = 0.089835396, Evaluation criterion = 0.04
MPI Rank 1: Minibatch 720: CrossEntropy loss = 0.15484819, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 740: CrossEntropy loss = 0.13176038, Evaluation criterion = 0.08
MPI Rank 1: Minibatch 760: CrossEntropy loss = 0.034538326, Evaluation criterion = 0
MPI Rank 1: Minibatch 780: CrossEntropy loss = 0.082417784, Evaluation criterion = 0.04
MPI Rank 1:
MPI Rank 1: CNTKv2LibraryDistribution tests: Passed
/cygdrive/c/repo/cntk_github6/CNTK/Tests/EndToEndTests/CNTKv2LibraryDistribution/UnitTests

View file

@@ -2,7 +2,7 @@
. $TEST_ROOT_DIR/run-test-common
set -x
#set -x
# This test uses a large dataset which is not part of the CNTK repository itself
# We use the dataset from an external location specified using an environment variable
@@ -30,11 +30,23 @@ fi
pushd $TestDataDir
if [ "$OS" == "Windows_NT" ]; then
$TEST_BIN_DIR/V2LibraryDistributionTests.exe
RunDir=$(cygpath -aw $RunDir)
fi
LogPath=$RunDir/v2library.log
Instances=2
if [ "$OS" == "Windows_NT" ]; then
TestBinaryPath=$(cygpath -aw $TEST_BIN_DIR/V2LibraryDistributionTests.exe)
run "$MPI_BINARY" -n $Instances -l $TestBinaryPath $LogPath
else
$TEST_BIN_DIR/v2librarydistributiontests
TestBinaryPath=$TEST_BIN_DIR/v2librarydistributiontests
run "$MPI_BINARY" -n $Instances $TEST_BIN_DIR/v2librarydistributiontests $LogPath
fi
sed 's/^/MPI Rank 0: /' "$LogPath"0
sed 's/^/MPI Rank 1: /' "$LogPath"1
ExitCode=$?
# Delete the test data

View file

@@ -7,5 +7,7 @@ tags:
testCases:
Test run must be completed:
patterns:
- "CNTKv2LibraryDistribution tests: Passed"
- ^MPI Rank {{integer}}
- CNTKv2LibraryDistribution tests
- Passed

View file

@@ -0,0 +1,96 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
#define _CRT_SECURE_NO_WARNINGS // "secure" CRT not available on all platforms --add this at the top of all CPP files that give "function or variable may be unsafe" warnings
#include "CNTKLibrary.h"
#include "Common.h"
using namespace CNTK;
using namespace std::placeholders;
extern bool Is1bitSGDAvailable();
void TrainSimpleDistributedFeedForwardClassifer(const DeviceDescriptor& device, DistributedTrainerPtr distributedTrainer, size_t rank)
{
UNUSED(rank);
const size_t inputDim = 2;
const size_t numOutputClasses = 2;
const size_t hiddenLayerDim = 50;
const size_t numHiddenLayers = 2;
const size_t minibatchSize = 25;
const size_t numSamplesPerSweep = 10000;
const size_t numSweepsToTrainWith = 2;
const size_t numMinibatchesToTrain = (numSamplesPerSweep * numSweepsToTrainWith) / minibatchSize;
auto featureStreamName = L"features";
auto labelsStreamName = L"labels";
auto minibatchSource = TextFormatMinibatchSource(L"SimpleDataTrain_cntk_text.txt", { { featureStreamName, inputDim }, { labelsStreamName, numOutputClasses } }, MinibatchSource::FullDataSweep, false);
auto featureStreamInfo = minibatchSource->StreamInfo(featureStreamName);
auto labelStreamInfo = minibatchSource->StreamInfo(labelsStreamName);
std::unordered_map<StreamInformation, std::pair<NDArrayViewPtr, NDArrayViewPtr>> inputMeansAndInvStdDevs = { { featureStreamInfo, { nullptr, nullptr } } };
ComputeInputPerDimMeansAndInvStdDevs(minibatchSource, inputMeansAndInvStdDevs);
auto nonLinearity = std::bind(Sigmoid, _1, L"Sigmoid");
auto input = InputVariable({ inputDim }, DataType::Float, L"features");
auto normalizedinput = PerDimMeanVarianceNormalize(input, inputMeansAndInvStdDevs[featureStreamInfo].first, inputMeansAndInvStdDevs[featureStreamInfo].second);
auto classifierOutput = FullyConnectedDNNLayer(normalizedinput, hiddenLayerDim, device, nonLinearity, std::wstring(L"FullyConnectedInput"));
for (size_t i = 1; i < numHiddenLayers; ++i)
classifierOutput = FullyConnectedDNNLayer(classifierOutput, hiddenLayerDim, device, nonLinearity, std::wstring(L"FullyConnectedHidden"));
auto outputTimesParam = Parameter(NDArrayView::RandomUniform<float>({ numOutputClasses, hiddenLayerDim }, -0.05, 0.05, 1, device), L"outputTimesParam");
auto outputBiasParam = Parameter(NDArrayView::RandomUniform<float>({ numOutputClasses }, -0.05, 0.05, 1, device), L"outputBiasParam");
classifierOutput = Plus(outputBiasParam, Times(outputTimesParam, classifierOutput), L"classifierOutput");
auto labels = InputVariable({ numOutputClasses }, DataType::Float, L"labels");
auto trainingLoss = CNTK::CrossEntropyWithSoftmax(classifierOutput, labels, L"lossFunction");
auto prediction = CNTK::ClassificationError(classifierOutput, labels, L"classificationError");
double learningRatePerSample = 0.02;
minibatchSource = TextFormatMinibatchSource(L"SimpleDataTrain_cntk_text.txt", { { L"features", inputDim }, { L"labels", numOutputClasses } });
Trainer trainer(classifierOutput, trainingLoss, prediction, { SGDLearner(classifierOutput->Parameters(), learningRatePerSample) }, distributedTrainer);
size_t outputFrequencyInMinibatches = 20;
for (size_t i = 0; i < numMinibatchesToTrain; ++i)
{
auto minibatchData = minibatchSource->GetNextMinibatch(minibatchSize, device);
trainer.TrainMinibatch({ { input, minibatchData[featureStreamInfo].m_data }, { labels, minibatchData[labelStreamInfo].m_data } }, device);
PrintTrainingProgress(trainer, i, outputFrequencyInMinibatches);
}
}
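FullyConnectedDNNLayer above comes from the shared V2 library test Common code rather than this file. As a rough sketch of what such a helper computes, assuming the same uniform initialization used for the output layer (the signature and body here are illustrative assumptions, not the checked-in helper):

FunctionPtr FullyConnectedDNNLayerSketch(Variable input, size_t outputDim, const DeviceDescriptor& device,
    const std::function<FunctionPtr(const FunctionPtr&)>& nonLinearity, const std::wstring& name)
{
    size_t inputDim = input.Shape()[0];
    // Affine transform W*x + b, followed by the supplied nonlinearity (Sigmoid in this test).
    auto timesParam = Parameter(NDArrayView::RandomUniform<float>({ outputDim, inputDim }, -0.05, 0.05, 1, device));
    auto plusParam = Parameter(NDArrayView::RandomUniform<float>({ outputDim }, -0.05, 0.05, 1, device));
    return nonLinearity(Plus(plusParam, Times(timesParam, input), name));
}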
void TestFrameMode()
{
{
auto communicator = MPICommunicator();
auto distributedTrainer = CreateDataParallelDistributedTrainer(communicator, false);
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::CPUDevice(), distributedTrainer, communicator->CurrentWorker().m_globalRank);
if (IsGPUAvailable())
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::GPUDevice(0), distributedTrainer, communicator->CurrentWorker().m_globalRank);
}
if (Is1bitSGDAvailable())
{
{
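// Assumed v2 API semantics: the arguments are zeroThresholdFor1Bit, useQuantizationForSelfStripe,
// and the number of quantization bits; 32 bits effectively makes the aggregation lossless.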
auto communicator = QuantizedMPICommunicator(true, true, 32);
auto distributedTrainer = CreateQuantizedDataParallelDistributedTrainer(communicator, false);
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::CPUDevice(), distributedTrainer, communicator->CurrentWorker().m_globalRank);
if (IsGPUAvailable())
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::GPUDevice(0), distributedTrainer, communicator->CurrentWorker().m_globalRank);
}
{
auto communicator = MPICommunicator();
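// The second argument is presumably the block size in samples: block momentum only
// synchronizes the model (and applies block-level momentum) every 1024 samples,
// rather than aggregating gradients on every minibatch.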
auto distributedTrainer = CreateBlockMomentumDistributedTrainer(communicator, 1024);
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::CPUDevice(), distributedTrainer, communicator->CurrentWorker().m_globalRank);
if (IsGPUAvailable())
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::GPUDevice(0), distributedTrainer, communicator->CurrentWorker().m_globalRank);
}
}
}
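All three variants exercised by TestFrameMode plug into the same Trainer via a DistributedTrainer. Conceptually, the plain data-parallel variant all-reduces gradients across workers in its pre-update hook before the learners apply them; a minimal sketch against the DistributedCommunicator interface (an illustration under assumed semantics, not the shipped trainer):

void PreParameterUpdateSketch(const DistributedCommunicatorPtr& communicator,
    std::vector<std::pair<Parameter, NDArrayViewPtr>>& gradientValues)
{
    std::vector<NDArrayViewPtr> gradients;
    for (const auto& parameterAndGradient : gradientValues)
        gradients.push_back(parameterAndGradient.second);
    // Element-wise aggregation across all workers; every worker receives the summed gradients in place.
    communicator->AggregateInPlace(gradients, communicator->Workers());
}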

View file

@ -7,99 +7,15 @@
#include "CNTKLibrary.h"
#include "Common.h"
#include <cstdio>
using namespace CNTK;
using namespace std::placeholders;
bool Is1bitSGDAvailable()
void TestFrameMode();
int main(int argc, char* argv[])
{
static bool is1bitSGDAvailable;
static bool isInitialized = false;
if (!isInitialized)
{
const char* p = getenv("TEST_1BIT_SGD");
// Check the environment variable TEST_1BIT_SGD to decide whether to run on a CPU-only device.
if (p != nullptr && 0 == strcmp(p, "0"))
{
is1bitSGDAvailable = false;
}
else
{
is1bitSGDAvailable = true;
}
isInitialized = true;
}
return is1bitSGDAvailable;
}
// TODO: Move to other file.
void TrainSimpleDistributedFeedForwardClassifer(const DeviceDescriptor& device, DistributedTrainerPtr distributedTrainer, size_t rank)
{
const size_t inputDim = 2;
const size_t numOutputClasses = 2;
const size_t hiddenLayerDim = 50;
const size_t numHiddenLayers = 2;
const size_t minibatchSize = 25;
const size_t numSamplesPerSweep = 10000;
const size_t numSweepsToTrainWith = 2;
const size_t numMinibatchesToTrain = (numSamplesPerSweep * numSweepsToTrainWith) / minibatchSize;
auto featureStreamName = L"features";
auto labelsStreamName = L"labels";
auto minibatchSource = TextFormatMinibatchSource(L"SimpleDataTrain_cntk_text.txt", { { featureStreamName, inputDim }, { labelsStreamName, numOutputClasses } }, MinibatchSource::FullDataSweep, false);
auto featureStreamInfo = minibatchSource->StreamInfo(featureStreamName);
auto labelStreamInfo = minibatchSource->StreamInfo(labelsStreamName);
std::unordered_map<StreamInformation, std::pair<NDArrayViewPtr, NDArrayViewPtr>> inputMeansAndInvStdDevs = { { featureStreamInfo, { nullptr, nullptr } } };
ComputeInputPerDimMeansAndInvStdDevs(minibatchSource, inputMeansAndInvStdDevs);
auto nonLinearity = std::bind(Sigmoid, _1, L"Sigmoid");
auto input = InputVariable({ inputDim }, DataType::Float, L"features");
auto normalizedinput = PerDimMeanVarianceNormalize(input, inputMeansAndInvStdDevs[featureStreamInfo].first, inputMeansAndInvStdDevs[featureStreamInfo].second);
auto classifierOutput = FullyConnectedDNNLayer(normalizedinput, hiddenLayerDim, device, nonLinearity, std::wstring(L"FullyConnectedInput") );
for (size_t i = 1; i < numHiddenLayers; ++i)
classifierOutput = FullyConnectedDNNLayer(classifierOutput, hiddenLayerDim, device, nonLinearity, std::wstring(L"FullyConnectedHidden"));
auto outputTimesParam = Parameter(NDArrayView::RandomUniform<float>({ numOutputClasses, hiddenLayerDim }, -0.05, 0.05, 1, device), L"outputTimesParam");
auto outputBiasParam = Parameter(NDArrayView::RandomUniform<float>({ numOutputClasses }, -0.05, 0.05, 1, device), L"outputBiasParam");
classifierOutput = Plus(outputBiasParam, Times(outputTimesParam, classifierOutput), L"classifierOutput");
auto labels = InputVariable({ numOutputClasses }, DataType::Float, L"labels");
auto trainingLoss = CNTK::CrossEntropyWithSoftmax(classifierOutput, labels, L"lossFunction");;
auto prediction = CNTK::ClassificationError(classifierOutput, labels, L"classificationError");
// Test save and reload of model
{
Variable classifierOutputVar = classifierOutput;
Variable trainingLossVar = trainingLoss;
Variable predictionVar = prediction;
auto combinedNet = Combine({ trainingLoss, prediction, classifierOutput }, L"feedForwardClassifier");
SaveAndReloadModel<float>(combinedNet, { &input, &labels, &trainingLossVar, &predictionVar, &classifierOutputVar }, device, rank);
classifierOutput = classifierOutputVar;
trainingLoss = trainingLossVar;
prediction = predictionVar;
}
double learningRatePerSample = 0.02;
minibatchSource = TextFormatMinibatchSource(L"SimpleDataTrain_cntk_text.txt", { { L"features", inputDim }, { L"labels", numOutputClasses } });
Trainer trainer(classifierOutput, trainingLoss, prediction, { SGDLearner(classifierOutput->Parameters(), learningRatePerSample) }, distributedTrainer);
size_t outputFrequencyInMinibatches = 20;
for (size_t i = 0; i < numMinibatchesToTrain; ++i)
{
auto minibatchData = minibatchSource->GetNextMinibatch(minibatchSize, device);
trainer.TrainMinibatch({ { input, minibatchData[featureStreamInfo].m_data }, { labels, minibatchData[labelStreamInfo].m_data } }, device);
PrintTrainingProgress(trainer, i, outputFrequencyInMinibatches);
}
}
int main(int /*argc*/, char* /*argv*/[])
{
#if defined(_MSC_VER)
// in case of asserts in debug mode, print the message into stderr and throw exception
if (_CrtSetReportHook2(_CRT_RPTHOOK_INSTALL, HandleDebugAssert) == -1) {
@ -108,47 +24,38 @@ int main(int /*argc*/, char* /*argv*/[])
}
#endif
if (argc != 2)
{
fprintf(stderr, "Expecting a log file parameter.\n");
return -1; // Unexpected number of parameters given.
}
{
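// Each rank redirects its stdout to its own file, <log path><rank>; the run-test
// script later merges these per-rank files back into one rank-prefixed log.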
auto communicator = MPICommunicator();
std::string logFilename = argv[1] + std::to_string(communicator->CurrentWorker().m_globalRank);
auto result = freopen(logFilename.c_str(), "w", stdout);
if (result == nullptr)
{
fprintf(stderr, "Could not redirect stdout.\n");
return -1;
}
}
// Let's disable automatic unpacking of PackedValue objects to detect any accidental unpacking,
// which would otherwise cause a silent performance degradation.
Internal::SetAutomaticUnpackingOfPackedValues(/*disable =*/ true);
{
auto communicator = MPICommunicator();
auto distributedTrainer = CreateDataParallelDistributedTrainer(communicator, false);
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::CPUDevice(), distributedTrainer, communicator->CurrentWorker().m_globalRank);
TestFrameMode();
if (IsGPUAvailable())
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::GPUDevice(0), distributedTrainer, communicator->CurrentWorker().m_globalRank);
}
if (Is1bitSGDAvailable())
{
{
auto communicator = QuantizedMPICommunicator(true, true, 32);
auto distributedTrainer = CreateQuantizedDataParallelDistributedTrainer(communicator, false);
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::CPUDevice(), distributedTrainer, communicator->CurrentWorker().m_globalRank);
if (IsGPUAvailable())
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::GPUDevice(0), distributedTrainer, communicator->CurrentWorker().m_globalRank);
}
{
auto communicator = MPICommunicator();
auto distributedTrainer = CreateBlockMomentumDistributedTrainer(communicator, 1024);
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::CPUDevice(), distributedTrainer, communicator->CurrentWorker().m_globalRank);
if (IsGPUAvailable())
TrainSimpleDistributedFeedForwardClassifer(DeviceDescriptor::GPUDevice(0), distributedTrainer, communicator->CurrentWorker().m_globalRank);
}
}
fprintf(stderr, "\nCNTKv2LibraryDistribution tests: Passed\n");
fflush(stderr);
printf("\nCNTKv2LibraryDistribution tests: Passed\n");
fflush(stdout);
#if defined(_MSC_VER)
_CrtSetReportHook2(_CRT_RPTHOOK_REMOVE, HandleDebugAssert);
#endif
DistributedCommunicator::Finalize();
fclose(stdout);
return 0;
}

View file

@ -92,7 +92,6 @@
<AdditionalOptions>/d2Zi+ %(AdditionalOptions)</AdditionalOptions>
<TreatWarningAsError>true</TreatWarningAsError>
<RuntimeLibrary Condition="'$(Configuration)|$(Platform)'=='Release|x64'">MultiThreaded</RuntimeLibrary>
<BrowseInformation Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</BrowseInformation>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@ -101,9 +100,6 @@
<OptimizeReferences>true</OptimizeReferences>
<AdditionalDependencies>CNTKLibrary-2.0.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
<Bscmake>
<PreserveSbr Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</PreserveSbr>
</Bscmake>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="$(CpuOnlyBuild)">
<ClCompile>
@ -114,6 +110,7 @@
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="..\V2LibraryTests\Common.cpp" />
<ClCompile Include="FrameModeTests.cpp" />
<ClCompile Include="Main.cpp" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />

View file

@ -21,5 +21,8 @@
<ClCompile Include="..\V2LibraryTests\Common.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="FrameModeTests.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
</Project>

View file

@ -53,3 +53,27 @@ bool IsGPUAvailable()
return isGPUDeviceAvailable;
}
bool Is1bitSGDAvailable()
{
static bool is1bitSGDAvailable;
static bool isInitialized = false;
if (!isInitialized)
{
const char* p = getenv("TEST_1BIT_SGD");
// Check the environment variable TEST_1BIT_SGD to decide whether the 1-bit SGD code path is available.
if (p != nullptr && 0 == strcmp(p, "0"))
{
is1bitSGDAvailable = false;
}
else
{
is1bitSGDAvailable = true;
}
isInitialized = true;
}
return is1bitSGDAvailable;
}
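As an aside, the explicit isInitialized flag can be folded into a single initialize-once static, since C++11 guarantees thread-safe initialization of function-local statics; a compact equivalent sketch (not the checked-in code), assuming <cstdlib> and <cstring> are available as in the rest of Common.cpp:

bool Is1bitSGDAvailableCompact()
{
    // Evaluated exactly once, on first call; 1-bit SGD is treated as available
    // unless TEST_1BIT_SGD is explicitly set to "0".
    static const bool available = []() {
        const char* p = getenv("TEST_1BIT_SGD");
        return !(p != nullptr && strcmp(p, "0") == 0);
    }();
    return available;
}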