This commit is contained in:
Dafeng Wang 2017-10-30 10:27:19 -07:00
Родитель 363c52589a
Коммит 2f387a1442
5 изменённых файлов: 65 добавлений и 17 удалений

Просмотреть файл

@ -43,6 +43,9 @@ func (this *FaultTolerantHdfsAccessor) OpenRead(path string) (ReadSeekCloser, er
}
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] OpenRead: %s", path, err) {
return nil, err
}else{
// Clean up the bad connection, to let underline connection to get automatic refresh
this.Impl.Close()
}
}
}
@ -60,6 +63,9 @@ func (this *FaultTolerantHdfsAccessor) ReadDir(path string) ([]Attrs, error) {
result, err := this.Impl.ReadDir(path)
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] ReadDir: %s", path, err) {
return result, err
}else{
// Clean up the bad connection, to let underline connection to get automatic refresh
this.Impl.Close()
}
}
}
@ -71,6 +77,9 @@ func (this *FaultTolerantHdfsAccessor) Stat(path string) (Attrs, error) {
result, err := this.Impl.Stat(path)
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] Stat: %s", path, err) {
return result, err
}else{
// Clean up the bad connection, to let underline connection to get automatic refresh
this.Impl.Close()
}
}
}
@ -82,6 +91,9 @@ func (this *FaultTolerantHdfsAccessor) StatFs() (FsInfo, error) {
result, err := this.Impl.StatFs()
if IsSuccessOrBenignError(err) || !op.ShouldRetry("StatFs: %s", err) {
return result, err
}else{
// Clean up the bad connection, to let underline connection to get automatic refresh
this.Impl.Close()
}
}
}
@ -93,6 +105,9 @@ func (this *FaultTolerantHdfsAccessor) Mkdir(path string, mode os.FileMode) erro
err := this.Impl.Mkdir(path, mode)
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] Mkdir %s: %s", path, mode, err) {
return err
}else{
// Clean up the bad connection, to let underline connection to get automatic refresh
this.Impl.Close()
}
}
}
@ -104,6 +119,9 @@ func (this *FaultTolerantHdfsAccessor) Remove(path string) error {
err := this.Impl.Remove(path)
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] Remove: %s", path, err) {
return err
}else{
// Clean up the bad connection, to let underline connection to get automatic refresh
this.Impl.Close()
}
}
}
@ -115,6 +133,9 @@ func (this *FaultTolerantHdfsAccessor) Rename(oldPath string, newPath string) er
err := this.Impl.Rename(oldPath, newPath)
if IsSuccessOrBenignError(err) || !op.ShouldRetry("[%s] Rename to %s: %s", oldPath, newPath, err) {
return err
}else{
// Clean up the bad connection, to let underline connection to get automatic refresh
this.Impl.Close()
}
}
}
@ -126,6 +147,9 @@ func (this *FaultTolerantHdfsAccessor) Chmod(path string, mode os.FileMode) erro
err := this.Impl.Chmod(path, mode)
if IsSuccessOrBenignError(err) || !op.ShouldRetry("Chmod [%s] to [%d]: %s", path, mode, err) {
return err
}else{
// Clean up the bad connection, to let underline connection to get automatic refresh
this.Impl.Close()
}
}
}
@ -137,6 +161,14 @@ func (this *FaultTolerantHdfsAccessor) Chown(path string, user, group string) er
err := this.Impl.Chown(path, user, group)
if IsSuccessOrBenignError(err) || !op.ShouldRetry("Chown [%s] to [%s:%s]: %s", path, user, group, err) {
return err
}else{
// Clean up the bad connection, to let underline connection to get automatic refresh
this.Impl.Close()
}
}
}
// Close underline connection if needed
func (this *FaultTolerantHdfsAccessor) Close() error {
return this.Impl.Close()
}

Просмотреть файл

