зеркало из https://github.com/github/vitess-gh.git
limiting maximum concurrency
This commit is contained in:
Родитель
3337a17354
Коммит
48f7f7e945
|
@ -365,34 +365,22 @@ func cmdLs(args []string) {
|
|||
if *longListing && isDir {
|
||||
fmt.Printf("total: %v\n", len(children))
|
||||
}
|
||||
wg := sync.WaitGroup{}
|
||||
mutex := sync.Mutex{}
|
||||
statMap := make(map[string]zk.Stat)
|
||||
sort.Strings(children)
|
||||
for _, child := range children {
|
||||
localPath := path.Join(zkPath, child)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
stat, err := zconn.Exists(localPath)
|
||||
if err != nil {
|
||||
log.Printf("ls: cannot access: %v: %v", localPath, err)
|
||||
} else {
|
||||
mutex.Lock()
|
||||
statMap[localPath] = stat
|
||||
mutex.Unlock()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
stats := make([]zk.Stat, len(children))
|
||||
f := func(i int) {
|
||||
localPath := path.Join(zkPath, children[i])
|
||||
stat, err := zconn.Exists(localPath)
|
||||
if err != nil {
|
||||
log.Printf("ls: cannot access: %v: %v", localPath, err)
|
||||
} else {
|
||||
stats[i] = stat
|
||||
}
|
||||
}
|
||||
fmap(f, len(children), defaultConcurrency)
|
||||
|
||||
wg.Wait()
|
||||
// Lock to read statMap consistently.
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
|
||||
for _, child := range children {
|
||||
for i, child := range children {
|
||||
localPath := path.Join(zkPath, child)
|
||||
if stat := statMap[localPath]; stat != nil {
|
||||
if stat := stats[i]; stat != nil {
|
||||
fmtPath(stat, localPath, showFullPath)
|
||||
}
|
||||
}
|
||||
|
@ -783,6 +771,13 @@ func multiFileCp(args []string) {
|
|||
}
|
||||
}
|
||||
|
||||
type zkItem struct {
|
||||
path string
|
||||
data string
|
||||
stat zk.Stat
|
||||
err error
|
||||
}
|
||||
|
||||
// Store a zk tree in a zip archive. This won't be immediately useful to
|
||||
// zip tools since even "directories" can contain data.
|
||||
func cmdZip(args []string) {
|
||||
|
@ -813,9 +808,21 @@ func cmdZip(args []string) {
|
|||
}
|
||||
}
|
||||
|
||||
items := make(chan *zkItem, 16)
|
||||
f := func(i int) {
|
||||
data, stat, err := zconn.Get(pathList[i])
|
||||
items <- &zkItem{pathList[i], data, stat, err}
|
||||
}
|
||||
go func() {
|
||||
// FIXME(msolomon) don't love the fmap pattern here - mixing
|
||||
// bounded and unbounded structures, hence this extra closure.
|
||||
fmap(f, len(pathList), defaultConcurrency)
|
||||
close(items)
|
||||
}()
|
||||
|
||||
zipWriter := zip.NewWriter(zipFile)
|
||||
for _, path := range pathList {
|
||||
data, stat, err := zconn.Get(path)
|
||||
for item := range items {
|
||||
path, data, stat, err := item.path, item.data, item.stat, item.err
|
||||
if err != nil {
|
||||
log.Fatal("zip: get failed: %v", err)
|
||||
}
|
||||
|
@ -883,3 +890,26 @@ func cmdUnzip(args []string) {
|
|||
rc.Close()
|
||||
}
|
||||
}
|
||||
|
||||
const defaultConcurrency = 16
|
||||
|
||||
func fmap(f func(i int), xrange, concurrency int) {
|
||||
wg := sync.WaitGroup{}
|
||||
work := make(chan int, 1)
|
||||
for j := 0; j < concurrency; j++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for i := range work {
|
||||
f(i)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
for x := 0; x < xrange; x++ {
|
||||
work <- x
|
||||
}
|
||||
close(work)
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче