diff --git a/go/cmd/zk/zkcmd.go b/go/cmd/zk/zkcmd.go index 5517088501..c71fe711c5 100644 --- a/go/cmd/zk/zkcmd.go +++ b/go/cmd/zk/zkcmd.go @@ -365,34 +365,22 @@ func cmdLs(args []string) { if *longListing && isDir { fmt.Printf("total: %v\n", len(children)) } - wg := sync.WaitGroup{} - mutex := sync.Mutex{} - statMap := make(map[string]zk.Stat) sort.Strings(children) - for _, child := range children { - localPath := path.Join(zkPath, child) - wg.Add(1) - go func() { - stat, err := zconn.Exists(localPath) - if err != nil { - log.Printf("ls: cannot access: %v: %v", localPath, err) - } else { - mutex.Lock() - statMap[localPath] = stat - mutex.Unlock() - } - wg.Done() - }() + stats := make([]zk.Stat, len(children)) + f := func(i int) { + localPath := path.Join(zkPath, children[i]) + stat, err := zconn.Exists(localPath) + if err != nil { + log.Printf("ls: cannot access: %v: %v", localPath, err) + } else { + stats[i] = stat + } } + fmap(f, len(children), defaultConcurrency) - wg.Wait() - // Lock to read statMap consistently. - mutex.Lock() - defer mutex.Unlock() - - for _, child := range children { + for i, child := range children { localPath := path.Join(zkPath, child) - if stat := statMap[localPath]; stat != nil { + if stat := stats[i]; stat != nil { fmtPath(stat, localPath, showFullPath) } } @@ -783,6 +771,13 @@ func multiFileCp(args []string) { } } +type zkItem struct { + path string + data string + stat zk.Stat + err error +} + // Store a zk tree in a zip archive. This won't be immediately useful to // zip tools since even "directories" can contain data. func cmdZip(args []string) { @@ -813,9 +808,21 @@ func cmdZip(args []string) { } } + items := make(chan *zkItem, 16) + f := func(i int) { + data, stat, err := zconn.Get(pathList[i]) + items <- &zkItem{pathList[i], data, stat, err} + } + go func() { + // FIXME(msolomon) don't love the fmap pattern here - mixing + // bounded and unbounded structures, hence this extra closure. + fmap(f, len(pathList), defaultConcurrency) + close(items) + }() + zipWriter := zip.NewWriter(zipFile) - for _, path := range pathList { - data, stat, err := zconn.Get(path) + for item := range items { + path, data, stat, err := item.path, item.data, item.stat, item.err if err != nil { log.Fatal("zip: get failed: %v", err) } @@ -883,3 +890,26 @@ func cmdUnzip(args []string) { rc.Close() } } + +const defaultConcurrency = 16 + +func fmap(f func(i int), xrange, concurrency int) { + wg := sync.WaitGroup{} + work := make(chan int, 1) + for j := 0; j < concurrency; j++ { + wg.Add(1) + go func() { + for i := range work { + f(i) + } + wg.Done() + }() + } + + for x := 0; x < xrange; x++ { + work <- x + } + close(work) + + wg.Wait() +}