Refactoring: HdfsReader interface -> ReadSeekCloser
This commit is contained in:
Родитель
72dbf23f38
Коммит
de9eb65685
|
@ -33,7 +33,7 @@ func (this *FaultTolerantHdfsAccessor) EnsureConnected() error {
|
|||
}
|
||||
|
||||
// Opens HDFS file for reading
|
||||
func (this *FaultTolerantHdfsAccessor) OpenRead(path string) (HdfsReader, error) {
|
||||
func (this *FaultTolerantHdfsAccessor) OpenRead(path string) (ReadSeekCloser, error) {
|
||||
op := this.RetryPolicy.StartOperation()
|
||||
for {
|
||||
result, err := this.Impl.OpenRead(path)
|
||||
|
|
|
@ -64,8 +64,8 @@ func TestOpenReadWithRetries(t *testing.T) {
|
|||
mockCtrl := gomock.NewController(t)
|
||||
hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
|
||||
ftHdfsAccessor := NewFaultTolerantHdfsAccessor(hdfsAccessor, atMost2Attempts())
|
||||
mockReader := NewMockHdfsReader(mockCtrl)
|
||||
var result HdfsReader
|
||||
mockReader := NewMockReadSeekCloser(mockCtrl)
|
||||
var result ReadSeekCloser
|
||||
var err error
|
||||
hdfsAccessor.EXPECT().OpenRead("/test/file").Return(nil, errors.New("Injected failure"))
|
||||
hdfsAccessor.EXPECT().OpenRead("/test/file").Return(mockReader, nil)
|
||||
|
|
|
@ -2,18 +2,18 @@
|
|||
// Licensed under the MIT license. See LICENSE file in the project root for details.
|
||||
package main
|
||||
|
||||
// Implements HdfsReader interface with automatic retries (acts as a proxy to HdfsReader)
|
||||
// Implements ReadSeekCloser interface with automatic retries (acts as a proxy to HdfsReader)
|
||||
type FaultTolerantHdfsReader struct {
|
||||
Path string
|
||||
Impl HdfsReader
|
||||
Impl ReadSeekCloser
|
||||
HdfsAccessor HdfsAccessor
|
||||
RetryPolicy *RetryPolicy
|
||||
Offset int64
|
||||
}
|
||||
|
||||
var _ HdfsReader = (*FaultTolerantHdfsReader)(nil) // ensure FaultTolerantHdfsReaderImpl implements HdfsReader
|
||||
var _ ReadSeekCloser = (*FaultTolerantHdfsReader)(nil) // ensure FaultTolerantHdfsReaderImpl implements ReadSeekCloser
|
||||
// Creates new instance of FaultTolerantHdfsReader
|
||||
func NewFaultTolerantHdfsReader(path string, impl HdfsReader, hdfsAccessor HdfsAccessor, retryPolicy *RetryPolicy) *FaultTolerantHdfsReader {
|
||||
func NewFaultTolerantHdfsReader(path string, impl ReadSeekCloser, hdfsAccessor HdfsAccessor, retryPolicy *RetryPolicy) *FaultTolerantHdfsReader {
|
||||
return &FaultTolerantHdfsReader{Path: path, Impl: impl, HdfsAccessor: hdfsAccessor, RetryPolicy: retryPolicy}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
// Testing retry logic for Read()
|
||||
func TestSeekAndReadWithRetries(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
hdfsReader := NewMockHdfsReader(mockCtrl)
|
||||
hdfsReader := NewMockReadSeekCloser(mockCtrl)
|
||||
hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
|
||||
ftHdfsReader := NewFaultTolerantHdfsReader("/path/to/file", hdfsReader, hdfsAccessor, atMost2Attempts())
|
||||
|
||||
|
@ -34,7 +34,7 @@ func TestSeekAndReadWithRetries(t *testing.T) {
|
|||
// As a result, ftHdfsReader should close the stream...
|
||||
hdfsReader.EXPECT().Close().Return(nil)
|
||||
// ...and invoke an OpenRead() to get new HdfsReader
|
||||
newHdfsReader := NewMockHdfsReader(mockCtrl)
|
||||
newHdfsReader := NewMockReadSeekCloser(mockCtrl)
|
||||
hdfsAccessor.EXPECT().OpenRead("/path/to/file").Return(newHdfsReader, nil)
|
||||
// It should seek at corret position (1060), and repeat the read
|
||||
newHdfsReader.EXPECT().Seek(int64(1060)).Return(nil)
|
||||
|
@ -47,7 +47,7 @@ func TestSeekAndReadWithRetries(t *testing.T) {
|
|||
// No retries on benigh errors (e.g. EOF)
|
||||
func TestNoRetryOnEOF(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
hdfsReader := NewMockHdfsReader(mockCtrl)
|
||||
hdfsReader := NewMockReadSeekCloser(mockCtrl)
|
||||
hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
|
||||
ftHdfsReader := NewFaultTolerantHdfsReader("/path/to/file", hdfsReader, hdfsAccessor, atMost2Attempts())
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ type FileFragment struct {
|
|||
}
|
||||
|
||||
// Reads into the file fragment buffer from the backend
|
||||
func (this *FileFragment) ReadFromBackend(hdfsReader HdfsReader, offset *int64, minBytesToRead int, maxBytesToRead int) error {
|
||||
func (this *FileFragment) ReadFromBackend(hdfsReader ReadSeekCloser, offset *int64, minBytesToRead int, maxBytesToRead int) error {
|
||||
if cap(this.Data) < maxBytesToRead {
|
||||
// not enough capacity - realloating
|
||||
this.Data = make([]byte, maxBytesToRead)
|
||||
|
|
|
@ -15,13 +15,13 @@ import (
|
|||
// handle unordered reads which aren't far away from each other, so backend stream can
|
||||
// be read sequentially without seek
|
||||
type FileHandleReader struct {
|
||||
HdfsReader HdfsReader // Backend reader
|
||||
Offset int64 // Current offset for backend reader
|
||||
Buffer1 *FileFragment // Most recent fragment from the backend reader
|
||||
Buffer2 *FileFragment // Least recent fragment read from the backend
|
||||
Holes int64 // tracks number of encountered "holes" TODO: find better name
|
||||
CacheHits int64 // tracks number of cache hits (read requests from buffer)
|
||||
Seeks int64 // tracks number of seeks performed on the backend stream
|
||||
HdfsReader ReadSeekCloser // Backend reader
|
||||
Offset int64 // Current offset for backend reader
|
||||
Buffer1 *FileFragment // Most recent fragment from the backend reader
|
||||
Buffer2 *FileFragment // Least recent fragment read from the backend
|
||||
Holes int64 // tracks number of encountered "holes" TODO: find better name
|
||||
CacheHits int64 // tracks number of cache hits (read requests from buffer)
|
||||
Seeks int64 // tracks number of seeks performed on the backend stream
|
||||
}
|
||||
|
||||
// Opens the reader (creates backend reader)
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
// Testing reading of an empty file
|
||||
func TestEmptyFile(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
hdfsReader := NewMockHdfsReader(mockCtrl)
|
||||
hdfsReader := NewMockReadSeekCloser(mockCtrl)
|
||||
handle := createTestHandle(t, mockCtrl, hdfsReader)
|
||||
hdfsReader.whenReadReturn([]byte{}, io.EOF)
|
||||
handle.readAndVerify(t, 0, 1024, []byte{})
|
||||
|
@ -25,7 +25,7 @@ func TestEmptyFile(t *testing.T) {
|
|||
// Testing reading of a small "HelloWorld!" file using few Read() operations
|
||||
func TestSmallFileSequentialRead(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
hdfsReader := NewMockHdfsReader(mockCtrl)
|
||||
hdfsReader := NewMockReadSeekCloser(mockCtrl)
|
||||
handle := createTestHandle(t, mockCtrl, hdfsReader)
|
||||
|
||||
hdfsReader.whenReadReturn([]byte("Hel"), nil)
|
||||
|
@ -46,7 +46,7 @@ func TestSmallFileSequentialRead(t *testing.T) {
|
|||
// this should not cause Seek() on the backend HDFS reader
|
||||
func TestReoderedReadsDontCauseSeek(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
hdfsReader := NewMockHdfsReader(mockCtrl)
|
||||
hdfsReader := NewMockReadSeekCloser(mockCtrl)
|
||||
handle := createTestHandle(t, mockCtrl, hdfsReader)
|
||||
|
||||
hdfsReader.whenReadReturn([]byte("He"), nil)
|
||||
|
@ -64,7 +64,7 @@ func TestReoderedReadsDontCauseSeek(t *testing.T) {
|
|||
// Seak()->Read()->Read()->Seek()->Read()
|
||||
func TestSeekAndRead(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
hdfsReader := NewMockHdfsReader(mockCtrl)
|
||||
hdfsReader := NewMockReadSeekCloser(mockCtrl)
|
||||
handle := createTestHandle(t, mockCtrl, hdfsReader)
|
||||
|
||||
hdfsReader.expectSeek(1000000)
|
||||
|
@ -141,7 +141,7 @@ func RandomAccess(t *testing.T, fileSize int64, maxRead int) {
|
|||
///////////////// Test Helpers /////////////////////
|
||||
|
||||
// common setup for FileHandleReader testing
|
||||
func createTestHandle(t *testing.T, mockCtrl *gomock.Controller, hdfsReader HdfsReader) *FileHandle {
|
||||
func createTestHandle(t *testing.T, mockCtrl *gomock.Controller, hdfsReader ReadSeekCloser) *FileHandle {
|
||||
hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
|
||||
hdfsAccessor.EXPECT().Stat("/test.dat").Return(Attrs{Name: "test.dat"}, nil)
|
||||
hdfsAccessor.EXPECT().OpenRead("/test.dat").Return(hdfsReader, nil)
|
||||
|
@ -153,7 +153,7 @@ func createTestHandle(t *testing.T, mockCtrl *gomock.Controller, hdfsReader Hdfs
|
|||
}
|
||||
|
||||
// sets hdfsReader mock to respond on Read() request in a certain way
|
||||
func (hdfsReader *MockHdfsReader) whenReadReturn(data []byte, err error) {
|
||||
func (hdfsReader *MockReadSeekCloser) whenReadReturn(data []byte, err error) {
|
||||
hdfsReader.EXPECT().Read(gomock.Any()).Do(
|
||||
func(buf []byte) {
|
||||
copy(buf, data)
|
||||
|
@ -161,7 +161,7 @@ func (hdfsReader *MockHdfsReader) whenReadReturn(data []byte, err error) {
|
|||
}
|
||||
|
||||
// sets hdfsReader mock to respond on Read() request in a certain way
|
||||
func (hdfsReader *MockHdfsReader) expectSeek(pos int64) {
|
||||
func (hdfsReader *MockReadSeekCloser) expectSeek(pos int64) {
|
||||
hdfsReader.EXPECT().Seek(pos).Return(nil)
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ import (
|
|||
// Interface for accessing HDFS
|
||||
// Concurrency: thread safe: handles unlimited number of concurrent requests
|
||||
type HdfsAccessor interface {
|
||||
OpenRead(path string) (HdfsReader, error) // Opens HDFS file for reading
|
||||
OpenRead(path string) (ReadSeekCloser, error) // Opens HDFS file for reading
|
||||
OpenReadForRandomAccess(path string) (RandomAccessHdfsReader, uint64, error) // opens file for efficient concurrent random access
|
||||
OpenWrite(path string) (HdfsWriter, error) // Opens HDFS file for writing
|
||||
ReadDir(path string) ([]Attrs, error) // Enumerates HDFS directory
|
||||
|
@ -114,7 +114,7 @@ func (this *hdfsAccessorImpl) connectToNameNodeImpl(nnAddr string) (*hdfs.Client
|
|||
}
|
||||
|
||||
// Opens HDFS file for reading
|
||||
func (this *hdfsAccessorImpl) OpenRead(path string) (HdfsReader, error) {
|
||||
func (this *hdfsAccessorImpl) OpenRead(path string) (ReadSeekCloser, error) {
|
||||
client, err1 := this.ConnectToNameNode()
|
||||
if err1 != nil {
|
||||
return nil, err1
|
||||
|
|
|
@ -9,21 +9,14 @@ import (
|
|||
|
||||
// Allows to open an HDFS file as a seekable read-only stream
|
||||
// Concurrency: not thread safe: at most on request at a time
|
||||
type HdfsReader interface {
|
||||
Seek(pos int64) error // Seeks to a given position
|
||||
Position() (int64, error) // Returns current position
|
||||
Read(buffer []byte) (int, error) // Read a chunk of data
|
||||
Close() error // Closes the stream
|
||||
}
|
||||
|
||||
type hdfsReaderImpl struct {
|
||||
BackendReader *hdfs.FileReader
|
||||
}
|
||||
|
||||
var _ HdfsReader = (*hdfsReaderImpl)(nil) // ensure hdfsReaderImpl implements HdfsReader
|
||||
var _ ReadSeekCloser = (*hdfsReaderImpl)(nil) // ensure hdfsReaderImpl implements ReadSeekCloser
|
||||
|
||||
// Creates new instance of HdfsReader
|
||||
func NewHdfsReader(backendReader *hdfs.FileReader) HdfsReader {
|
||||
func NewHdfsReader(backendReader *hdfs.FileReader) ReadSeekCloser {
|
||||
return &hdfsReaderImpl{BackendReader: backendReader}
|
||||
}
|
||||
|
||||
|
|
2
Makefile
2
Makefile
|
@ -57,6 +57,6 @@ test: hdfs-mount \
|
|||
$(GOPATH)/src/github.com/golang/mock/gomock \
|
||||
$(MOCKGEN_DIR)/mockgen \
|
||||
mock_HdfsAccessor_test.go \
|
||||
mock_HdfsReader_test.go \
|
||||
mock_ReadSeekCloser_test.go \
|
||||
mock_HdfsWriter_test.go
|
||||
go test
|
||||
|
|
|
@ -8,30 +8,30 @@ import (
|
|||
)
|
||||
|
||||
// RandomAccessHdfsReader Implments io.ReaderAt, io.Closer providing efficient concurrent
|
||||
// random access to the file on HDFS. Concurrency is achieved by pooling HdfsReader objects.
|
||||
// random access to the file on HDFS. Concurrency is achieved by pooling ReadSeekCloser objects.
|
||||
// In order to optimize sequential read scenario of a fragment of the file, pool datastructure
|
||||
// is organized as a map keyed by the seek position, so sequential read of adjacent file chunks
|
||||
// with high probability goes to the same HdfsReader
|
||||
// with high probability goes to the same ReadSeekCloser
|
||||
type RandomAccessHdfsReader interface {
|
||||
io.ReaderAt
|
||||
io.Closer
|
||||
}
|
||||
|
||||
type randomAccessHdfsReaderImpl struct {
|
||||
HdfsAccessor HdfsAccessor // HDFS accessor used to create HdfsReader objects
|
||||
HdfsAccessor HdfsAccessor // HDFS accessor used to create ReadSeekCloser objects
|
||||
Path string // Path to the file
|
||||
Pool map[int64]HdfsReader // Pool of HdfsReader objects keyed by the seek position
|
||||
Pool map[int64]ReadSeekCloser // Pool of ReadSeekCloser objects keyed by the seek position
|
||||
PoolLock sync.Mutex // Exclusive lock for the Pool
|
||||
MaxReaders int // Maximum number of readers in the pool
|
||||
}
|
||||
|
||||
var _ RandomAccessHdfsReader = (*randomAccessHdfsReaderImpl)(nil) // ensure randomAccessHdfsReader implements RandomAccessHdfsReader
|
||||
var _ RandomAccessHdfsReader = (*randomAccessHdfsReaderImpl)(nil) // ensure randomAccessReadSeekCloser implements RandomAccessHdfsReader
|
||||
|
||||
func NewRandomAccessHdfsReader(hdfsAccessor HdfsAccessor, path string) RandomAccessHdfsReader {
|
||||
this := &randomAccessHdfsReaderImpl{
|
||||
HdfsAccessor: hdfsAccessor,
|
||||
Path: path,
|
||||
Pool: map[int64]HdfsReader{},
|
||||
Pool: map[int64]ReadSeekCloser{},
|
||||
MaxReaders: 100}
|
||||
return this
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ func (this *randomAccessHdfsReaderImpl) Close() error {
|
|||
}
|
||||
|
||||
// Retrieves an optimal reader from pool or creates new one
|
||||
func (this *randomAccessHdfsReaderImpl) getReaderFromPoolOrCreateNew(offset int64) (HdfsReader, error) {
|
||||
func (this *randomAccessHdfsReaderImpl) getReaderFromPoolOrCreateNew(offset int64) (ReadSeekCloser, error) {
|
||||
reader, err := this.getReaderFromPool(offset)
|
||||
if err != nil {
|
||||
return reader, err
|
||||
|
@ -91,7 +91,7 @@ func (this *randomAccessHdfsReaderImpl) getReaderFromPoolOrCreateNew(offset int6
|
|||
}
|
||||
|
||||
// Retrievs an optimal reader from pool or nil if pool is empty
|
||||
func (this *randomAccessHdfsReaderImpl) getReaderFromPool(offset int64) (HdfsReader, error) {
|
||||
func (this *randomAccessHdfsReaderImpl) getReaderFromPool(offset int64) (ReadSeekCloser, error) {
|
||||
this.PoolLock.Lock()
|
||||
defer this.PoolLock.Unlock()
|
||||
if this.Pool == nil {
|
||||
|
@ -121,7 +121,7 @@ func (this *randomAccessHdfsReaderImpl) getReaderFromPool(offset int64) (HdfsRea
|
|||
}
|
||||
|
||||
// Returns idle reader back to the pool
|
||||
func (this *randomAccessHdfsReaderImpl) returnReaderToPool(reader HdfsReader) {
|
||||
func (this *randomAccessHdfsReaderImpl) returnReaderToPool(reader ReadSeekCloser) {
|
||||
this.PoolLock.Lock()
|
||||
defer this.PoolLock.Unlock()
|
||||
// If pool was destroyed or is full then closing current reader w/o returning
|
||||
|
|
|
@ -75,7 +75,7 @@ func (this *MockRandomAccessHdfsAccessor) EnsureConnected() error {
|
|||
}
|
||||
|
||||
// Opens HDFS file for reading
|
||||
func (this *MockRandomAccessHdfsAccessor) OpenRead(path string) (HdfsReader, error) {
|
||||
func (this *MockRandomAccessHdfsAccessor) OpenRead(path string) (ReadSeekCloser, error) {
|
||||
return &MockPseudoRandomHdfsReader{FileSize: int64(5 * 1024 * 1024 * 1024), ReaderStats: &this.ReaderStats}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for details.
|
||||
package main
|
||||
|
||||
import ()
|
||||
|
||||
// Implements simple Read()/Seek()/Close() interface to read from a file or stream
|
||||
// Concurrency: not thread safe: at most on request at a time
|
||||
type ReadSeekCloser interface {
|
||||
Seek(pos int64) error // Seeks to a given position
|
||||
Position() (int64, error) // Returns current position
|
||||
Read(buffer []byte) (int, error) // Read a chunk of data
|
||||
Close() error // Closes the stream
|
||||
}
|
Загрузка…
Ссылка в новой задаче