Updating to the latest support of different MBLayouts

This commit is contained in:
Eldar Akchurin 2016-04-28 15:45:54 +02:00
Родитель 3caeed2dbd
Коммит 6fbce83eeb
4 изменённых файлов: 51 добавлений и 174 удалений

Просмотреть файл

@ -25,26 +25,10 @@
namespace Microsoft { namespace MSR { namespace CNTK {
CompositeDataReader::CompositeDataReader(const std::string& precision) : m_layout(make_shared<MBLayout>()),
m_precision(precision),
CompositeDataReader::CompositeDataReader(const ConfigParameters& config, MemoryProviderPtr provider) : m_layout(make_shared<MBLayout>()),
m_corpus(std::make_shared<CorpusDescriptor>()),
m_endOfEpoch(false)
m_provider(provider)
{
}
void CompositeDataReader::Init(const ConfigParameters& config)
{
m_provider = std::make_shared<HeapMemoryProvider>();
// if prefetch - launching asynchronously,
// otherwise deferring - synchronous execution during .get() call
bool prefetch = config(L"prefetch", true);
m_launchType = prefetch ? launch::async : launch::deferred;
// Layout can be asked before actual reading.
// TODO: should be gone when SGD changed.
m_layout->Init(0, 0);
// Identifying packing mode.
bool frameMode = config(L"frameMode", true);
bool truncated = config(L"truncated", false);
@ -60,6 +44,11 @@ void CompositeDataReader::Init(const ConfigParameters& config)
else if (truncated)
{
m_packingMode = PackingMode::truncated;
m_truncationLength = config(L"truncationLength", 0);
if (m_truncationLength == 0)
{
InvalidArgument("Truncation length cannot be 0.");
}
}
else
{
@ -105,124 +94,14 @@ void CompositeDataReader::Init(const ConfigParameters& config)
}
}
void CompositeDataReader::StartMinibatchLoop(size_t mbSize, size_t epoch, size_t requestedEpochSamples)
std::vector<StreamDescriptionPtr> CompositeDataReader::GetStreamDescriptions()
{
return StartDistributedMinibatchLoop(mbSize, epoch, 0, 1, requestedEpochSamples);
return m_streams;
}
void CompositeDataReader::StartDistributedMinibatchLoop(
size_t requestedMBSize,
size_t epoch,
size_t subsetNum,
size_t numSubsets,
size_t requestedEpochSamples /*= requestDataSize*/)
Minibatch CompositeDataReader::ReadMinibatch()
{
EpochConfiguration config;
config.m_workerRank = subsetNum;
config.m_numberOfWorkers = numSubsets;
config.m_minibatchSizeInSamples = requestedMBSize;
config.m_totalEpochSizeInSamples = requestedEpochSamples;
config.m_epochIndex = epoch;
// Make sure there are no outstanding reads.
if (m_prefetchTask.valid())
{
m_prefetchTask.wait();
}
m_endOfEpoch = false;
// Nothing is running, let's reconfigure the packer according to the new epoch.
StartEpoch(config);
// Ok, start reading in sync or async manner.
m_prefetchTask = std::async(m_launchType, [this]()
{
return m_packer->ReadMinibatch();
});
}
// Fills the caller-provided matrices with the next minibatch.
// Returns false when the epoch is exhausted and no data was delivered;
// otherwise copies the prefetched minibatch into `matrices` and kicks off
// the prefetch of the following one.
// (Removed by this commit: the ReaderShim now owns this prefetch/copy logic.)
bool CompositeDataReader::GetMinibatch(StreamMinibatchInputs& matrices)
{
if (m_endOfEpoch)
{
return false;
}
// Check that all matrices have the same device id.
// If not we should inject the IMemoryProvider per stream.
int deviceId = matrices.begin()->second.matrix->GetDeviceId();
for (auto mx : matrices)
{
if (mx.second.matrix->GetDeviceId() != deviceId)
{
assert(false);
}
}
// Block until the asynchronous read started earlier has produced a minibatch.
assert(m_prefetchTask.valid());
Minibatch minibatch = m_prefetchTask.get();
if (minibatch.m_endOfEpoch)
{
// Remember end-of-epoch; if the final read carried no data, report it now.
m_endOfEpoch = true;
if (minibatch.m_data.empty())
{
return false;
}
}
if (!minibatch.m_data.empty())
{
// TODO: Use alternating pinned buffer in the packer, do not copy anything, but pack into the pinned memory.
// Copy returned minibatch to the matrices.
for (const auto& mx : matrices)
{
// Every requested input name must map to a known stream.
assert(m_nameToStreamId.find(mx.first) != m_nameToStreamId.end());
size_t streamId = m_nameToStreamId[mx.first];
const auto& stream = minibatch.m_data[streamId];
// Publish the stream's layout as the reader-wide layout
// (assumes all streams share one layout; see commit title).
m_layout->CopyFrom(stream->m_layout);
size_t columnNumber = m_layout->GetNumCols();
size_t rowNumber = m_streams[streamId]->m_sampleLayout->GetNumElements();
// Reinterpret the packer's raw buffer at the configured precision and
// hand it to the matrix; SetValue copies to the matrix's own device.
if (m_precision == "float")
{
auto* data = reinterpret_cast<const float*>(stream->m_data);
matrices.GetInputMatrix<float>(mx.first).SetValue(rowNumber, columnNumber, mx.second.matrix->GetDeviceId(), const_cast<float*>(data), matrixFlagNormal);
}
else
{
assert(m_precision == "double");
auto* data = reinterpret_cast<const double*>(stream->m_data);
matrices.GetInputMatrix<double>(mx.first).SetValue(rowNumber, columnNumber, mx.second.matrix->GetDeviceId(), const_cast<double*>(data), matrixFlagNormal);
}
}
}
// Immediately start reading the next minibatch (async or deferred per m_launchType).
m_prefetchTask = std::async(m_launchType, [this]()
{
return m_packer->ReadMinibatch();
});
return !minibatch.m_data.empty();
}
// Legacy IDataReader hook; this reader always reports false.
bool CompositeDataReader::DataEnd()
{
// Note: Return value never used.
return false;
}
// Copies the layout of the most recently delivered minibatch into `layout`.
void CompositeDataReader::CopyMBLayoutTo(MBLayoutPtr layout)
{
layout->CopyFrom(m_layout);
}
size_t CompositeDataReader::GetNumParallelSequences()
{
return m_layout->GetNumParallelSequences();
return m_packer->ReadMinibatch();
}
void CompositeDataReader::CreateDeserializers(const ConfigParameters& readerConfig)
@ -263,8 +142,10 @@ IDataDeserializerPtr CompositeDataReader::CreateDeserializer(const ConfigParamet
return IDataDeserializerPtr(d);
}
void CompositeDataReader::StartEpoch(const EpochConfiguration& config)
void CompositeDataReader::StartEpoch(const EpochConfiguration& cfg)
{
EpochConfiguration config = cfg;
if (config.m_totalEpochSizeInSamples <= 0)
{
RuntimeError("Unsupported minibatch size '%d'.", (int)config.m_totalEpochSizeInSamples);
@ -289,6 +170,7 @@ void CompositeDataReader::StartEpoch(const EpochConfiguration& config)
break;
case PackingMode::truncated:
{
config.m_truncationSize = m_truncationLength;
m_packer = std::make_shared<TruncatedBPTTPacker>(
m_provider,
m_randomizer,
@ -298,7 +180,8 @@ void CompositeDataReader::StartEpoch(const EpochConfiguration& config)
default:
LogicError("Unsupported type of packer '%d'.", (int)m_packingMode);
}
m_packer->StartEpoch(config);
}
}}}
}}}

