Merge branch 'master' into vtgate

This commit is contained in:
Sugu Sougoumarane 2014-01-15 21:30:21 -08:00
Родитель be67244006 86339ec108
Коммит 7dc8438210
9 изменённых файлов: 382 добавлений и 67 удалений

Просмотреть файл

@ -0,0 +1,56 @@
package csvsplitter
import (
"bufio"
"bytes"
"io"
)
type CSVReader struct {
reader *bufio.Reader
delim byte
buf *bytes.Buffer
}
func NewCSVReader(r io.Reader, delim byte) *CSVReader {
return &CSVReader{
reader: bufio.NewReader(r),
delim: delim,
buf: bytes.NewBuffer(make([]byte, 0, 1024)),
}
}
// ReadRecord returns a keyspaceId and a line from which it was
// extracted, with the keyspaceId stripped.
func (r CSVReader) ReadRecord() (line []byte, err error) {
defer r.buf.Reset()
escaped := false
inQuote := false
for {
b, err := r.reader.ReadByte()
if err != nil {
// Assumption: the csv file ends with a
// newline. Otherwise io.EOF should be treated
// separately.
return nil, err
}
r.buf.WriteByte(b)
if escaped {
escaped = false
continue
}
switch b {
case '\\':
escaped = true
case '"':
inQuote = !inQuote
case '\n':
if !inQuote {
return r.buf.Bytes(), nil
}
}
}
}

Просмотреть файл

@ -0,0 +1,75 @@
package csvsplitter
import (
"io"
"os"
"testing"
"github.com/youtube/vitess/go/testfiles"
)
func readLines(t *testing.T, name string) []string {
file, err := os.Open(testfiles.Locate(name))
if err != nil {
t.Fatalf("Cannot open %v: %v", name, err)
}
r := NewCSVReader(file, ',')
lines := make([]string, 0)
for {
line, err := r.ReadRecord()
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
lines = append(lines, string(line))
}
return lines
}
func checkWantedLines(t *testing.T, got, expected []string) {
if len(got) != len(expected) {
t.Fatalf("Wrong number of records: expected %v, got %v", len(expected), len(got))
}
for i, wanted := range expected {
if got[i] != wanted {
t.Errorf("Wrong line: expected %q got %q", wanted, got[i])
}
}
}
func TestCSVReader1(t *testing.T) {
// csvsplitter_mean.csv was generated using "select keyspaced_id,
// tablename.* into outfile".
lines := readLines(t, "csvsplitter_mean.csv")
wantedTable := []string{
"1,\"x\x9c\xf3H\xcd\xc9\xc9W(\xcf/\xcaI\x01\\0\x18\xab\x04=\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\",1,\"ala\\\nhas a cat\\\n\",1\n",
"2,\"x\x9c\xf3\xc8\xcfIT\xc8-\xcdK\xc9\a\\0\x13\xfe\x03\xc8\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\",2,\"ala\\\ntiene un gato\\\\\\\n\r\\\n\",2\n",
"3,\"x\x9cs\xceL\xccW\xc8\xcd\xcfK\xc9\a\\0\x13\x88\x03\xba\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\",3,\"ala\\\nha un gatto\\\\n\\\n\",3\n",
"4,\"x\x9cs\xca\xcf\xcb\xca/-R\xc8\xcd\xcfKI\x05\\0#:\x05\x13\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\",3,\",,,ala \\\"\\\n,a un chat\",4\n",
}
checkWantedLines(t, lines, wantedTable)
}
func TestCSVReader2(t *testing.T) {
// mean.csvcsvsplitter_mean.csv was generated from
// csvsplitter_mean.csv and changing the ids into hex
lines := readLines(t, "csvsplitter_mean_hex.csv")
wantedTable := []string{
"\"78fe\",\"x\x9c\xf3H\xcd\xc9\xc9W(\xcf/\xcaI\x01\\0\x18\xab\x04=\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\",1,\"ala\\\nhas a cat\\\n\",1\n",
"\"34ef\",\"x\x9c\xf3\xc8\xcfIT\xc8-\xcdK\xc9\a\\0\x13\xfe\x03\xc8\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\",2,\"ala\\\ntiene un gato\\\\\\\n\r\\\n\",2\n",
"\"A4F6\",\"x\x9cs\xceL\xccW\xc8\xcd\xcfK\xc9\a\\0\x13\x88\x03\xba\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\",3,\"ala\\\nha un gatto\\\\n\\\n\",3\n",
"\"1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef\",\"x\x9cs\xca\xcf\xcb\xca/-R\xc8\xcd\xcfKI\x05\\0#:\x05\x13\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\",3,\",,,ala \\\"\\\n,a un chat\",4\n",
}
checkWantedLines(t, lines, wantedTable)
}

