Parallelize dumping tables, make split accept the empty string as a min/max key.
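The parallel dump below follows the same bounded worker-pool idiom that compressFiles and fetchFiles already use: queue every work item on a buffered channel, close it, start a fixed number of goroutines that range over the channel, and collect exactly one result per item. A minimal, self-contained sketch of that pattern (dumpTable is a hypothetical stand-in for the real per-table work, not part of this commit):

package main

import "fmt"

const dumpConcurrency = 4 // mirrors the constant introduced in this commit

// dumpTable is a hypothetical stand-in for the real per-table work.
func dumpTable(i int) error {
	fmt.Printf("dumping table %d\n", i)
	return nil
}

func main() {
	n := 10
	work := make(chan int, n) // buffered: all indices are queued before workers start
	errors := make(chan error)

	// Queue every index, then close the channel so the workers'
	// range loops terminate once the queue drains.
	for i := 0; i < n; i++ {
		work <- i
	}
	close(work)

	// A fixed number of workers pull indices until the channel is empty.
	for i := 0; i < dumpConcurrency; i++ {
		go func() {
			for i := range work {
				errors <- dumpTable(i)
			}
		}()
	}

	// Collect exactly n results, remembering the last failure.
	var err error
	for i := 0; i < n; i++ {
		if e := <-errors; e != nil {
			err = e
		}
	}
	fmt.Println("all dumps done, err =", err)
}

Closing the work channel is what ends the workers' range loops, and receiving exactly n results guarantees no worker blocks on its final send.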

Ric Szopa 2012-11-19 15:25:41 -08:00
Parent c02070d8ab
Commit 0be6c0825c
3 changed files with 71 additions and 56 deletions

View file

@@ -38,12 +38,11 @@ func init() {
 	simulateFailures = statErr == nil
 }
 
-/* compress a single file with gzip, leaving the src file intact.
-Also computes the md5 hash on the fly.
-FIXME(msolomon) not sure how well Go will schedule cpu intensive tasks
-might be better if this forked off workers.
-*/
+// compressFile compresses a single file with gzip, leaving the src
+// file intact. Also computes the md5 hash on the fly.
 func compressFile(srcPath, dstPath string) (*SnapshotFile, error) {
+	// FIXME(msolomon) not sure how well Go will schedule cpu intensive tasks
+	// might be better if this forked off workers.
 	relog.Info("compressFile: starting to compress %v into %v", srcPath, dstPath)
 	srcFile, err := os.OpenFile(srcPath, os.O_RDONLY, 0)
 	if err != nil {
@@ -96,7 +95,7 @@ func compressFile(srcPath, dstPath string) (*SnapshotFile, error) {
 	return &SnapshotFile{dstPath, hash}, nil
 }
 
-// compress multiple files in parallel
+// compressFiles compresses multiple files in parallel.
 func compressFiles(sources, destinations []string) ([]SnapshotFile, error) {
 	if len(sources) != len(destinations) || len(sources) == 0 {
 		panic(fmt.Errorf("bad array lengths: %v %v", len(sources), len(destinations)))
@@ -112,12 +111,7 @@ func compressFiles(sources, destinations []string) ([]SnapshotFile, error) {
 	resultQueue := make(chan error, len(sources))
 	for i := 0; i < compressConcurrency; i++ {
 		go func() {
-			for {
-				i, ok := <-workQueue
-				if !ok {
-					return
-				}
+			for i := range workQueue {
 				sf, err := compressFile(sources[i], destinations[i])
 				if err == nil {
 					snapshotFiles[i] = *sf
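Note: for i := range workQueue is the idiomatic equivalent of the explicit receive-and-ok loop it replaces; ranging over a channel receives values until the channel is closed and drained, then falls out of the loop, ending the goroutine.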
@@ -150,10 +144,10 @@ func compressFiles(sources, destinations []string) ([]SnapshotFile, error) {
 	return snapshotFiles, nil
 }
 
-// This function fetches data from the web server.
-// It then sends it to a tee, which on one side has an md5 checksum
-// reader, and on the other a gunzip reader writing to a file.
-// It will compare the md5 checksum after the copy is done.
+// fetchFile fetches data from the web server. It then sends it to a
+// tee, which on one side has an md5 checksum reader, and on the other
+// a gunzip reader writing to a file. It will compare the md5
+// checksum after the copy is done.
 func fetchFile(srcUrl, srcHash, dstFilename string) error {
 	relog.Info("fetchFile: starting to fetch %v", dstFilename)
@@ -232,8 +226,8 @@ func fetchFile(srcUrl, srcHash, dstFilename string) error {
 	return os.Rename(dstFile.Name(), dstFilename)
 }
 
-// This function fetches data from the web server,
-// retrying a few times.
+// fetchFileWithRetry fetches data from the web server, retrying a few
+// times.
 func fetchFileWithRetry(srcUrl, srcHash, dstFilename string) (err error) {
 	for i := 0; i < fetchRetryCount; i++ {
 		err = fetchFile(srcUrl, srcHash, dstFilename)
@@ -261,11 +255,7 @@ func fetchFiles(snapshotManifest *SnapshotManifest, destinationPath string) (err
 	resultQueue := make(chan error, len(snapshotManifest.Files))
 	for i := 0; i < fetchConcurrency; i++ {
 		go func() {
-			for {
-				fi, ok := <-workQueue
-				if !ok {
-					return
-				}
+			for fi := range workQueue {
 				filename := fi.getLocalFilename(destinationPath)
 				furl := "http://" + snapshotManifest.Addr + fi.Path
 				resultQueue <- fetchFileWithRetry(furl, fi.Hash, filename)

View file

@@ -116,8 +116,10 @@ var selectIntoOutfile = `SELECT * INTO OUTFILE "{{.TableOutputPath}}"
 CHARACTER SET binary
 FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' ESCAPED BY '\\'
 LINES TERMINATED BY '\n'
-FROM {{.TableName}} WHERE {{.KeyspaceIdColumnName}} >= 0x{{.StartKey}} AND
-	{{.KeyspaceIdColumnName}} < 0x{{.EndKey}}`
+FROM {{.TableName}} WHERE
+{{if .StartKey}}{{ .KeyspaceIdColumnName }} >= 0x{{.StartKey}} {{end}}
+{{if and .StartKey .EndKey}} AND {{end}}
+{{if .EndKey}} {{.KeyspaceIdColumnName}} < 0x{{.EndKey}} {{end}}`
 
 var loadDataInfile = `LOAD DATA INFILE '{{.TableInputPath}}' INTO TABLE {{.TableName}}
 CHARACTER SET binary
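The rewritten template is what makes the empty string acceptable as a min or max key: Go's text/template treats an empty string as false in an if action, so each bound is emitted only when its key is non-empty, and the AND only when both are. A small standalone check of the rendering (hypothetical snippet, not part of the commit):

package main

import (
	"os"
	"text/template"
)

// Same shape as the selectIntoOutfile WHERE clause in this commit.
const where = `WHERE
{{if .StartKey}}{{.KeyspaceIdColumnName}} >= 0x{{.StartKey}} {{end}}
{{if and .StartKey .EndKey}} AND {{end}}
{{if .EndKey}} {{.KeyspaceIdColumnName}} < 0x{{.EndKey}} {{end}}`

func main() {
	t := template.Must(template.New("where").Parse(where))
	cases := []map[string]string{
		{"KeyspaceIdColumnName": "id", "StartKey": "00", "EndKey": "03"}, // both bounds
		{"KeyspaceIdColumnName": "id", "StartKey": "", "EndKey": "03"},   // empty min key
		{"KeyspaceIdColumnName": "id", "StartKey": "00", "EndKey": ""},   // empty max key
	}
	for _, params := range cases {
		if err := t.Execute(os.Stdout, params); err != nil {
			panic(err)
		}
		os.Stdout.WriteString("\n---\n")
	}
}

With both keys empty the template would still emit a bare WHERE, so presumably the caller always supplies at least one bound.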
@@ -284,41 +286,64 @@ func (mysqld *Mysqld) CreateSplitSnapshot(dbName, keyName string, startKey, endK
 	return ssmFile, nil
 }
 
+const dumpConcurrency = 4
+
+// createSplitSnapshotManifest exports each table to a CSV-like file
+// and compresses the results.
 func (mysqld *Mysqld) createSplitSnapshotManifest(dbName, keyName string, startKey, endKey key.HexKeyspaceId, cloneSourcePath string, sd *SchemaDefinition) ([]SnapshotFile, error) {
-	// export each table to a CSV-like file, compress the results
-	tableFiles := make([]string, len(sd.TableDefinitions))
-	// FIXME(msolomon) parallelize
-	for i, td := range sd.TableDefinitions {
-		relog.Info("Dump table %v...", td.Name)
-		filename := path.Join(cloneSourcePath, td.Name+".csv")
-		tableFiles[i] = filename
+	n := len(sd.TableDefinitions)
+	errors := make(chan error)
+	work := make(chan int, n)
 
-		queryParams := map[string]string{
-			"TableName":            dbName + "." + td.Name,
-			"KeyspaceIdColumnName": keyName,
-			// FIXME(alainjobart): move these to bind params
-			"TableOutputPath": filename,
-			"StartKey":        string(startKey),
-			"EndKey":          string(endKey),
-		}
-		query := mustFillStringTemplate(selectIntoOutfile, queryParams)
-		relog.Info("  %v", query)
-		if err := mysqld.executeSuperQuery(query); err != nil {
-			return nil, err
+	for i := 0; i < n; i++ {
+		work <- i
+	}
+	close(work)
+
+	tableFiles := make([]string, n)
+	compressedFiles := make([]string, n)
+
+	for i := 0; i < dumpConcurrency; i++ {
+		go func() {
+			for i := range work {
+				td := sd.TableDefinitions[i]
+				relog.Info("Dump table %v...", td.Name)
+				filename := path.Join(cloneSourcePath, td.Name+".csv")
+				queryParams := map[string]string{
+					"TableName":            dbName + "." + td.Name,
+					"KeyspaceIdColumnName": keyName,
+					// FIXME(alainjobart): move these to bind params
+					"TableOutputPath": filename,
+					"StartKey":        string(startKey),
+					"EndKey":          string(endKey),
+				}
+				err := mysqld.executeSuperQuery(mustFillStringTemplate(selectIntoOutfile, queryParams))
+				if err == nil {
+					tableFiles[i] = filename
+					compressedFiles[i] = filename + ".gz"
+				}
+				errors <- err
+			}
+		}()
+	}
+
+	var err error
+	for i := 0; i < n; i++ {
+		if dumpErr := <-errors; dumpErr != nil {
+			err = dumpErr
+		}
+	}
 
-	sources := make([]string, 0, 128)
-	destinations := make([]string, 0, 128)
-	for _, srcPath := range tableFiles {
-		sources = append(sources, srcPath)
-		destinations = append(destinations, srcPath+".gz")
-	}
-	dataFiles, err := compressFiles(sources, destinations)
+	if err != nil {
+		// prune files to free up disk space, if it errors,
+		// we'll figure out later
+		for _, srcPath := range tableFiles {
+			if srcPath != "" {
+				os.Remove(srcPath)
+			}
+		}
+		return nil, err
+	}
+
+	dataFiles, err := compressFiles(tableFiles, compressedFiles)
+	if err != nil {
+		for _, srcPath := range tableFiles {
+			os.Remove(srcPath)
+		}
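Two details of the new flow worth noting: the collection loop receives exactly n results, so every worker's sends on the unbuffered errors channel complete and no goroutine is left blocked; and tableFiles[i] is set only on a successful dump, which is why the prune loop skips empty entries before calling os.Remove.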

View file

@@ -295,7 +295,7 @@ def run_test_mysqlctl_split():
   tablet_62344.populate('vt_test_keyspace', create_vt_insert_test,
                         populate_vt_insert_test)
 
-  err = tablet_62344.mysqlctl('-port 6700 -mysql-port 3700 partialsnapshot --start=0000000000000000 --end=0000000000000003 vt_test_keyspace id').wait()
+  err = tablet_62344.mysqlctl('-port 6700 -mysql-port 3700 partialsnapshot --end=0000000000000003 vt_test_keyspace id').wait()
 
   if err != 0:
     raise utils.TestError('mysqlctl partialsnapshot failed')