Просмотреть файл

@ -9,6 +9,7 @@
#include <string>
#include <future>
#include "DataReader.h"
#include <Reader.h>
namespace Microsoft { namespace MSR { namespace CNTK {
@ -48,41 +49,24 @@ struct Minibatch;
// TODO: Add transformers as the next step.
// TODO: Same code as in ReaderLib shim, the one in the ReaderLib will be deleted as the next step.
// TODO: Change this interface when SGD is changed.
class CompositeDataReader : public IDataReader, protected Plugin, public ScriptableObjects::Object
class CompositeDataReader : public Reader, protected Plugin
{
public:
CompositeDataReader(const std::string& precision);
CompositeDataReader(const ConfigParameters& parameters, MemoryProviderPtr provider);
// Currently we do not support BS configuration.
virtual void Init(const ScriptableObjects::IConfigRecord& /*config*/) override
{
assert(false);
}
// Describes the streams this reader produces.
std::vector<StreamDescriptionPtr> GetStreamDescriptions() override;
virtual void Init(const ConfigParameters& config) override;
// Starts a new epoch with the provided configuration
void StartEpoch(const EpochConfiguration& config) override;
virtual void Destroy() override
{
delete this;
}
virtual void StartMinibatchLoop(size_t mbSize, size_t epoch, size_t requestedEpochSamples = requestDataSize) override;
virtual void StartDistributedMinibatchLoop(size_t requestedMBSize, size_t epoch, size_t subsetNum, size_t numSubsets, size_t requestedEpochSamples) override;
virtual bool SupportsDistributedMBRead() const override
{
return true;
}
virtual bool GetMinibatch(StreamMinibatchInputs& matrices) override;
virtual bool DataEnd() override;
void CopyMBLayoutTo(MBLayoutPtr) override;
virtual size_t GetNumParallelSequences() override;
// Reads a minibatch that contains data across all streams.
Minibatch ReadMinibatch() override;
private:
void CreateDeserializers(const ConfigParameters& readerConfig);
IDataDeserializerPtr CreateDeserializer(const ConfigParameters& readerConfig, bool primary);
void StartEpoch(const EpochConfiguration& config);
enum class PackingMode
{

Просмотреть файл

@ -11,17 +11,24 @@
#define DATAREADER_EXPORTS
#include "DataReader.h"
#include "CompositeDataReader.h"
#include "ReaderShim.h"
#include "HeapMemoryProvider.h"
namespace Microsoft { namespace MSR { namespace CNTK {
// Factory handed to ReaderShim: builds a CompositeDataReader from the config,
// backed by a heap memory provider.
auto factory = [](const ConfigParameters& parameters) -> ReaderPtr
{
return std::make_shared<CompositeDataReader>(parameters, std::make_shared<HeapMemoryProvider>());
};
extern "C" DATAREADER_API void GetReaderF(IDataReader** preader)
{
*preader = new CompositeDataReader("float");
*preader = new ReaderShim<float>(factory);
}
extern "C" DATAREADER_API void GetReaderD(IDataReader** preader)
{
*preader = new CompositeDataReader("double");
*preader = new ReaderShim<double>(factory);
}
}}}

Просмотреть файл

@ -67,11 +67,11 @@ speechTrain = [
]
# define network using BrainScript
BrainScriptNetworkBuilder=[
BrainScriptNetworkBuilder = [
# import some namespaces
# TODO: allow to say import BS.RNNs LSTMP or import BS.RNNs to import all (literally creates new dict members mirroring those)
RecurrentLSTMP = BS.RNNs.RecurrentLSTMP
RecurrentLSTMP2 = BS.RNNs.RecurrentLSTMP2
Parameters = BS.Parameters
useSelfStabilization = true
@ -82,8 +82,8 @@ speechTrain = [
labelDim = 132
// hidden dimensions
cellDim = 1024
hiddenDim = 256
innerCellDim = 1024
hiddenDim = 256
numLSTMLayers = 3 // number of hidden LSTM model layers
// features
@ -97,21 +97,24 @@ speechTrain = [
// define the stack of hidden LSTM layers
LSTMoutput[k:1..numLSTMLayers] =
if k == 1
then RecurrentLSTMP(baseFeatDim, hiddenDim, cellDim, featNorm, enableSelfStabilization=useSelfStabilization)
else RecurrentLSTMP(hiddenDim, hiddenDim, cellDim, LSTMoutput[k-1], enableSelfStabilization=useSelfStabilization)
then RecurrentLSTMP2 (hiddenDim, cellDim=innerCellDim, featNorm, inputDim=baseFeatDim, enableSelfStabilization=useSelfStabilization).h
else RecurrentLSTMP2 (hiddenDim, cellDim=innerCellDim, LSTMoutput[k-1], inputDim=hiddenDim, enableSelfStabilization=useSelfStabilization).h
// and add a softmax layer on top
W(x) = Parameters.WeightParam(labelDim, hiddenDim) * Parameters.Stabilize (x, enabled=useSelfStabilization)
B = Parameters.BiasParam(labelDim)
W(x) = Parameters.WeightParam (labelDim, hiddenDim) * Parameters.Stabilize (x, enabled=useSelfStabilization)
B = Parameters.BiasParam (labelDim)
z = W(LSTMoutput[numLSTMLayers]) + B; // top-level input to Softmax
// training
cr = CrossEntropyWithSoftmax(labels, z, tag='criterion') // this is the objective
Err = ErrorPrediction(labels, z, tag='eval') // this also gets tracked
useExplicitCriterion = true
crNode = CrossEntropyWithSoftmax(labels, z) // this is the objective, as a node
crExplicit = -(ReducePlus (labels .* LogSoftmax (z))) // manually-defined per-sample objective
cr = Pass (if useExplicitCriterion then crExplicit else crNode, tag='criterion')
Err = ErrorPrediction(labels, z, tag='evaluation') // this also gets tracked
// decoding
logPrior = LogPrior(labels)
ScaledLogLikelihood = Minus(z, logPrior, tag='output') // sadly we can't say x - y since we want to assign a tag
ScaledLogLikelihood = Minus(z, logPrior, tag='output') // sadly we can't say x - y since we want to assign a tag
]
]