Просмотреть файл

@ -376,7 +376,9 @@ func (nhw *namedHasherWriter) SnapshotFiles() ([]SnapshotFile, error) {
return nhw.snapshotFiles, nil
}
func (mysqld *Mysqld) dumpTable(td TableDefinition, dbName, keyName string, keyType key.KeyspaceIdType, mainCloneSourcePath string, cloneSourcePaths map[key.KeyRange]string, maximumFilesize uint64) (map[key.KeyRange][]SnapshotFile, error) {
// dumpTableSplit will dump a table, and then split it according to keyspace_id
// into multiple files.
func (mysqld *Mysqld) dumpTableSplit(td TableDefinition, dbName, keyName string, keyType key.KeyspaceIdType, mainCloneSourcePath string, cloneSourcePaths map[key.KeyRange]string, maximumFilesize uint64) (map[key.KeyRange][]SnapshotFile, error) {
filename := path.Join(mainCloneSourcePath, td.Name+".csv")
selectIntoOutfile := `SELECT {{.KeyspaceIdColumnName}}, {{.Columns}} INTO OUTFILE "{{.TableOutputPath}}" CHARACTER SET binary FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' ESCAPED BY '\\' LINES TERMINATED BY '\n' FROM {{.TableName}}`
queryParams := map[string]string{
@ -451,11 +453,80 @@ func (mysqld *Mysqld) dumpTable(td TableDefinition, dbName, keyName string, keyT
return snapshotFiles, nil
}
// dumpTableFull will dump the contents of a full table, and then
// chunk it up in multiple compressed files.
func (mysqld *Mysqld) dumpTableFull(td TableDefinition, dbName, mainCloneSourcePath string, cloneSourcePath string, maximumFilesize uint64) ([]SnapshotFile, error) {
filename := path.Join(mainCloneSourcePath, td.Name+".csv")
selectIntoOutfile := `SELECT {{.Columns}} INTO OUTFILE "{{.TableOutputPath}}" CHARACTER SET binary FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' ESCAPED BY '\\' LINES TERMINATED BY '\n' FROM {{.TableName}}`
queryParams := map[string]string{
"TableName": dbName + "." + td.Name,
"Columns": strings.Join(td.Columns, ", "),
"TableOutputPath": filename,
}
sio, err := fillStringTemplate(selectIntoOutfile, queryParams)
if err != nil {
return nil, err
}
if err := mysqld.executeSuperQuery(sio); err != nil {
return nil, err
}
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer func() {
file.Close()
if e := os.Remove(filename); e != nil {
log.Errorf("Cannot remove %v: %v", filename, e)
}
}()
filenamePattern := path.Join(cloneSourcePath, td.Name+".%v.csv.gz")
hasherWriter, err := newCompressedNamedHasherWriter(filenamePattern, mysqld.SnapshotDir, td.Name, maximumFilesize)
if err != nil {
return nil, err
}
splitter := csvsplitter.NewCSVReader(file, ',')
for {
line, err := splitter.ReadRecord()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
_, err = hasherWriter.Write(line)
if err != nil {
return nil, err
}
}
return hasherWriter.SnapshotFiles()
}
// CreateMultiSnapshot create snapshots of the data.
// - for a resharding snapshot, keyRanges+keyName+keyType are set,
// and tables is empty. This action will create multiple snapshots,
// one per keyRange.
// - for a vertical split, tables is set, and keyRanges+keyName+keyType are
// empty. It will create a single snapshot of the contents of the tables.
// Note combinations of table subset and keyranges are not supported.
func (mysqld *Mysqld) CreateMultiSnapshot(keyRanges []key.KeyRange, dbName, keyName string, keyType key.KeyspaceIdType, sourceAddr string, allowHierarchicalReplication bool, snapshotConcurrency int, tables []string, skipSlaveRestart bool, maximumFilesize uint64, hookExtraEnv map[string]string) (snapshotManifestFilenames []string, err error) {
if dbName == "" {
err = fmt.Errorf("no database name provided")
return
}
if len(tables) > 0 {
if len(keyRanges) > 1 {
return nil, fmt.Errorf("Cannot have both tables and keyranges")
}
if len(keyRanges) == 1 && keyRanges[0].IsPartial() {
return nil, fmt.Errorf("Cannot have both tables and keyranges")
}
}
// same logic applies here
log.Infof("validateCloneSource")
@ -465,8 +536,12 @@ func (mysqld *Mysqld) CreateMultiSnapshot(keyRanges []key.KeyRange, dbName, keyN
// clean out and start fresh
cloneSourcePaths := make(map[key.KeyRange]string)
for _, keyRange := range keyRanges {
cloneSourcePaths[keyRange] = path.Join(mysqld.SnapshotDir, dataDir, dbName+"-"+string(keyRange.Start.Hex())+","+string(keyRange.End.Hex()))
if len(tables) > 0 {
cloneSourcePaths[key.KeyRange{}] = path.Join(mysqld.SnapshotDir, dataDir, dbName+"-tables")
} else {
for _, keyRange := range keyRanges {
cloneSourcePaths[keyRange] = path.Join(mysqld.SnapshotDir, dataDir, dbName+"-"+string(keyRange.Start.Hex())+","+string(keyRange.End.Hex()))
}
}
for _, _path := range cloneSourcePaths {
if err = os.RemoveAll(_path); err != nil {
@ -516,12 +591,18 @@ func (mysqld *Mysqld) CreateMultiSnapshot(keyRanges []key.KeyRange, dbName, keyN
// we just skip views here
return nil
}
snapshotFiles, err := mysqld.dumpTable(table, dbName, keyName, keyType, mainCloneSourcePath, cloneSourcePaths, maximumFilesize)
if err != nil {
return
if len(tables) > 0 {
sfs, err := mysqld.dumpTableFull(table, dbName, mainCloneSourcePath, cloneSourcePaths[key.KeyRange{}], maximumFilesize)
if err != nil {
return err
}
datafiles[i] = map[key.KeyRange][]SnapshotFile{
key.KeyRange{}: sfs,
}
} else {
datafiles[i], err = mysqld.dumpTableSplit(table, dbName, keyName, keyType, mainCloneSourcePath, cloneSourcePaths, maximumFilesize)
}
datafiles[i] = snapshotFiles
return nil
return
}
if err = ConcurrentMap(snapshotConcurrency, len(sd.TableDefinitions), dumpTableWorker); err != nil {
return

Просмотреть файл

@ -269,44 +269,5 @@ class TestClone(unittest.TestCase):
def test_vtctl_clone_server(self):
self._test_vtctl_clone(server_mode=True)
# this test is useful to validate the table specification code works.
# it will be replaced soon by a vertical split test in resharding.
def test_multisnapshot_vtctl(self):
populate = sum([[
"insert into vt_insert_test_%s (msg) values ('test %s')" % (i, x)
for x in xrange(4)] for i in range(6)], [])
create = ['''create table vt_insert_test_%s (
id bigint auto_increment,
msg varchar(64),
primary key (id)
) Engine=InnoDB''' % i for i in range(6)]
# Start up a master mysql and vttablet
utils.run_vtctl(['CreateKeyspace',
'--sharding_column_name', 'id',
'--sharding_column_type', 'uint64',
'test_keyspace'])
tablet_62344.init_tablet('master', 'test_keyspace', '0')
utils.run_vtctl(['RebuildShardGraph', 'test_keyspace/0'])
utils.validate_topology()
tablet_62344.populate('vt_test_keyspace', create,
populate)
tablet_62344.start_vttablet()
utils.run_vtctl(['MultiSnapshot', '--force', '--tables=vt_insert_test_1,vt_insert_test_2,vt_insert_test_3', '--spec=-0000000000000003-', tablet_62344.tablet_alias])
if os.path.exists(os.path.join(environment.vtdataroot, 'snapshot/vt_0000062344/data/vt_test_keyspace-,0000000000000003/vt_insert_test_4.0.csv.gz')):
self.fail("Table vt_insert_test_4 wasn't supposed to be dumped.")
for kr in 'vt_test_keyspace-,0000000000000003', 'vt_test_keyspace-0000000000000003,':
path = os.path.join(environment.vtdataroot, 'snapshot/vt_0000062344/data/', kr, 'vt_insert_test_1.0.csv.gz')
with gzip.open(path) as f:
if len(f.readlines()) != 2:
self.fail("Data looks wrong in %s" % path)
tablet_62344.kill_vttablet()
if __name__ == '__main__':
utils.main()

Просмотреть файл

@ -410,7 +410,7 @@ primary key (name)
shard_1_slave2.init_tablet('spare', 'test_keyspace', '80-')
shard_1_rdonly.init_tablet('rdonly', 'test_keyspace', '80-')
utils.run_vtctl('RebuildKeyspaceGraph test_keyspace', auto_log=True)
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
# create databases so vttablet can start behaving normally
for t in [shard_0_master, shard_0_replica, shard_1_master, shard_1_slave1,
@ -427,10 +427,10 @@ primary key (name)
shard_1_rdonly.wait_for_vttablet_state('SERVING')
# reparent to make the tablets work
utils.run_vtctl('ReparentShard -force test_keyspace/-80 ' +
shard_0_master.tablet_alias, auto_log=True)
utils.run_vtctl('ReparentShard -force test_keyspace/80- ' +
shard_1_master.tablet_alias, auto_log=True)
utils.run_vtctl(['ReparentShard', '-force', 'test_keyspace/-80',
shard_0_master.tablet_alias], auto_log=True)
utils.run_vtctl(['ReparentShard', '-force', 'test_keyspace/80-',
shard_1_master.tablet_alias], auto_log=True)
# create the tables
self._create_schema()
@ -456,12 +456,12 @@ primary key (name)
shard_3_replica.wait_for_vttablet_state('NOT_SERVING')
shard_3_rdonly.wait_for_vttablet_state('CONNECTING')
utils.run_vtctl('ReparentShard -force test_keyspace/80-C0 ' +
shard_2_master.tablet_alias, auto_log=True)
utils.run_vtctl('ReparentShard -force test_keyspace/C0- ' +
shard_3_master.tablet_alias, auto_log=True)
utils.run_vtctl(['ReparentShard', '-force', 'test_keyspace/80-C0',
shard_2_master.tablet_alias], auto_log=True)
utils.run_vtctl(['ReparentShard', '-force', 'test_keyspace/C0-',
shard_3_master.tablet_alias], auto_log=True)
utils.run_vtctl('RebuildKeyspaceGraph -use-served-types test_keyspace',
utils.run_vtctl(['RebuildKeyspaceGraph', '-use-served-types', 'test_keyspace'],
auto_log=True)
self._check_srv_keyspace('test_nj', 'test_keyspace',
'Partitions(master): -80 80-\n' +
@ -470,8 +470,8 @@ primary key (name)
'TabletTypes: master,rdonly,replica')
# take the snapshot for the split
utils.run_vtctl('MultiSnapshot --spec=80-C0- %s' %
(shard_1_slave1.tablet_alias), auto_log=True)
utils.run_vtctl(['MultiSnapshot', '--spec=80-C0-',
shard_1_slave1.tablet_alias], auto_log=True)
# wait for tablet's binlog server service to be enabled after snapshot,
# and check all the others while we're at it
@ -489,7 +489,7 @@ primary key (name)
self._check_startup_values()
# check the schema too
utils.run_vtctl('ValidateSchemaKeyspace test_keyspace', auto_log=True)
utils.run_vtctl(['ValidateSchemaKeyspace', 'test_keyspace'], auto_log=True)
# check the binlog players are running
self._wait_for_binlog_player_count(shard_2_master, 1)
@ -543,11 +543,11 @@ primary key (name)
self._check_lots_timeout(1000, 80, 5, base=1000)
# check we can't migrate the master just yet
utils.run_vtctl('MigrateServedTypes test_keyspace/80- master',
utils.run_vtctl(['MigrateServedTypes', 'test_keyspace/80-', 'master'],
expect_fail=True)
# now serve rdonly from the split shards
utils.run_vtctl('MigrateServedTypes test_keyspace/80- rdonly',
utils.run_vtctl(['MigrateServedTypes', 'test_keyspace/80-', 'rdonly'],
auto_log=True)
self._check_srv_keyspace('test_nj', 'test_keyspace',
'Partitions(master): -80 80-\n' +
@ -556,7 +556,7 @@ primary key (name)
'TabletTypes: master,rdonly,replica')
# then serve replica from the split shards
utils.run_vtctl('MigrateServedTypes test_keyspace/80- replica',
utils.run_vtctl(['MigrateServedTypes', 'test_keyspace/80-', 'replica'],
auto_log=True)
self._check_srv_keyspace('test_nj', 'test_keyspace',
'Partitions(master): -80 80-\n' +
@ -565,14 +565,14 @@ primary key (name)
'TabletTypes: master,rdonly,replica')
# move replica back and forth
utils.run_vtctl('MigrateServedTypes -reverse test_keyspace/80- replica',
utils.run_vtctl(['MigrateServedTypes', '-reverse', 'test_keyspace/80-', 'replica'],
auto_log=True)
self._check_srv_keyspace('test_nj', 'test_keyspace',
'Partitions(master): -80 80-\n' +
'Partitions(rdonly): -80 80-C0 C0-\n' +
'Partitions(replica): -80 80-\n' +
'TabletTypes: master,rdonly,replica')
utils.run_vtctl('MigrateServedTypes test_keyspace/80- replica',
utils.run_vtctl(['MigrateServedTypes', 'test_keyspace/80-', 'replica'],
auto_log=True)
self._check_srv_keyspace('test_nj', 'test_keyspace',
'Partitions(master): -80 80-\n' +
@ -613,7 +613,7 @@ primary key (name)
monitor_thread_2.lag_sum / monitor_thread_2.sample_count)
# then serve master from the split shards
utils.run_vtctl('MigrateServedTypes test_keyspace/80- master',
utils.run_vtctl(['MigrateServedTypes', 'test_keyspace/80-', 'master'],
auto_log=True)
self._check_srv_keyspace('test_nj', 'test_keyspace',
'Partitions(master): -80 80-C0 C0-\n' +

Просмотреть файл

@ -385,7 +385,7 @@ class TestTabletManager(unittest.TestCase):
sp = utils.run_bg(args, stdout=PIPE, stderr=PIPE)
# wait for it to start, and let's kill it
time.sleep(2.0)
time.sleep(4.0)
utils.run(['pkill', 'vtaction'])
out, err = sp.communicate()

142
test/vertical_split.py Executable file
Просмотреть файл

@ -0,0 +1,142 @@
#!/usr/bin/python
#
# Copyright 2013, Google Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file.
import logging
import threading
import struct
import time
import unittest
import environment
import utils
import tablet
# source keyspace, with 4 tables
source_master = tablet.Tablet()
source_replica = tablet.Tablet()
source_rdonly = tablet.Tablet()
# destination keyspace, with just two tables
destination_master = tablet.Tablet()
destination_replica = tablet.Tablet()
destination_rdonly = tablet.Tablet()
def setUpModule():
try:
environment.topo_server_setup()
setup_procs = [
source_master.init_mysql(),
source_replica.init_mysql(),
source_rdonly.init_mysql(),
destination_master.init_mysql(),
destination_replica.init_mysql(),
destination_rdonly.init_mysql(),
]
utils.wait_procs(setup_procs)
except:
tearDownModule()
raise
def tearDownModule():
if utils.options.skip_teardown:
return
teardown_procs = [
source_master.teardown_mysql(),
source_replica.teardown_mysql(),
source_rdonly.teardown_mysql(),
destination_master.teardown_mysql(),
destination_replica.teardown_mysql(),
destination_rdonly.teardown_mysql(),
]
utils.wait_procs(teardown_procs, raise_on_error=False)
environment.topo_server_teardown()
utils.kill_sub_processes()
utils.remove_tmp_files()
source_master.remove_tree()
source_replica.remove_tree()
source_rdonly.remove_tree()
destination_master.remove_tree()
destination_replica.remove_tree()
destination_rdonly.remove_tree()
class TestVerticalSplit(unittest.TestCase):
def _create_source_schema(self):
create_table_template = '''create table %s(
id bigint auto_increment,
msg varchar(64),
primary key (id),
index by_msg (msg)
) Engine=InnoDB'''
create_view_template = '''create view %s(id, msg) as select id, msg from %s'''
for t in ['moving1', 'moving2', 'staying1', 'staying2']:
utils.run_vtctl(['ApplySchemaKeyspace',
'-simple',
'-sql=' + create_table_template % (t),
'source_keyspace'],
auto_log=True)
utils.run_vtctl(['ApplySchemaKeyspace',
'-simple',
'-sql=' + create_view_template % ('view1', 'moving1'),
'source_keyspace'],
auto_log=True)
def test_vertical_split(self):
utils.run_vtctl(['CreateKeyspace',
'source_keyspace'])
utils.run_vtctl(['CreateKeyspace',
'destination_keyspace'])
source_master.init_tablet('master', 'source_keyspace', '0')
source_replica.init_tablet('replica', 'source_keyspace', '0')
source_rdonly.init_tablet('rdonly', 'source_keyspace', '0')
destination_master.init_tablet('master', 'destination_keyspace', '0')
destination_replica.init_tablet('replica', 'destination_keyspace', '0')
destination_rdonly.init_tablet('rdonly', 'destination_keyspace', '0')
utils.run_vtctl(['RebuildKeyspaceGraph', 'source_keyspace'], auto_log=True)
utils.run_vtctl(['RebuildKeyspaceGraph', 'destination_keyspace'],
auto_log=True)
# create databases so vttablet can start behaving normally
for t in [source_master, source_replica, source_rdonly]:
t.create_db('vt_source_keyspace')
t.start_vttablet(wait_for_state=None)
for t in [destination_master, destination_replica, destination_rdonly]:
t.start_vttablet(wait_for_state=None)
# wait for the tablets
for t in [source_master, source_replica, source_rdonly]:
t.wait_for_vttablet_state('SERVING')
for t in [destination_master, destination_replica, destination_rdonly]:
t.wait_for_vttablet_state('CONNECTING')
# reparent to make the tablets work
utils.run_vtctl(['ReparentShard', '-force', 'source_keyspace/0',
source_master.tablet_alias], auto_log=True)
utils.run_vtctl(['ReparentShard', '-force', 'destination_keyspace/0',
destination_master.tablet_alias], auto_log=True)
# create the schema on the source keyspace
self._create_source_schema()
# take the snapshot for the split
utils.run_vtctl(['MultiSnapshot', '--tables=moving1,moving2,view1',
source_rdonly.tablet_alias], auto_log=True)
# kill everything
tablet.kill_tablets([source_master, source_replica, source_rdonly,
destination_master, destination_replica,
destination_rdonly])
if __name__ == '__main__':
utils.main()