From 6fbce83eebbdbf69c782632aadc17c408a84ade5 Mon Sep 17 00:00:00 2001
From: Eldar Akchurin
Date: Thu, 28 Apr 2016 15:45:54 +0200
Subject: [PATCH] Updating to the latest support of different MBLayouts

---
 .../CompositeDataReader.cpp                   | 153 +++---------------
 .../CompositeDataReader/CompositeDataReader.h |  36 ++---
 .../Readers/CompositeDataReader/Exports.cpp   |  11 +-
 .../LSTM/FullUtterance/cntk.cntk              |  25 +--
 4 files changed, 51 insertions(+), 174 deletions(-)

diff --git a/Source/Readers/CompositeDataReader/CompositeDataReader.cpp b/Source/Readers/CompositeDataReader/CompositeDataReader.cpp
index 1c9e69280..5bbcba163 100644
--- a/Source/Readers/CompositeDataReader/CompositeDataReader.cpp
+++ b/Source/Readers/CompositeDataReader/CompositeDataReader.cpp
@@ -25,26 +25,10 @@ namespace Microsoft { namespace MSR { namespace CNTK {
 
-CompositeDataReader::CompositeDataReader(const std::string& precision) : m_layout(make_shared<MBLayout>()),
-    m_precision(precision),
+CompositeDataReader::CompositeDataReader(const ConfigParameters& config, MemoryProviderPtr provider) : m_layout(make_shared<MBLayout>()),
     m_corpus(std::make_shared<CorpusDescriptor>()),
-    m_endOfEpoch(false)
+    m_provider(provider)
 {
-}
-
-void CompositeDataReader::Init(const ConfigParameters& config)
-{
-    m_provider = std::make_shared<HeapMemoryProvider>();
-
-    // if prefetch - launching asynchronously,
-    // otherwise deferring - synchronous execution during .get() call
-    bool prefetch = config(L"prefetch", true);
-    m_launchType = prefetch ? launch::async : launch::deferred;
-
-    // Layout can be asked before actual reading.
-    // TODO: should be gone when SGD changed.
-    m_layout->Init(0, 0);
-
     // Identifying packing mode.
     bool frameMode = config(L"frameMode", true);
     bool truncated = config(L"truncated", false);
@@ -60,6 +44,11 @@ void CompositeDataReader::Init(const ConfigParameters& config)
     else if (truncated)
     {
         m_packingMode = PackingMode::truncated;
+        m_truncationLength = config(L"truncationLength", 0);
+        if (m_truncationLength == 0)
+        {
+            InvalidArgument("Truncation length cannot be 0.");
+        }
     }
     else
     {
@@ -105,124 +94,14 @@ void CompositeDataReader::Init(const ConfigParameters& config)
     }
 }
 
-void CompositeDataReader::StartMinibatchLoop(size_t mbSize, size_t epoch, size_t requestedEpochSamples)
+std::vector<StreamDescriptionPtr> CompositeDataReader::GetStreamDescriptions()
 {
-    return StartDistributedMinibatchLoop(mbSize, epoch, 0, 1, requestedEpochSamples);
+    return m_streams;
 }
 
-void CompositeDataReader::StartDistributedMinibatchLoop(
-    size_t requestedMBSize,
-    size_t epoch,
-    size_t subsetNum,
-    size_t numSubsets,
-    size_t requestedEpochSamples /*= requestDataSize*/)
+Minibatch CompositeDataReader::ReadMinibatch()
 {
-    EpochConfiguration config;
-    config.m_workerRank = subsetNum;
-    config.m_numberOfWorkers = numSubsets;
-    config.m_minibatchSizeInSamples = requestedMBSize;
-    config.m_totalEpochSizeInSamples = requestedEpochSamples;
-    config.m_epochIndex = epoch;
-
-    // Make sure there are no outstanding reads.
-    if (m_prefetchTask.valid())
-    {
-        m_prefetchTask.wait();
-    }
-
-    m_endOfEpoch = false;
-
-    // Nothing is running, let's reconfigure the packer according to the new epoch.
-    StartEpoch(config);
-
-    // Ok, start reading in sync or async manner.
-    m_prefetchTask = std::async(m_launchType, [this]()
-    {
-        return m_packer->ReadMinibatch();
-    });
-}
-
-bool CompositeDataReader::GetMinibatch(StreamMinibatchInputs& matrices)
-{
-    if (m_endOfEpoch)
-    {
-        return false;
-    }
-
-    // Check that all matrices have the same device id.
-    // If not we should inject the IMemoryProvider per stream.
-    int deviceId = matrices.begin()->second.matrix->GetDeviceId();
-    for (auto mx : matrices)
-    {
-        if (mx.second.matrix->GetDeviceId() != deviceId)
-        {
-            assert(false);
-        }
-    }
-
-    assert(m_prefetchTask.valid());
-
-    Minibatch minibatch = m_prefetchTask.get();
-    if (minibatch.m_endOfEpoch)
-    {
-        m_endOfEpoch = true;
-        if (minibatch.m_data.empty())
-        {
-            return false;
-        }
-    }
-
-    if (!minibatch.m_data.empty())
-    {
-        // TODO: Use alternating pinned buffer in the packer, do not copy anything, but pack into the pinned memory.
-        // Copy returned minibatch to the matrices.
-        for (const auto& mx : matrices)
-        {
-            assert(m_nameToStreamId.find(mx.first) != m_nameToStreamId.end());
-            size_t streamId = m_nameToStreamId[mx.first];
-
-            const auto& stream = minibatch.m_data[streamId];
-            m_layout->CopyFrom(stream->m_layout);
-
-            size_t columnNumber = m_layout->GetNumCols();
-            size_t rowNumber = m_streams[streamId]->m_sampleLayout->GetNumElements();
-
-            if (m_precision == "float")
-            {
-                auto* data = reinterpret_cast<const float*>(stream->m_data);
-                matrices.GetInputMatrix<float>(mx.first).SetValue(rowNumber, columnNumber, mx.second.matrix->GetDeviceId(), const_cast<float*>(data), matrixFlagNormal);
-            }
-            else
-            {
-                assert(m_precision == "double");
-                auto* data = reinterpret_cast<const double*>(stream->m_data);
-                matrices.GetInputMatrix<double>(mx.first).SetValue(rowNumber, columnNumber, mx.second.matrix->GetDeviceId(), const_cast<double*>(data), matrixFlagNormal);
-            }
-        }
-    }
-
-    m_prefetchTask = std::async(m_launchType, [this]()
-    {
-        return m_packer->ReadMinibatch();
-    });
-
-    return !minibatch.m_data.empty();
-}
-
-bool CompositeDataReader::DataEnd()
-{
-    // Note: Return value never used.
-    return false;
-}
-
-void CompositeDataReader::CopyMBLayoutTo(MBLayoutPtr layout)
-{
-    layout->CopyFrom(m_layout);
-}
-
-size_t CompositeDataReader::GetNumParallelSequences()
-{
-    return m_layout->GetNumParallelSequences();
+    return m_packer->ReadMinibatch();
 }
 
 void CompositeDataReader::CreateDeserializers(const ConfigParameters& readerConfig)
@@ -263,8 +142,10 @@ IDataDeserializerPtr CompositeDataReader::CreateDeserializer(const ConfigParamet
     return IDataDeserializerPtr(d);
 }
 
-void CompositeDataReader::StartEpoch(const EpochConfiguration& config)
+void CompositeDataReader::StartEpoch(const EpochConfiguration& cfg)
 {
+    EpochConfiguration config = cfg;
+
     if (config.m_totalEpochSizeInSamples <= 0)
     {
         RuntimeError("Unsupported minibatch size '%d'.", (int)config.m_totalEpochSizeInSamples);
@@ -289,6 +170,7 @@ void CompositeDataReader::StartEpoch(const EpochConfiguration& config)
         break;
     case PackingMode::truncated:
     {
+        config.m_truncationSize = m_truncationLength;
         m_packer = std::make_shared<TruncatedBPTTPacker>(
             m_provider,
             m_randomizer,
             m_streams);
         break;
     }
     default:
         LogicError("Unsupported type of packer '%d'.", (int)m_packingMode);
     }
+
+    m_packer->StartEpoch(config);
 }
-}}}
-
+}}}
\ No newline at end of file
diff --git a/Source/Readers/CompositeDataReader/CompositeDataReader.h b/Source/Readers/CompositeDataReader/CompositeDataReader.h
index 3aae682cb..70b60cbe4 100644
--- a/Source/Readers/CompositeDataReader/CompositeDataReader.h
+++ b/Source/Readers/CompositeDataReader/CompositeDataReader.h
@@ -9,6 +9,7 @@
 
 #include <string>
 #include <future>
 #include "DataReader.h"
+#include <vector>
 
 namespace Microsoft { namespace MSR { namespace CNTK {
@@ -48,41 +49,24 @@ struct Minibatch;
 
 // TODO: Add transformers as the next step.
 // TODO: Same code as in ReaderLib shim, the one in the ReaderLib will be deleted as the next step.
 // TODO: Change this interface when SGD is changed.
-class CompositeDataReader : public IDataReader, protected Plugin, public ScriptableObjects::Object
+class CompositeDataReader : public Reader, protected Plugin
 {
 public:
-    CompositeDataReader(const std::string& precision);
+    CompositeDataReader(const ConfigParameters& parameters, MemoryProviderPtr provider);
 
-    // Currently we do not support BS configuration.
-    virtual void Init(const ScriptableObjects::IConfigRecord& /*config*/) override
-    {
-        assert(false);
-    }
+    // Describes the streams this reader produces.
+    std::vector<StreamDescriptionPtr> GetStreamDescriptions() override;
 
-    virtual void Init(const ConfigParameters& config) override;
+    // Starts a new epoch with the provided configuration.
+    void StartEpoch(const EpochConfiguration& config) override;
 
-    virtual void Destroy() override
-    {
-        delete this;
-    }
-
-    virtual void StartMinibatchLoop(size_t mbSize, size_t epoch, size_t requestedEpochSamples = requestDataSize) override;
-    virtual void StartDistributedMinibatchLoop(size_t requestedMBSize, size_t epoch, size_t subsetNum, size_t numSubsets, size_t requestedEpochSamples) override;
-
-    virtual bool SupportsDistributedMBRead() const override
-    {
-        return true;
-    }
-
-    virtual bool GetMinibatch(StreamMinibatchInputs& matrices) override;
-    virtual bool DataEnd() override;
-    void CopyMBLayoutTo(MBLayoutPtr) override;
-    virtual size_t GetNumParallelSequences() override;
+    // Reads a minibatch that contains data across all streams.
+    Minibatch ReadMinibatch() override;
 
 private:
     void CreateDeserializers(const ConfigParameters& readerConfig);
     IDataDeserializerPtr CreateDeserializer(const ConfigParameters& readerConfig, bool primary);
-    void StartEpoch(const EpochConfiguration& config);
+
     enum class PackingMode
     {
diff --git a/Source/Readers/CompositeDataReader/Exports.cpp b/Source/Readers/CompositeDataReader/Exports.cpp
index 0c5a0bdd6..1ae6f56e9 100644
--- a/Source/Readers/CompositeDataReader/Exports.cpp
+++ b/Source/Readers/CompositeDataReader/Exports.cpp
@@ -11,17 +11,24 @@
 #define DATAREADER_EXPORTS
 #include "DataReader.h"
 #include "CompositeDataReader.h"
+#include "ReaderShim.h"
+#include "HeapMemoryProvider.h"
 
 namespace Microsoft { namespace MSR { namespace CNTK {
 
+auto factory = [](const ConfigParameters& parameters) -> ReaderPtr
+{
+    return std::make_shared<CompositeDataReader>(parameters, std::make_shared<HeapMemoryProvider>());
+};
+
 extern "C" DATAREADER_API void GetReaderF(IDataReader** preader)
 {
-    *preader = new CompositeDataReader("float");
+    *preader = new ReaderShim<float>(factory);
 }
 
 extern "C" DATAREADER_API void GetReaderD(IDataReader** preader)
 {
-    *preader = new CompositeDataReader("double");
+    *preader = new ReaderShim<double>(factory);
 }
 }}}
diff --git a/Tests/EndToEndTests/Speech/ExperimentalHtkmlfReader/LSTM/FullUtterance/cntk.cntk b/Tests/EndToEndTests/Speech/ExperimentalHtkmlfReader/LSTM/FullUtterance/cntk.cntk
index 0dae99e51..483484a66 100644
--- a/Tests/EndToEndTests/Speech/ExperimentalHtkmlfReader/LSTM/FullUtterance/cntk.cntk
+++ b/Tests/EndToEndTests/Speech/ExperimentalHtkmlfReader/LSTM/FullUtterance/cntk.cntk
@@ -67,11 +67,11 @@ speechTrain = [
     ]
 
     # define network using BrainScript
-    BrainScriptNetworkBuilder=[
+    BrainScriptNetworkBuilder = [
 
         # import some namespaces
         # TODO: allow to say import BS.RNNs LSTMP or import BS.RNNs to import all (literally creates new dict members mirroring those)
-        RecurrentLSTMP = BS.RNNs.RecurrentLSTMP
+        RecurrentLSTMP2 = BS.RNNs.RecurrentLSTMP2
 
         Parameters = BS.Parameters
 
         useSelfStabilization = true
@@ -82,8 +82,8 @@ speechTrain = [
         labelDim = 132
 
        // hidden dimensions
-        cellDim = 1024
-        hiddenDim = 256
+        innerCellDim = 1024
+        hiddenDim    = 256
 
         numLSTMLayers = 3 // number of hidden LSTM model layers
 
         // features
@@ -97,21 +97,24 @@ speechTrain = [
 
         // define the stack of hidden LSTM layers
         LSTMoutput[k:1..numLSTMLayers] =
             if k == 1
-            then RecurrentLSTMP(baseFeatDim, hiddenDim, cellDim, featNorm, enableSelfStabilization=useSelfStabilization)
-            else RecurrentLSTMP(hiddenDim, hiddenDim, cellDim, LSTMoutput[k-1], enableSelfStabilization=useSelfStabilization)
+            then RecurrentLSTMP2 (hiddenDim, cellDim=innerCellDim, featNorm, inputDim=baseFeatDim, enableSelfStabilization=useSelfStabilization).h
+            else RecurrentLSTMP2 (hiddenDim, cellDim=innerCellDim, LSTMoutput[k-1], inputDim=hiddenDim, enableSelfStabilization=useSelfStabilization).h
 
         // and add a softmax layer on top
-        W(x) = Parameters.WeightParam(labelDim, hiddenDim) * Parameters.Stabilize (x, enabled=useSelfStabilization)
-        B = Parameters.BiasParam(labelDim)
+        W(x) = Parameters.WeightParam (labelDim, hiddenDim) * Parameters.Stabilize (x, enabled=useSelfStabilization)
+        B = Parameters.BiasParam (labelDim)
 
         z = W(LSTMoutput[numLSTMLayers]) + B; // top-level input to Softmax
 
         // training
-        cr = CrossEntropyWithSoftmax(labels, z, tag='criterion') // this is the objective
-        Err = ErrorPrediction(labels, z, tag='eval')             // this also gets tracked
+        useExplicitCriterion = true
+        crNode = CrossEntropyWithSoftmax(labels, z)           // this is the objective, as a node
+        crExplicit = -(ReducePlus (labels .* LogSoftmax (z))) // manually-defined per-sample objective
+        cr = Pass (if useExplicitCriterion then crExplicit else crNode, tag='criterion')
+        Err = ErrorPrediction(labels, z, tag='evaluation')    // this also gets tracked
 
         // decoding
         logPrior = LogPrior(labels)
         ScaledLogLikelihood = Minus(z, logPrior, tag='output') // sadly we can't say x - y since we want to assign a tag
     ]
 ]
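
Usage note: with this patch, truncated-BPTT packing must be requested explicitly in the reader
configuration. The constructor reads "truncationLength" eagerly and fails with "Truncation length
cannot be 0." when the value is missing or zero, and StartEpoch copies it into the epoch's
m_truncationSize before starting the packer. A minimal reader section exercising this path could
look like the sketch below; the deserializer type, stream name, dimension, and file name are
illustrative assumptions and are not taken from this patch.

    reader = [
        # Packing mode, as decoded by the CompositeDataReader constructor:
        #   frameMode = true  -> frame-based packing (the default)
        #   truncated = true  -> truncated BPTT packing, requires truncationLength != 0
        #   neither           -> whole-sequence packing
        frameMode = false
        truncated = true
        truncationLength = 20   # must be non-zero, otherwise construction fails

        # Deserializers producing the exposed streams (type and names assumed).
        deserializers = (
            [
                type = "HTKFeatureDeserializer"
                input = [ features = [ dim = 363 ; scpFile = "glob_0000.scp" ] ]
            ]
        )
    ]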