Added -lazy flag for lazy mounts and implemented automatic retries inside HdfsAccessor
Parent: 6fda1cc91d
Commit: f6a0b31322

HdfsAccessor.go (161 lines changed)
@@ -42,7 +42,7 @@ type UidCacheEntry struct {
 var _ HdfsAccessor = (*hdfsAccessorImpl)(nil) // ensure hdfsAccessorImpl implements HdfsAccessor
 
 // Creates an instance of HdfsAccessor
-func NewHdfsAccessor(nameNodeAddresses string, retryPolicy *RetryPolicy, clock Clock) (HdfsAccessor, error) {
+func NewHdfsAccessor(nameNodeAddresses string, retryPolicy *RetryPolicy, lazyMount bool, clock Clock) (HdfsAccessor, error) {
     nns := strings.Split(nameNodeAddresses, ",")
 
     this := &hdfsAccessorImpl{
@@ -52,65 +52,86 @@ func NewHdfsAccessor(nameNodeAddresses string, retryPolicy *RetryPolicy, clock C
         Clock:              clock,
         UserNameToUidCache: make(map[string]UidCacheEntry)}
 
-    //TODO: support deferred on-demand creation to allow successful mounting before HDFS is available
-    client, err := this.ConnectToNameNode()
-    if err != nil {
-        return nil, err
+    if !lazyMount {
+        // If -lazy isn't requested, connect to the name node right away
+        if err := this.ConnectMetadataClient(this.RetryPolicy.StartOperation()); err != nil {
+            return nil, err
+        }
     }
-    this.MetadataClient = client
 
     return this, nil
 }
 
-func (this *hdfsAccessorImpl) ConnectToNameNode() (*hdfs.Client, error) {
-    op := this.RetryPolicy.StartOperation()
-    startIdx := this.CurrentNameNodeIdx
-    for {
-        // connecting to HDFS name nodes in round-robin fashion:
-        nnIdx := (startIdx + op.Attempt - 1) % len(this.NameNodeAddresses)
-        nnAddr := this.NameNodeAddresses[nnIdx]
-        // Performing an attempt to connect to the name node
-        client, err := hdfs.New(nnAddr)
-        if err != nil {
-            if op.ShouldRetry("connect %s: %s", nnAddr, err) {
-                continue
-            } else {
-                return nil, err
-            }
-        }
-        // connection is OK, but we need to check whether the name node is operating as expected
-        // (this also checks whether the name node is Active)
-        // Performing this check by doing Stat() on a path inside the root directory
-        // Note: the file '/$' doesn't have to be present
-        // - both a nil error and ErrNotExist indicate success of the operation
-        _, statErr := client.Stat("/$")
-
-        if pathError, ok := statErr.(*os.PathError); statErr == nil || ok && (pathError.Err == os.ErrNotExist) {
-            // Successfully connected; memoizing the index of the name node to speed up the next connect
-            this.CurrentNameNodeIdx = nnIdx
-            return client, nil
-        } else {
-            //TODO: how to close the connection?
-            if op.ShouldRetry("healthcheck %s: %s", nnAddr, statErr) {
-                continue
-            } else {
-                return nil, statErr
-            }
-        }
-    }
-}
+// Establishes connection to the name node (assigns the MetadataClient field)
+func (this *hdfsAccessorImpl) ConnectMetadataClient(op *Op) error {
+    client, err := this.ConnectToNameNode(op)
+    if err != nil {
+        return err
+    }
+    this.MetadataClient = client
+    return nil
+}
+
+// Establishes connection to a name node in the context of some other operation
+func (this *hdfsAccessorImpl) ConnectToNameNode(op *Op) (*hdfs.Client, error) {
+    for {
+        // connecting to an HDFS name node
+        nnAddr := this.NameNodeAddresses[this.CurrentNameNodeIdx]
+        client, err := this.connectToNameNodeAttempt(nnAddr)
+        if err != nil {
+            // Connection failed; updating CurrentNameNodeIdx to try a different name node next time
+            this.CurrentNameNodeIdx = (this.CurrentNameNodeIdx + 1) % len(this.NameNodeAddresses)
+            if op.ShouldRetry("connect %s: %s", nnAddr, err) {
+                continue
+            } else {
+                return nil, err
+            }
+        }
+        return client, nil
+    }
+}
+
+// Performs a single attempt to connect to the name node
+func (this *hdfsAccessorImpl) connectToNameNodeAttempt(nnAddr string) (*hdfs.Client, error) {
+    // Performing an attempt to connect to the name node
+    client, err := hdfs.New(nnAddr)
+    if err != nil {
+        return nil, err
+    }
+    // connection is OK, but we need to check whether the name node is operating as expected
+    // (this also checks whether the name node is Active)
+    // Performing this check by doing Stat() on a path inside the root directory
+    // Note: the file '/$' doesn't have to be present
+    // - both a nil error and ErrNotExist indicate success of the operation
+    _, statErr := client.Stat("/$")
+
+    if pathError, ok := statErr.(*os.PathError); statErr == nil || ok && (pathError.Err == os.ErrNotExist) {
+        // Successfully connected
+        return client, nil
+    } else {
+        //TODO: how to close the connection?
+        return nil, statErr
+    }
+}
 
 // Opens HDFS file for reading
 func (this *hdfsAccessorImpl) OpenRead(path string) (HdfsReader, error) {
-    client, err1 := this.ConnectToNameNode()
-    if err1 != nil {
-        return nil, err1
+    op := this.RetryPolicy.StartOperation()
+    for {
+        client, err1 := this.ConnectToNameNode(op)
+        if err1 != nil {
+            return nil, err1
+        }
+        reader, err2 := client.Open(path)
+        if err2 != nil {
+            if op.ShouldRetry("[%s] OpenRead: %s", path, err2) {
+                continue
+            } else {
+                return nil, err2
+            }
+        }
+        return NewHdfsReader(reader), nil
     }
-    reader, err2 := client.Open(path)
-    if err2 != nil {
-        return nil, err2
-    }
-    return NewHdfsReader(reader), nil
 }
 
 // Opens HDFS file for writing
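The refactored methods above all share one retry idiom: start an operation with RetryPolicy.StartOperation(), run a single attempt, and let op.ShouldRetry(format, args...) log the failure and decide whether the loop should run again. RetryPolicy and Op themselves are not part of this diff, so the sketch below is only a plausible reading of that contract, assuming nothing beyond the members this commit actually touches (Attempt, MaxAttempts, MaxDelay); the repository's real implementation may differ.

package main

import (
    "errors"
    "log"
    "time"
)

// Hypothetical reconstruction for illustration; not the repository's actual code.
type RetryPolicy struct {
    MaxAttempts int           // give up after this many attempts
    MaxDelay    time.Duration // cap on the back-off sleep between attempts
}

type Op struct {
    policy  *RetryPolicy
    Attempt int // 1-based index of the current attempt
}

func (p *RetryPolicy) StartOperation() *Op {
    return &Op{policy: p, Attempt: 1}
}

// ShouldRetry logs the failure and, if the budget allows another attempt,
// sleeps with capped exponential back-off and reports true so the caller's
// "for { ... continue ... }" loop runs again.
func (op *Op) ShouldRetry(format string, args ...interface{}) bool {
    log.Printf(format, args...)
    if op.Attempt >= op.policy.MaxAttempts {
        return false
    }
    delay := time.Duration(1<<uint(op.Attempt-1)) * 100 * time.Millisecond
    if delay > op.policy.MaxDelay {
        delay = op.policy.MaxDelay
    }
    time.Sleep(delay)
    op.Attempt++
    return true
}

func main() {
    // Shape of every retry loop in the diff, against an always-failing attempt:
    policy := &RetryPolicy{MaxAttempts: 3, MaxDelay: time.Second}
    op := policy.StartOperation()
    for {
        err := errors.New("connection refused") // simulated failed attempt
        if op.ShouldRetry("connect %s: %s", "nn1:8020", err) {
            continue
        }
        break // retry budget exhausted; a real caller would return err here
    }
}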
@@ -122,7 +143,35 @@ func (this *hdfsAccessorImpl) OpenWrite(path string) (HdfsWriter, error) {
 func (this *hdfsAccessorImpl) ReadDir(path string) ([]Attrs, error) {
     this.MetadataClientMutex.Lock()
     defer this.MetadataClientMutex.Unlock()
+    op := this.RetryPolicy.StartOperation()
+    for {
+        if this.MetadataClient == nil {
+            if err := this.ConnectMetadataClient(op); err != nil {
+                return nil, err
+            }
+        }
+        attrs, err := this.readDirAttempt(path)
+        if err != nil {
+            if pathError, ok := err.(*os.PathError); ok && (pathError.Err == os.ErrNotExist) {
+                // benign error (path not found)
+                return nil, err
+            }
+            // We've got an error from this client; setting it to nil so we try another one next time
+            this.MetadataClient = nil
+            // TODO: attempt to gracefully close the connection
+            if op.ShouldRetry("[%s]:ReadDir: %s", path, err) {
+                continue
+            } else {
+                return nil, err
+            }
+        }
+        return attrs, nil
+    }
+}
+
+// Performs a single attempt to enumerate an HDFS directory
+func (this *hdfsAccessorImpl) readDirAttempt(path string) ([]Attrs, error) {
     files, err := this.MetadataClient.ReadDir(path)
     if err != nil {
         return nil, err
@@ -140,11 +189,32 @@ func (this *hdfsAccessorImpl) Stat(path string) (Attrs, error) {
     this.MetadataClientMutex.Lock()
     defer this.MetadataClientMutex.Unlock()
 
-    fileInfo, err := this.MetadataClient.Stat(path)
-    if err != nil {
-        return Attrs{}, err
+    op := this.RetryPolicy.StartOperation()
+    for {
+        if this.MetadataClient == nil {
+            if err := this.ConnectMetadataClient(op); err != nil {
+                return Attrs{}, err
+            }
+        }
+
+        fileInfo, err := this.MetadataClient.Stat(path)
+        if err != nil {
+            if pathError, ok := err.(*os.PathError); ok && (pathError.Err == os.ErrNotExist) {
+                // benign error (path not found)
+                return Attrs{}, err
+            }
+            // We've got an error from this client; setting it to nil so we try another one next time
+            this.MetadataClient = nil
+            // TODO: attempt to gracefully close the connection
+            if op.ShouldRetry("[%s]:Stat: %s", path, err) {
+                continue
+            } else {
+                return Attrs{}, err
+            }
+        }
+        return this.AttrsFromFileInfo(fileInfo), nil
     }
-    return this.AttrsFromFileInfo(fileInfo), nil
 }
 
 // Converts os.FileInfo + underlying proto-buf data into Attrs structure
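Both ReadDir and Stat above classify "path not found" as a benign error that must not poison the NameNode connection or burn retry attempts: they unwrap *os.PathError and inspect its Err field before falling back to the retry machinery. The standalone snippet below illustrates the same unwrapping pattern against the local filesystem; it uses os.IsNotExist rather than the direct == comparison, since the plain comparison in the diff relies on the hdfs client storing os.ErrNotExist in the Err field.

package main

import (
    "fmt"
    "os"
)

func main() {
    _, err := os.Stat("/definitely/not/there")
    if pathError, ok := err.(*os.PathError); ok && os.IsNotExist(pathError) {
        // benign: the path simply doesn't exist; no retry or failover is warranted
        fmt.Println("not found:", pathError.Path)
    }
}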
@@ -173,7 +243,6 @@ func (this *hdfsAccessorImpl) LookupUid(userName string) uint32 {
     if ok && this.Clock.Now().Before(cacheEntry.Expires) {
         return cacheEntry.Uid
     }
-    print("u:" + userName + "\n")
     u, err := user.Lookup(userName)
     var uid64 uint64
     if err == nil {
main.go (7 lines changed)
@@ -23,6 +23,8 @@ func main() {
     sigs := make(chan os.Signal, 1)
     signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
 
+    lazyMount := flag.Bool("lazy", false, "Allows mounting the HDFS filesystem before HDFS is available")
+
     flag.Usage = Usage
     flag.Parse()
 
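With the new flag, the mount can succeed while HDFS is still unreachable; the first metadata operation then triggers the deferred connect inside ConnectMetadataClient. A hypothetical invocation follows (the binary name, port, and mount point are placeholders, not taken from this commit; flag.Arg(0) is the comma-separated NameNode list that NewHdfsAccessor splits on ','):

    hdfs-mount -lazy nn1.example.com:8020,nn2.example.com:8020 /mnt/hdfs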
@@ -34,7 +36,7 @@ func main() {
     retryPolicy := NewDefaultRetryPolicy(WallClock{})
     // TODO: add command-line options to customize the retry policy
 
-    hdfsAccessor, err := NewHdfsAccessor(flag.Arg(0), retryPolicy, WallClock{})
+    hdfsAccessor, err := NewHdfsAccessor(flag.Arg(0), retryPolicy, *lazyMount, WallClock{})
     if err != nil {
         log.Fatal("Error/NewHdfsAccessor: ", err)
     }
@@ -62,6 +64,9 @@ func main() {
             //TODO: before doing that we need to finish deferred flushes
             log.Print("Signal received: " + x.String())
             fileSystem.Unmount() // this will cause the Serve() call below to exit
+            // Also resetting retry policy properties to stop useless retries
+            retryPolicy.MaxAttempts = 0
+            retryPolicy.MaxDelay = 0
         }
     }()
     err = fs.Serve(c, fileSystem)
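A subtlety in the shutdown path: zeroing retryPolicy.MaxAttempts and retryPolicy.MaxDelay makes every in-flight retry loop give up on its next failed attempt instead of sleeping and reconnecting while the filesystem is unmounting. Continuing the hypothetical RetryPolicy/Op sketch from earlier (a fragment, assuming those same types and an "fmt" import):

policy := &RetryPolicy{MaxAttempts: 3, MaxDelay: time.Second}
op := policy.StartOperation()

policy.MaxAttempts = 0 // what the signal handler does
policy.MaxDelay = 0

// Attempt (1) >= MaxAttempts (0), so the very next failure stops the
// loop immediately, without any back-off sleep:
fmt.Println(op.ShouldRetry("connect %s: %s", "nn1:8020", "unreachable")) // false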