diff --git a/HdfsAccessor.go b/HdfsAccessor.go index effe1d6..b9c1fa7 100644 --- a/HdfsAccessor.go +++ b/HdfsAccessor.go @@ -42,7 +42,7 @@ type UidCacheEntry struct { var _ HdfsAccessor = (*hdfsAccessorImpl)(nil) // ensure hdfsAccessorImpl implements HdfsAccessor // Creates an instance of HdfsAccessor -func NewHdfsAccessor(nameNodeAddresses string, retryPolicy *RetryPolicy, clock Clock) (HdfsAccessor, error) { +func NewHdfsAccessor(nameNodeAddresses string, retryPolicy *RetryPolicy, lazyMount bool, clock Clock) (HdfsAccessor, error) { nns := strings.Split(nameNodeAddresses, ",") this := &hdfsAccessorImpl{ @@ -52,65 +52,86 @@ func NewHdfsAccessor(nameNodeAddresses string, retryPolicy *RetryPolicy, clock C Clock: clock, UserNameToUidCache: make(map[string]UidCacheEntry)} - //TODO: support deferred on-demand creation to allow successful mounting before HDFS is available - client, err := this.ConnectToNameNode() - if err != nil { - return nil, err + if !lazyMount { + // If --lazy isn't requested, connecting to the name node right away + if err := this.ConnectMetadataClient(this.RetryPolicy.StartOperation()); err != nil { + return nil, err + } } - this.MetadataClient = client + return this, nil } -func (this *hdfsAccessorImpl) ConnectToNameNode() (*hdfs.Client, error) { - op := this.RetryPolicy.StartOperation() - startIdx := this.CurrentNameNodeIdx - for { - // connecting to HDFS name nodes in round-robin fashion: - nnIdx := (startIdx + op.Attempt - 1) % len(this.NameNodeAddresses) - nnAddr := this.NameNodeAddresses[nnIdx] +// Establishes connection to the name node (assigns MetadataClient field) +func (this *hdfsAccessorImpl) ConnectMetadataClient(op *Op) error { + client, err := this.ConnectToNameNode(op) + if err != nil { + return err + } + this.MetadataClient = client + return nil +} - // Performing an attempt to connect to the name node - client, err := hdfs.New(nnAddr) +// Establishes connection to a name node in the context of some other operation +func 
(this *hdfsAccessorImpl) ConnectToNameNode(op *Op) (*hdfs.Client, error) { + for { + // connecting to HDFS name node + nnAddr := this.NameNodeAddresses[this.CurrentNameNodeIdx] + client, err := this.connectToNameNodeAttempt(nnAddr) if err != nil { + // Connection failed, updating CurrentNameNodeIdx to try different name node next time + this.CurrentNameNodeIdx = (this.CurrentNameNodeIdx + 1) % len(this.NameNodeAddresses) if op.ShouldRetry("connect %s: %s", nnAddr, err) { continue } else { return nil, err } } - // connection is OK, but we need to check whether name node is operating ans expected - // (this also checks whether name node is Active) - // Performing this check, by doing Stat() for a path inside root directory - // Note: The file '/$' doesn't have to be present - // - both nil and ErrNotExists error codes indicate success of the operation - _, statErr := client.Stat("/$") + return client, nil + } +} - if pathError, ok := statErr.(*os.PathError); statErr == nil || ok && (pathError.Err == os.ErrNotExist) { - // Succesfully connected, memoizing the index of the name node, to speedup next connect - this.CurrentNameNodeIdx = nnIdx - return client, nil - } else { - //TODO: how to close connection ? 
- if op.ShouldRetry("healthcheck %s: %s", nnAddr, statErr) { - continue - } else { - return nil, statErr - } - } +// Performs a single attempt to connect to the name node +func (this *hdfsAccessorImpl) connectToNameNodeAttempt(nnAddr string) (*hdfs.Client, error) { + // Performing an attempt to connect to the name node + client, err := hdfs.New(nnAddr) + if err != nil { + return nil, err + } + // connection is OK, but we need to check whether name node is operating as expected + // (this also checks whether name node is Active) + // Performing this check, by doing Stat() for a path inside root directory + // Note: The file '/$' doesn't have to be present + // - both nil and ErrNotExist error codes indicate success of the operation + _, statErr := client.Stat("/$") + + if pathError, ok := statErr.(*os.PathError); statErr == nil || ok && (pathError.Err == os.ErrNotExist) { + // Successfully connected + return client, nil + } else { + //TODO: how to close connection ? + return nil, statErr } } // Opens HDFS file for reading func (this *hdfsAccessorImpl) OpenRead(path string) (HdfsReader, error) { - client, err1 := this.ConnectToNameNode() - if err1 != nil { - return nil, err1 + op := this.RetryPolicy.StartOperation() + for { + client, err1 := this.ConnectToNameNode(op) + if err1 != nil { + return nil, err1 + } + reader, err2 := client.Open(path) + if err2 != nil { + if op.ShouldRetry("[%s] OpenRead: %s", path, err2) { + continue + } else { + return nil, err2 + } + } + return NewHdfsReader(reader), nil } - reader, err2 := client.Open(path) - if err2 != nil { - return nil, err2 - } - return NewHdfsReader(reader), nil } // Opens HDFS file for writing @@ -122,7 +143,35 @@ func (this *hdfsAccessorImpl) OpenWrite(path string) (HdfsWriter, error) { func (this *hdfsAccessorImpl) ReadDir(path string) ([]Attrs, error) { this.MetadataClientMutex.Lock() defer this.MetadataClientMutex.Unlock() + op := this.RetryPolicy.StartOperation() + for { + if this.MetadataClient == nil { + 
if err := this.ConnectMetadataClient(op); err != nil { + return nil, err + } + } + attrs, err := this.readDirAttempt(path) + if err != nil { + if pathError, ok := err.(*os.PathError); ok && (pathError.Err == os.ErrNotExist) { + // benign error (path not found) + return nil, err + } + // We've got error from this client, setting to nil, so we try another one next time + this.MetadataClient = nil + // TODO: attempt to gracefully close the connection + if op.ShouldRetry("[%s]:ReadDir: %s", path, err) { + continue + } else { + return nil, err + } + } + return attrs, nil + } +} + +// Performs 1 attempt to enumerate HDFS directory +func (this *hdfsAccessorImpl) readDirAttempt(path string) ([]Attrs, error) { files, err := this.MetadataClient.ReadDir(path) if err != nil { return nil, err @@ -140,11 +189,32 @@ func (this *hdfsAccessorImpl) Stat(path string) (Attrs, error) { this.MetadataClientMutex.Lock() defer this.MetadataClientMutex.Unlock() - fileInfo, err := this.MetadataClient.Stat(path) - if err != nil { - return Attrs{}, err + op := this.RetryPolicy.StartOperation() + for { + if this.MetadataClient == nil { + if err := this.ConnectMetadataClient(op); err != nil { + return Attrs{}, err + } + } + + fileInfo, err := this.MetadataClient.Stat(path) + if err != nil { + if pathError, ok := err.(*os.PathError); ok && (pathError.Err == os.ErrNotExist) { + // benign error (path not found) + return Attrs{}, err + } + + // We've got error from this client, setting to nil, so we try another one next time + this.MetadataClient = nil + // TODO: attempt to gracefully close the connection + if op.ShouldRetry("[%s]:Stat: %s", path, err) { + continue + } else { + return Attrs{}, err + } + } + return this.AttrsFromFileInfo(fileInfo), nil } - return this.AttrsFromFileInfo(fileInfo), nil } // Converts os.FileInfo + underlying proto-buf data into Attrs structure @@ -173,7 +243,6 @@ func (this *hdfsAccessorImpl) LookupUid(userName string) uint32 { if ok && 
this.Clock.Now().Before(cacheEntry.Expires) { return cacheEntry.Uid } - print("u:" + userName + "\n") u, err := user.Lookup(userName) var uid64 uint64 if err == nil { diff --git a/main.go b/main.go index b1203ea..176c5cf 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,8 @@ func main() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + lazyMount := flag.Bool("lazy", false, "Allows to mount HDFS filesystem before HDFS is available") + flag.Usage = Usage flag.Parse() @@ -34,7 +36,7 @@ func main() { retryPolicy := NewDefaultRetryPolicy(WallClock{}) // TODO: add command line options to customize retry polic - hdfsAccessor, err := NewHdfsAccessor(flag.Arg(0), retryPolicy, WallClock{}) + hdfsAccessor, err := NewHdfsAccessor(flag.Arg(0), retryPolicy, *lazyMount, WallClock{}) if err != nil { log.Fatal("Error/NewHdfsAccessor: ", err) } @@ -62,6 +64,9 @@ func main() { //TODO: before doing that we need to finish deferred flushes log.Print("Signal received: " + x.String()) fileSystem.Unmount() // this will cause Serve() call below to exit + // Also resetting retry policy properties to stop useless retries + retryPolicy.MaxAttempts = 0 + retryPolicy.MaxDelay = 0 } }() err = fs.Serve(c, fileSystem)