diff --git a/go/cmd/vtcombo/tablet_map.go b/go/cmd/vtcombo/tablet_map.go index 6628a993bb..c62d80e9f5 100644 --- a/go/cmd/vtcombo/tablet_map.go +++ b/go/cmd/vtcombo/tablet_map.go @@ -10,6 +10,7 @@ import ( "time" log "github.com/golang/glog" + "github.com/golang/protobuf/proto" "golang.org/x/net/context" "github.com/youtube/vitess/go/sqltypes" @@ -149,7 +150,7 @@ func initTabletMap(ts topo.Server, tpb *vttestpb.VTTestTopology, mysqld mysqlctl for _, cell := range tpb.Cells { dbname := spb.DbNameOverride if dbname == "" { - dbname = fmt.Sprintf("vt_%v_%v_%v", cell, keyspace, shard) + dbname = fmt.Sprintf("vt_%v_%v", keyspace, shard) } dbcfgs.App.DbName = dbname @@ -347,6 +348,7 @@ func (itc *internalTabletConn) StreamExecute(ctx context.Context, target *queryp result <- reply.Copy() return nil }) + finalErr = tabletconn.TabletErrorFromGRPC(vterrors.ToGRPCError(finalErr)) // the client will only access finalErr after the // channel is closed, and then it's already set. @@ -485,9 +487,42 @@ func (itc *internalTabletConn) StreamHealth(ctx context.Context) (tabletconn.Str }, nil } +type updateStreamAdapter struct { + c chan *querypb.StreamEvent + err *error +} + +func (a *updateStreamAdapter) Recv() (*querypb.StreamEvent, error) { + r, ok := <-a.c + if !ok { + if *a.err == nil { + return nil, io.EOF + } + return nil, *a.err + } + return r, nil +} + // UpdateStream is part of tabletconn.TabletConn. Not implemented here. func (itc *internalTabletConn) UpdateStream(ctx context.Context, target *querypb.Target, position string, timestamp int64) (tabletconn.StreamEventReader, error) { - return nil, fmt.Errorf("not implemented in vtcombo") + result := make(chan *querypb.StreamEvent, 10) + var finalErr error + + go func() { + finalErr = itc.tablet.qsc.QueryService().UpdateStream(ctx, target, position, timestamp, func(reply *querypb.StreamEvent) error { + // We need to deep-copy the reply before returning, + // because the underlying buffers are reused. + result <- proto.Clone(reply).(*querypb.StreamEvent) + return nil + }) + finalErr = tabletconn.TabletErrorFromGRPC(vterrors.ToGRPCError(finalErr)) + + // the client will only access finalErr after the + // channel is closed, and then it's already set. + close(result) + }() + + return &updateStreamAdapter{result, &finalErr}, nil } // diff --git a/py/vttest/local_database.py b/py/vttest/local_database.py index 9f0942ee92..2021c0497c 100644 --- a/py/vttest/local_database.py +++ b/py/vttest/local_database.py @@ -139,12 +139,11 @@ class LocalDatabase(object): # redirected keyspaces have no underlying database continue - for cell in self.topology.cells: - for spb in kpb.shards: - db_name = spb.db_name_override - if not db_name: - db_name = 'vt_%s_%s_%s' % (cell, kpb.name, spb.name) - cmds.append('create database `%s`' % db_name) + for spb in kpb.shards: + db_name = spb.db_name_override + if not db_name: + db_name = 'vt_%s_%s' % (kpb.name, spb.name) + cmds.append('create database `%s`' % db_name) logging.info('Creating databases') self.mysql_execute(cmds) @@ -185,12 +184,11 @@ class LocalDatabase(object): cmds = self.get_sql_commands_from_file(filepath, schema_dir) # Run the cmds on each shard and cell in the keyspace. - for cell in self.topology.cells: - for spb in kpb.shards: - db_name = spb.db_name_override - if not db_name: - db_name = 'vt_%s_%s_%s' % (cell, kpb.name, spb.name) - self.mysql_execute(cmds, db_name=db_name) + for spb in kpb.shards: + db_name = spb.db_name_override + if not db_name: + db_name = 'vt_%s_%s' % (kpb.name, spb.name) + self.mysql_execute(cmds, db_name=db_name) def populate_with_random_data(self): """Populates all shards with randomly generated data.""" @@ -200,12 +198,11 @@ class LocalDatabase(object): # redirected keyspaces have no underlying database continue - for cell in self.topology.cells: - for spb in kpb.shards: - db_name = spb.db_name_override - if not db_name: - db_name = 'vt_%s_%s_%s' % (cell, kpb.name, spb.name) - self.populate_shard_with_random_data(db_name) + for spb in kpb.shards: + db_name = spb.db_name_override + if not db_name: + db_name = 'vt_%s_%s' % (kpb.name, spb.name) + self.populate_shard_with_random_data(db_name) def populate_shard_with_random_data(self, db_name): """Populates the given database with randomly generated data. diff --git a/test/vttest_sample_test.py b/test/vttest_sample_test.py index 270b8f3ecf..d750dbda85 100755 --- a/test/vttest_sample_test.py +++ b/test/vttest_sample_test.py @@ -26,17 +26,20 @@ import json import os import struct import subprocess +import time import urllib import unittest from google.protobuf import text_format +from vtproto import topodata_pb2 from vtproto import vttest_pb2 +from vtdb import dbexceptions +from vtdb import proto3_encoding from vtdb import vtgate_client from vtdb import vtgate_cursor -from vtdb import dbexceptions import utils import environment @@ -96,6 +99,11 @@ class TestMysqlctl(unittest.TestCase): 'using protocol: %s.\n' 'Press enter to continue.' % (vtgate_addr, protocol)) + # Remember the current timestamp after we sleep for a bit, so we + # can use it for UpdateStream later. + time.sleep(2) + before_insert = long(time.time()) + # Connect to vtgate. conn = vtgate_client.connect(protocol, vtgate_addr, conn_timeout) @@ -138,6 +146,7 @@ class TestMysqlctl(unittest.TestCase): cursor.begin() for i in xrange(id_start, id_start+rowcount): bind_variables['id'] = i + bind_variables['keyspace_id'] = get_keyspace_id(i) cursor.execute(insert, bind_variables) cursor.commit() cursor.close() @@ -165,6 +174,55 @@ class TestMysqlctl(unittest.TestCase): self.assertEqual(result[0][1], 'test 123') cursor.close() + # Try to get the update stream from the connection. This makes + # sure that part works as well. + count = 0 + for (event, _) in conn.update_stream( + 'test_keyspace', topodata_pb2.MASTER, + timestamp=before_insert, + shard='-80'): + for statement in event.statements: + if statement.table_name == 'test_table': + count += 1 + if count == rowcount + 1: + # We're getting the initial value, plus the 500 updates. + break + + # Insert a sentinel value into the second shard. + row_id = 0x8100000000000000 + keyspace_id = get_keyspace_id(row_id) + cursor = conn.cursor( + tablet_type='master', keyspace='test_keyspace', + keyspace_ids=[pack_kid(keyspace_id)], + writable=True) + cursor.begin() + bind_variables = { + 'id': row_id, + 'msg': 'test %s' % row_id, + 'keyspace_id': keyspace_id, + } + cursor.execute(insert, bind_variables) + cursor.commit() + cursor.close() + + # Try to connect to an update stream on the other shard. + # We may get some random update stream events, but we should not get any + # event that's related to the first shard. Only events related to + # the Insert we just did. + found = False + for (event, _) in conn.update_stream( + 'test_keyspace', topodata_pb2.MASTER, + timestamp=before_insert, + shard='80-'): + for statement in event.statements: + self.assertEqual(statement.table_name, 'test_table') + fields, rows = proto3_encoding.convert_stream_event_statement(statement) + self.assertEqual(fields[0], 'id') + self.assertEqual(rows[0][0], row_id) + found = True + if found: + break + # Clean up the connection conn.close() diff --git a/test/vttest_schema/default/test_table.sql b/test/vttest_schema/default/test_table.sql index eaacf34130..72e6b10edb 100644 --- a/test/vttest_schema/default/test_table.sql +++ b/test/vttest_schema/default/test_table.sql @@ -1,5 +1,5 @@ CREATE TABLE test_table ( - `id` BIGINT(20) NOT NULL, + `id` BIGINT(20) UNSIGNED NOT NULL, `msg` VARCHAR(64), `keyspace_id` BIGINT(20) UNSIGNED NOT NULL, PRIMARY KEY (id) diff --git a/test/vttest_schema/test_keyspace/test_table.sql b/test/vttest_schema/test_keyspace/test_table.sql index eaacf34130..72e6b10edb 100644 --- a/test/vttest_schema/test_keyspace/test_table.sql +++ b/test/vttest_schema/test_keyspace/test_table.sql @@ -1,5 +1,5 @@ CREATE TABLE test_table ( - `id` BIGINT(20) NOT NULL, + `id` BIGINT(20) UNSIGNED NOT NULL, `msg` VARCHAR(64), `keyspace_id` BIGINT(20) UNSIGNED NOT NULL, PRIMARY KEY (id)