Fix for LibSVMBinaryReader to add prefetching, microbatches, and Linux support.

This commit is contained in:
Authored by thhoens 2015-12-16 09:53:10 -08:00; committed by Ryan
Parent 7682c03a8f
Commit ada359aecd
5 changed files: 1117 additions and 869 deletions

View file

@ -358,6 +358,25 @@ $(UCIFASTREADER): $(UCIFASTREADER_OBJ) | $(CNTKMATH_LIB)
@echo $(SEPARATOR)
$(CXX) $(LDFLAGS) -shared $(patsubst %,-L%, $(LIBDIR) $(LIBPATH)) $(patsubst %,$(RPATH)%, $(ORIGINDIR) $(LIBPATH)) -o $@ $^ -l$(CNTKMATH)
########################################
# LibSVMBinaryReader plugin
########################################

# Sources for the LibSVMBinaryReader shared-library plugin.
# NOTE: the last entry must NOT end with a backslash -- a dangling
# line-continuation would splice the OBJ assignment below into the
# source list and break the build.
LIBSVMBINARYREADER_SRC =\
	$(SOURCEDIR)/Readers/LibSVMBinaryReader/Exports.cpp \
	$(SOURCEDIR)/Readers/LibSVMBinaryReader/LibSVMBinaryReader.cpp

LIBSVMBINARYREADER_OBJ := $(patsubst %.cpp, $(OBJDIR)/%.o, $(LIBSVMBINARYREADER_SRC))

LIBSVMBINARYREADER := $(LIBDIR)/LibSVMBinaryReader.so
ALL += $(LIBSVMBINARYREADER)
SRC += $(LIBSVMBINARYREADER_SRC)

# Link the plugin as a shared object against the CNTK math library.
# Recipe lines must be tab-indented (GNU make requirement).
$(LIBSVMBINARYREADER): $(LIBSVMBINARYREADER_OBJ) | $(CNTKMATH_LIB)
	@echo $(SEPARATOR)
	$(CXX) $(LDFLAGS) -shared $(patsubst %,-L%, $(LIBDIR) $(LIBPATH)) $(patsubst %,$(RPATH)%, $(ORIGINDIR) $(LIBPATH)) -o $@ $^ -l$(CNTKMATH)
########################################
# Kaldi plugins
########################################

The diff for this file is not shown because of its large size. Load diff

View file

@ -5,155 +5,265 @@
//
// LibSVMBinaryReader.h - Include file for the MTK and MLF format of features and samples
#pragma once
#include "stdafx.h"
#include "DataReader.h"
#include "DataWriter.h"
#include "Config.h"
#include "RandomOrdering.h"
#include <condition_variable>
#include <deque>
#include <future>
#include <map>
#include <mutex>
#include <random>
#include <string>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#if DEBUG
#include <cvmarkersobj.h>
using namespace Concurrency::diagnostic;
#endif
namespace Microsoft { namespace MSR { namespace CNTK {
namespace Microsoft {
namespace MSR {
namespace CNTK {
// Kind of label attached to each sample; selects which mapping tables
// and label buffers the reader maintains.
enum LabelKind
{
labelNone = 0, // no labels to worry about
labelCategory = 1, // category labels, creates mapping tables
labelRegression = 2, // regression labels
labelOther = 3, // some other type of label
};
// Minimal thread-safe blocking FIFO used to hand prefetch buffers between
// the producer (background read thread) and the consumer (GetMinibatch).
// push() never blocks; pop() blocks until an element is available.
template <typename T>
class BlockingQueue
{
private:
    std::mutex d_mutex;                  // guards d_queue
    std::condition_variable d_condition; // signaled once per push
    std::deque<T> d_queue;               // push_front / pop_back => FIFO

public:
    // Enqueue a copy of value and wake one waiting consumer.
    // The lock is released before notify_one() so the woken thread does
    // not immediately block re-acquiring the mutex.
    void push(T const& value)
    {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }

    // Rvalue overload: enqueue by move to avoid copying large buffers.
    // Backward compatible -- lvalue callers still use the overload above.
    void push(T&& value)
    {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(std::move(value));
        }
        this->d_condition.notify_one();
    }