@ -29,6 +29,7 @@ func TestStatWithRetries(t *testing.T) {
ftHdfsAccessor := NewFaultTolerantHdfsAccessor(hdfsAccessor, atMost2Attempts())
hdfsAccessor.EXPECT().Stat("/test/file").Return(Attrs{}, errors.New("Injected failure"))
hdfsAccessor.EXPECT().Stat("/test/file").Return(Attrs{Name: "file"}, nil)
hdfsAccessor.EXPECT().Close().Return(nil)
attrs, err := ftHdfsAccessor.Stat("/test/file")
assert.Nil(t, err)
assert.Equal(t, "file", attrs.Name)
@ -41,6 +42,7 @@ func TestMkdirWithRetries(t *testing.T) {
ftHdfsAccessor := NewFaultTolerantHdfsAccessor(hdfsAccessor, atMost2Attempts())
hdfsAccessor.EXPECT().Mkdir("/test/dir", os.FileMode(0757)).Return(errors.New("Injected failure"))
hdfsAccessor.EXPECT().Mkdir("/test/dir", os.FileMode(0757)).Return(nil)
hdfsAccessor.EXPECT().Close().Return(nil)
err := ftHdfsAccessor.Mkdir("/test/dir", os.FileMode(0757))
assert.Nil(t, err)
}
@ -54,6 +56,7 @@ func TestReadDirWithRetries(t *testing.T) {
var err error
hdfsAccessor.EXPECT().ReadDir("/test/dir").Return(nil, errors.New("Injected failure"))
hdfsAccessor.EXPECT().ReadDir("/test/dir").Return(make([]Attrs, 10), nil)
hdfsAccessor.EXPECT().Close().Return(nil)
result, err = ftHdfsAccessor.ReadDir("/test/dir")
assert.Nil(t, err)
assert.Equal(t, 10, len(result))
@ -69,6 +72,7 @@ func TestOpenReadWithRetries(t *testing.T) {
var err error
hdfsAccessor.EXPECT().OpenRead("/test/file").Return(nil, errors.New("Injected failure"))
hdfsAccessor.EXPECT().OpenRead("/test/file").Return(mockReader, nil)
hdfsAccessor.EXPECT().Close().Return(nil)
result, err = ftHdfsAccessor.OpenRead("/test/file")
assert.Nil(t, err)
assert.Equal(t, mockReader, result.(*FaultTolerantHdfsReader).Impl)

Просмотреть файл

@ -35,8 +35,7 @@ func (this *FaultTolerantHdfsReader) Read(buffer []byte) (int, error) {
// Seeking to the right offset
if err = this.Impl.Seek(this.Offset); err != nil {
// Those errors are non-recoverable propagating right away
this.Impl.Close()
this.Impl = nil
this.Close()
return 0, err
}
}
@ -51,9 +50,7 @@ func (this *FaultTolerantHdfsReader) Read(buffer []byte) (int, error) {
return nr, err
}
// On failure, we need to close the reader
this.Impl.Close()
// and reset it to nil, so next time we attempt to re-open the file
this.Impl = nil
this.Close()
}
}
@ -79,5 +76,8 @@ func (this *FaultTolerantHdfsReader) Position() (int64, error) {
// Closes the stream
func (this *FaultTolerantHdfsReader) Close() error {
return this.Impl.Close()
err:= this.Impl.Close()
this.Impl = nil
return err
}

Просмотреть файл

@ -31,12 +31,12 @@ type HdfsAccessor interface {
EnsureConnected() error // Ensures HDFS accessor is connected to the HDFS name node
Chown(path string, owner, group string) error // Changes the owner and group of the file
Chmod(path string, mode os.FileMode) error // Changes the mode of the file
Close() error // Close current meta connection if needed
}
type hdfsAccessorImpl struct {
Clock Clock // interface to get wall clock time
NameNodeAddresses []string // array of Address:port string for the name nodes
CurrentNameNodeIdx int // Index of the current name node in NameNodeAddresses array
MetadataClient *hdfs.Client // HDFS client used for metadata operations
MetadataClientMutex sync.Mutex // Serializing all metadata operations for simplicity (for now), TODO: allow N concurrent operations
UserNameToUidCache map[string]UidCacheEntry // cache for converting usernames to UIDs
@ -55,7 +55,6 @@ func NewHdfsAccessor(nameNodeAddresses string, clock Clock) (HdfsAccessor, error
this := &hdfsAccessorImpl{
NameNodeAddresses: nns,
CurrentNameNodeIdx: 0,
Clock: clock,
UserNameToUidCache: make(map[string]UidCacheEntry)}
return this, nil
@ -82,21 +81,21 @@ func (this *hdfsAccessorImpl) ConnectMetadataClient() error {
// Establishes connection to a name node in the context of some other operation
func (this *hdfsAccessorImpl) ConnectToNameNode() (*hdfs.Client, error) {
// connecting to HDFS name node
nnAddr := this.NameNodeAddresses[this.CurrentNameNodeIdx]
client, err := this.connectToNameNodeImpl(nnAddr)
client, err := this.connectToNameNodeImpl()
if err != nil {
// Connection failed, updating CurrentNameNodeIdx to try different name node next time
this.CurrentNameNodeIdx = (this.CurrentNameNodeIdx + 1) % len(this.NameNodeAddresses)
return nil, errors.New(fmt.Sprintf("%s: %s", nnAddr, err.Error()))
// Connection failed
return nil, errors.New(fmt.Sprintf("Fail to connect to name node with error: %s", err.Error()))
}
Info.Println("Connected to name node:", nnAddr)
Info.Println("Connected to name node")
return client, nil
}
// Performs an attempt to connect to the HDFS name
func (this *hdfsAccessorImpl) connectToNameNodeImpl(nnAddr string) (*hdfs.Client, error) {
func (this *hdfsAccessorImpl) connectToNameNodeImpl() (*hdfs.Client, error) {
// Performing an attempt to connect to the name node
client, err := hdfs.New(nnAddr)
client, err := hdfs.NewClient(hdfs.ClientOptions{
Addresses: this.NameNodeAddresses,
})
if err != nil {
return nil, err
}
@ -357,3 +356,16 @@ func (this *hdfsAccessorImpl) Chown(path string, user, group string) error {
}
return this.MetadataClient.Chown(path, user, group)
}
// Close current connection if needed
func (this *hdfsAccessorImpl) Close() error {
this.MetadataClientMutex.Lock()
defer this.MetadataClientMutex.Unlock()
if(this.MetadataClient != nil) {
err:= this.MetadataClient.Close()
this.MetadataClient = nil
return err
}
return nil
}

Просмотреть файл

@ -39,7 +39,7 @@ func NewNoRetryPolicy() *RetryPolicy {
func NewDefaultRetryPolicy(clock Clock) *RetryPolicy {
return &RetryPolicy{
Clock: clock,
MaxAttempts: 99999999999,
MaxAttempts: 10,
TimeLimit: 5 * time.Minute,
MinDelay: 1 * time.Second,
MaxDelay: 1 * time.Minute,