    // Block until the queue is non-empty, then remove and return the
    // oldest element. The predicate captures [this] explicitly: the old
    // [=] form implicitly copy-captured this, which is deprecated in
    // C++20 and was never needed here.
    T pop()
    {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [this] { return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};
// Legacy memory-mapped input reader (superseded by SparseBinaryInput
// below). Windows-only: holds Win32 HANDLEs for the file and mapped views.
// NOTE(review): this diff view interleaves the old class with the new
// BinaryMatrix hierarchy; the rest of its members appear further down.
template<class ElemType>
class LibSVM_BinaryInput {
private:
HANDLE m_hndl; // presumably the input file handle -- TODO confirm
HANDLE m_filemap; // presumably the file-mapping object -- TODO confirm
HANDLE m_header; // name suggests mapped header region -- TODO confirm
HANDLE m_offsets; // name suggests mapped offsets region -- TODO confirm
HANDLE m_data; // name suggests mapped data region -- TODO confirm
// Abstract staging buffer for one named input matrix being assembled from
// binary input; subclasses (dense/sparse) own the value/index arrays and
// transfer them into a CNTK Matrix via Fill().
template<class ElemType>
class BinaryMatrix {
public:
// name/deviceID identify the destination; numRows/numCols bound capacity.
// m_numRows counts rows appended so far and starts at zero.
BinaryMatrix(wstring name, int deviceID, size_t numRows, size_t numCols) : m_matrixName(name), m_deviceID(deviceID), m_maxNumRows(numRows), m_numRows(0), m_maxNumCols(numCols), m_values(nullptr) {};
//BinaryMatrix(wstring name, size_t numRows, size_t numCols) : m_matrixName(name), m_maxNumRows(numRows), m_numRows(0), m_maxNumCols(numCols), m_values(nullptr) {};
// NOTE(review): no virtual destructor is declared -- deleting a subclass
// through a BinaryMatrix* is undefined behavior; verify how instances die.
virtual void Clear() = 0;
virtual void Dispose() = 0;
virtual void Fill(Matrix<ElemType>* ) = 0; // move accumulated data into the target matrix
virtual void AddValues(void* , size_t ) = 0;
virtual void AddColIndices(void* , size_t ) = 0;
virtual void AddRowIndices(void* , size_t ) = 0;
virtual void UpdateNNz(size_t ) = 0;
virtual void UpdateCurMB(size_t mb) { m_numRows += mb; } // account for mb more rows appended
virtual void ResizeArrays(size_t ) = 0;
virtual void SetMaxRows(size_t maxRows) = 0;
protected:
wstring m_matrixName; // destination matrix name (reader stream name)
int m_deviceID;
ElemType* m_values;
size_t m_maxNumRows;
size_t m_maxNumCols;
//void* header_orig; // Don't need this since the header is at the start of the file
void* offsets_orig; // NOTE(review): these two look like stray members of the
void* data_orig; // removed memory-mapped reader, interleaved here by the diff -- verify
size_t m_numRows; // rows appended so far (see UpdateCurMB)
};
// Dense staging matrix: only the contiguous value buffer is used; the
// sparse-index operations of the base interface are NOT_IMPLEMENTED.
template<class ElemType>
class DenseBinaryMatrix : public BinaryMatrix<ElemType> {
public:
DenseBinaryMatrix(wstring name, int deviceID, size_t numRows, size_t numCols);
//DenseBinaryMatrix(wstring name, size_t numRows, size_t numCols);
virtual void Clear();
virtual void Dispose();
virtual void Fill(Matrix<ElemType>* matrix) override;
virtual void AddValues(void* values, size_t numRows) override; // append numRows rows of values
virtual void AddColIndices(void* /*colIndices*/, size_t /*numCols*/) override { NOT_IMPLEMENTED }
virtual void AddRowIndices(void* /*rowIndices*/, size_t /*nnz*/) override { NOT_IMPLEMENTED }
virtual void UpdateNNz(size_t /*nnz*/) override { NOT_IMPLEMENTED }
virtual void ResizeArrays(size_t ) { NOT_IMPLEMENTED } // NOTE(review): missing 'override' unlike the siblings -- verify
virtual void SetMaxRows(size_t maxRows) override;
protected:
};
// NOTE(review): these three members belong to the removed memory-mapped
// LibSVM_BinaryInput class; the diff view shows them interleaved here.
void* header_buffer;
void* offsets_buffer;
void* data_buffer;
// Sparse staging matrix: accumulates nonzero values plus row- and
// column-index buffers before handing them to a CNTK sparse Matrix.
template<class ElemType>
class SparseBinaryMatrix : public BinaryMatrix<ElemType> {
public:
SparseBinaryMatrix(wstring name, int deviceID, size_t numRows, size_t numCols);
//SparseBinaryMatrix(wstring name, size_t numRows, size_t numCols);
virtual void Clear();
virtual void Dispose();
virtual void Fill( Matrix<ElemType>* matrix ) override;
virtual void AddValues(void* values, size_t nnz) override; // append nnz nonzero values
virtual void AddColIndices(void* colIndices, size_t numCols) override;
virtual void AddRowIndices(void* rowIndices, size_t nnz) override;
virtual void UpdateNNz(size_t nnz) override { m_nnz += nnz; } // running nonzero count
virtual void ResizeArrays(size_t newMaxNNz) override; // grow value/index storage to hold newMaxNNz entries
virtual void SetMaxRows(size_t maxRows) override;
protected:
int32_t* m_rowIndices; // row index per stored nonzero
int32_t* m_colIndices; // column index buffer -- presumably CSC-style boundaries; TODO confirm
size_t m_nnz; // nonzeros accumulated so far
size_t m_maxNNz; // capacity of the value/index arrays
};
// NOTE(review): remainder of the legacy LibSVM_BinaryInput class declared
// above; it appears here because the diff interleaves old and new code.
size_t m_dim;
size_t mbSize;
size_t MAX_BUFFER = 400;
size_t m_labelDim;
ElemType* values; // = (ElemType*)malloc(sizeof(ElemType)* 230 * 1024);
int64_t* offsets; // = (int*)malloc(sizeof(int)* 230 * 1024);
int32_t* colIndices; // = (int*)malloc(sizeof(int) * (batchsize + 1));
int32_t* rowIndices; // = (int*)malloc(sizeof(int) * MAX_BUFFER * batchsize);
int32_t* classIndex; // = (int*)malloc(sizeof(int) * batchsize);
ElemType* classWeight; // = (ElemType*)malloc(sizeof(ElemType) * batchsize);
ElemType* m_labelsBuffer;
public:
int64_t numRows;
int64_t numBatches;
int32_t numCols;
int64_t totalNNz;
LibSVM_BinaryInput();
~LibSVM_BinaryInput();
void Init(std::wstring fileName, size_t dim);
bool SetupEpoch( size_t minibatchSize);
bool Next_Batch(Matrix<ElemType>& features, Matrix<ElemType>& labels, size_t actualmbsize, int batchIndex);
void Dispose();
};
// Legacy LibSVMBinaryReader declaration (pre-refactor). NOTE(review): in
// this diff view its body is interleaved with the new SparseBinaryInput
// class that follows; its public section continues further down.
template<class ElemType>
class LibSVMBinaryReader : public IDataReader<ElemType>
{
//public:
// typedef std::string LabelType;
// typedef unsigned LabelIdType;
private:
int* read_order; // array to shuffle to reorder the dataset
std::wstring m_featuresName;
size_t m_featuresDim;
LibSVM_BinaryInput<ElemType> featuresInput; // legacy memory-mapped input engine
int64_t m_processedMinibatches;
size_t m_mbSize; // size of minibatch requested
LabelIdType m_labelIdMax; // maximum label ID we have encountered so far
LabelIdType m_labelDim; // maximum label ID we will ever see (used for array dimensions)
size_t m_mbStartSample; // starting sample # of the next minibatch
size_t m_epochSize; // size of an epoch
size_t m_epoch; // which epoch are we on
size_t m_epochStartSample; // the starting sample for the epoch
size_t m_totalSamples; // number of samples in the dataset
size_t m_randomizeRange; // randomization range
size_t m_featureCount; // feature count
size_t m_readNextSample; // next sample to read
bool m_labelFirst; // the label is the first element in a line
bool m_partialMinibatch; // a partial minibatch is allowed
LabelKind m_labelType; // labels are categories, create mapping table
RandomOrdering m_randomordering; // randomizing class
MBLayoutPtr m_pMBLayout;
std::wstring m_labelsName;
std::wstring m_labelsCategoryName;
std::wstring m_labelsMapName;
ElemType* m_qfeaturesBuffer;
ElemType* m_dfeaturesBuffer;
ElemType* m_labelsBuffer;
LabelIdType* m_labelsIdBuffer;
std::wstring m_labelFileToWrite; // set to the path if we need to write out the label file
bool m_endReached;
int m_traceLevel;
// feature and label data are parallel arrays
std::vector<ElemType> m_featureData;
std::vector<LabelIdType> m_labelIdData;
std::vector<LabelType> m_labelData;
// map is from ElemType to LabelType
// For LibSVMBinary, we really only need an int for label data, but we have to transmit in Matrix, so use ElemType instead
std::map<LabelIdType, LabelType> m_mapIdToLabel;
std::map<LabelType, LabelIdType> m_mapLabelToId;
// caching support
DataReader<ElemType>* m_cachingReader;
DataWriter<ElemType>* m_cachingWriter;
ConfigParameters m_readerConfig;
size_t RandomizeSweep(size_t epochSample);
//bool Randomize() {return m_randomizeRange != randomizeNone;}
bool Randomize() { return false; } // randomization hard-disabled in the legacy reader
void SetupEpoch();
void StoreLabel(ElemType& labelStore, const LabelType& labelValue);
size_t RecordsToRead(size_t mbStartSample, bool tail=false);
void ReleaseMemory();
void WriteLabelFile();
// New input engine: reads minibatches from the binary file on a worker
// thread (prefetching), supports micro-batches and distributed subsets,
// and fills BinaryMatrix staging objects for the reader.
template<class ElemType>
class SparseBinaryInput {
public:
SparseBinaryInput(std::wstring fileName);
~SparseBinaryInput();
void Init(std::map<std::wstring,std::wstring> rename); // rename: presumably file-stream name -> network input name; TODO confirm
void StartDistributedMinibatchLoop(size_t mbSize, size_t subsetNum, size_t numSubsets);
void ReadMinibatches(size_t* read_order, size_t numToRead); // producer-side bulk read in read_order sequence
size_t ReadMinibatch(void* data_buffer, std::map<std::wstring, shared_ptr<BinaryMatrix<ElemType>>>& matrices);
//void GetMinibatch(std::map<std::wstring, Matrix<ElemType>*>& matrices);
size_t FillMatrices(std::map<std::wstring, shared_ptr<BinaryMatrix<ElemType>>>& matrices);
size_t GetMBSize() { return m_mbSize; }
// Minibatch count = stored batches divided by micro-batches per minibatch.
// NOTE(review): assumes m_numBatches counts micro-batches -- verify.
size_t GetNumMB() { return m_numBatches / ( m_mbSize / m_microBatchSize ); }
void Shuffle();
shared_ptr<BinaryMatrix<ElemType>> CreateMatrix(std::wstring matName, int deviceId);
//shared_ptr<BinaryMatrix<ElemType>> CreateMatrix(std::wstring matName);
virtual bool DataEnd(EndDataType endDataType);
virtual bool ReadRecord(size_t readSample);
// NOTE(review): diff interleaving -- this public section belongs to the
// legacy LibSVMBinaryReader declared above, not to SparseBinaryInput.
public:
template<class ConfigRecordType> void InitFromConfig(const ConfigRecordType &);
virtual void Init(const ConfigParameters & config) override { InitFromConfig(config); }
virtual void Init(const ScriptableObjects::IConfigRecord & config) override { InitFromConfig(config); }
virtual void Destroy();
LibSVMBinaryReader() : m_pMBLayout(make_shared<MBLayout>()) { m_qfeaturesBuffer = NULL; m_dfeaturesBuffer = NULL; m_labelsBuffer = NULL; }
virtual ~LibSVMBinaryReader();
virtual void StartMinibatchLoop(size_t mbSize, size_t epoch, size_t requestedEpochSamples=requestDataSize);
virtual bool GetMinibatch(std::map<std::wstring, Matrix<ElemType>*>& matrices);
private:
void ReadOffsets(size_t startMB, size_t numMBs);
void FillReadOrder(size_t windowSize);
void* GetTempDataPointer(size_t numVals);
bool Randomize();
size_t GetNumParallelSequences() { return 1; }
void SetNumParallelSequences(const size_t) { };
void CopyMBLayoutTo(MBLayoutPtr pMBLayout) { pMBLayout->CopyFrom(m_pMBLayout); NOT_IMPLEMENTED; }
virtual const std::map<LabelIdType, LabelType>& GetLabelMapping(const std::wstring& sectionName);
virtual void SetLabelMapping(const std::wstring& sectionName, const std::map<LabelIdType, typename LabelType>& labelMapping);
virtual bool GetData(const std::wstring& sectionName, size_t numRecords, void* data, size_t& dataBufferSize, size_t recordStart=0);
ifstream m_inFile;
std::wstring m_fileName;
size_t m_fileSize;
virtual bool DataEnd(EndDataType endDataType);
void SetRandomSeed(int) { NOT_IMPLEMENTED; }
};
}}} // closes namespace Microsoft { namespace MSR { namespace CNTK { (old single-line form)
// NOTE(review): continuation of SparseBinaryInput's private state; the
// diff interleaves it with the legacy reader class above.
size_t m_offsetsStart; // presumably file position where the offset table begins -- TODO confirm
int64_t* m_offsets; // per-batch file offsets
size_t m_dataStart; // presumably file position where payload data begins -- TODO confirm
size_t m_nextMB; // starting sample # of the next minibatch
size_t m_epochSize; // size of an epoch
size_t m_numRows; // NOTE(review): original comment said "size of minibatch requested" -- looks copy-pasted; likely total rows, verify
size_t m_numBatches; // NOTE(review): same copy-pasted comment -- likely total (micro)batch count, verify
int32_t m_microBatchSize;
size_t m_mbSize;
size_t m_startMB;
size_t m_endMB;
size_t m_curLower;
size_t m_subsetNum; // this worker's subset index for distributed reading
size_t m_numSubsets; // total number of distributed subsets
size_t m_windowSize;
size_t m_curWindowSize;
bool m_randomize;
size_t* m_readOrder; // array to shuffle to reorder the dataset
size_t m_readOrderLength;
size_t m_maxMBSize;
std::vector<std::wstring> m_features; // names of feature streams
std::vector<std::wstring> m_labels; // names of label streams
std::map<std::wstring, int32_t> m_mappedNumCols;
int32_t m_tempValuesSize;
void* m_tempValues;
RandomOrdering m_randomordering; // randomizing class
std::mt19937_64 m_randomEngine;
#ifdef _WIN32
DWORD sysGran; // presumably system allocation granularity -- TODO confirm
#else
int32_t sysGran;
#endif
// Hand-off queues between the prefetch (producer) thread and the consumer:
// empty buffers flow to the producer, filled buffers back to the consumer.
BlockingQueue<void*> m_dataToProduce;
BlockingQueue<void*> m_dataToConsume;
};
// Refactored reader: an IDataReader shim over SparseBinaryInput that adds
// distributed minibatch reads and optional asynchronous prefetch
// (m_prefetchEnabled / m_pendingAsyncGetMinibatch).
template<class ElemType>
class LibSVMBinaryReader : public IDataReader <ElemType>
{
public:
using LabelType = typename IDataReader<ElemType>::LabelType;
using LabelIdType = typename IDataReader<ElemType>::LabelIdType;
virtual void Init(const ConfigParameters & config) override { InitFromConfig(config); }
virtual void Init(const ScriptableObjects::IConfigRecord & config) override { InitFromConfig(config); }
template<class ConfigRecordType> void InitFromConfig(const ConfigRecordType &);
virtual void Destroy();
LibSVMBinaryReader() : DSSMLabels(nullptr), DSSMCols(0) { m_pMBLayout = make_shared<MBLayout>(); };
virtual ~LibSVMBinaryReader();
virtual void StartMinibatchLoop(size_t mbSize, size_t epoch, size_t requestedEpochSamples = requestDataSize);
virtual void StartDistributedMinibatchLoop(size_t mbSize, size_t epoch, size_t subsetNum, size_t numSubsets, size_t requestedEpochSamples) override;
virtual bool GetMinibatch(std::map<std::wstring, Matrix<ElemType>*>& matrices);
virtual bool SupportsDistributedMBRead() const override { return true; }
template<class ConfigRecordType> void RenamedMatrices(const ConfigRecordType& readerConfig, std::map<std::wstring, std::wstring>& rename);
virtual void SetLabelMapping(const std::wstring& /*sectionName*/, const std::map<LabelIdType, LabelType>& /*labelMapping*/) { NOT_IMPLEMENTED };
virtual bool GetData(const std::wstring& /*sectionName*/, size_t /*numRecords*/, void* /*data*/, size_t& /*dataBufferSize*/, size_t /*recordStart = 0*/) { NOT_IMPLEMENTED };
virtual bool DataEnd(EndDataType endDataType);
size_t GetNumParallelSequences() { return m_pMBLayout->GetNumParallelSequences(); }
void CopyMBLayoutTo(MBLayoutPtr pMBLayout) { pMBLayout->CopyFrom(m_pMBLayout); };
//virtual bool DataEnd(EndDataType endDataType);
size_t NumberSlicesInEachRecurrentIter() { return 1; } // no recurrence: always one slice
void SetNbrSlicesEachRecurrentIter(const size_t) { };
void SetSentenceEndInBatch(std::vector<size_t> &/*sentenceEnd*/){};
private:
#if DEBUG
marker_series* reader_series; // Concurrency Visualizer marker series (debug builds only)
size_t cur_read;
#endif
clock_t timer;
void DoDSSMMatrix(Matrix<ElemType>& mat, size_t actualMBSize);
void CheckDataMatrices(std::map<std::wstring, Matrix<ElemType>*>& matrices);
MBLayoutPtr m_pMBLayout;
ConfigParameters m_readerConfig;
std::shared_ptr<SparseBinaryInput<ElemType>> m_dataInput; // the prefetching input engine
std::map<std::wstring, shared_ptr<BinaryMatrix<ElemType>>> m_dataMatrices; // staging matrices keyed by stream name
unsigned long m_randomize; // randomization range
ElemType* DSSMLabels;
size_t DSSMCols;
size_t m_mbSize; // size of minibatch requested
size_t m_requestedEpochSize; // size of an epoch
size_t m_epoch; // which epoch are we on
bool m_partialMinibatch; // a partial minibatch is allowed
bool m_prefetchEnabled; // when true, the next minibatch is fetched asynchronously
std::future<size_t> m_pendingAsyncGetMinibatch; // result of the outstanding prefetch, if any
};
}
}
}

View file

@ -5,12 +5,12 @@
#pragma once
#include "Platform.h"
#include "targetver.h"
#define WIN32_LEAN_AND_MEAN // Exclude rarely-used stuff from Windows headers
// Windows Header Files:
#include <windows.h>
#ifdef __WINDOWS__
#include "windows.h"
#endif
#include <stdio.h>
#include <math.h>
// TODO: reference additional headers your program requires here

View file

@ -5,4 +5,6 @@
// If you wish to build your application for a previous Windows platform, include WinSDKVer.h and
// set the _WIN32_WINNT macro to the platform you wish to support before including SDKDDKVer.h.
#ifdef __WINDOWS__
#include <SDKDDKVer.h>
#endif