vtcombo fixes: UpdateStream and dbname.

Plumbing UpdateStream through for vtcombo. Adding a test for it in
vttest_smaple_test.py.

Fixing the db name for vtcombo databases: it should not include the
cell. Not in practice vtgate is only connecting to the master cell, so
it didn't matter much, but this is more correct.
This commit is contained in:
Alain Jobart 2016-09-08 09:04:37 -07:00
Родитель 971b2ed6b2
Коммит 40ff7cb517
5 изменённых файлов: 113 добавлений и 23 удалений

Просмотреть файл

@ -10,6 +10,7 @@ import (
"time"
log "github.com/golang/glog"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/sqltypes"
@ -149,7 +150,7 @@ func initTabletMap(ts topo.Server, tpb *vttestpb.VTTestTopology, mysqld mysqlctl
for _, cell := range tpb.Cells {
dbname := spb.DbNameOverride
if dbname == "" {
dbname = fmt.Sprintf("vt_%v_%v_%v", cell, keyspace, shard)
dbname = fmt.Sprintf("vt_%v_%v", keyspace, shard)
}
dbcfgs.App.DbName = dbname
@ -347,6 +348,7 @@ func (itc *internalTabletConn) StreamExecute(ctx context.Context, target *queryp
result <- reply.Copy()
return nil
})
finalErr = tabletconn.TabletErrorFromGRPC(vterrors.ToGRPCError(finalErr))
// the client will only access finalErr after the
// channel is closed, and then it's already set.
@ -485,9 +487,42 @@ func (itc *internalTabletConn) StreamHealth(ctx context.Context) (tabletconn.Str
}, nil
}
type updateStreamAdapter struct {
c chan *querypb.StreamEvent
err *error
}
func (a *updateStreamAdapter) Recv() (*querypb.StreamEvent, error) {
r, ok := <-a.c
if !ok {
if *a.err == nil {
return nil, io.EOF
}
return nil, *a.err
}
return r, nil
}
// UpdateStream is part of tabletconn.TabletConn. Not implemented here.
func (itc *internalTabletConn) UpdateStream(ctx context.Context, target *querypb.Target, position string, timestamp int64) (tabletconn.StreamEventReader, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
result := make(chan *querypb.StreamEvent, 10)
var finalErr error
go func() {
finalErr = itc.tablet.qsc.QueryService().UpdateStream(ctx, target, position, timestamp, func(reply *querypb.StreamEvent) error {
// We need to deep-copy the reply before returning,
// because the underlying buffers are reused.
result <- proto.Clone(reply).(*querypb.StreamEvent)
return nil
})
finalErr = tabletconn.TabletErrorFromGRPC(vterrors.ToGRPCError(finalErr))
// the client will only access finalErr after the
// channel is closed, and then it's already set.
close(result)
}()
return &updateStreamAdapter{result, &finalErr}, nil
}
//

Просмотреть файл

@ -139,11 +139,10 @@ class LocalDatabase(object):
# redirected keyspaces have no underlying database
continue
for cell in self.topology.cells:
for spb in kpb.shards:
db_name = spb.db_name_override
if not db_name:
db_name = 'vt_%s_%s_%s' % (cell, kpb.name, spb.name)
db_name = 'vt_%s_%s' % (kpb.name, spb.name)
cmds.append('create database `%s`' % db_name)
logging.info('Creating databases')
self.mysql_execute(cmds)
@ -185,11 +184,10 @@ class LocalDatabase(object):
cmds = self.get_sql_commands_from_file(filepath, schema_dir)
# Run the cmds on each shard and cell in the keyspace.
for cell in self.topology.cells:
for spb in kpb.shards:
db_name = spb.db_name_override
if not db_name:
db_name = 'vt_%s_%s_%s' % (cell, kpb.name, spb.name)
db_name = 'vt_%s_%s' % (kpb.name, spb.name)
self.mysql_execute(cmds, db_name=db_name)
def populate_with_random_data(self):
@ -200,11 +198,10 @@ class LocalDatabase(object):
# redirected keyspaces have no underlying database
continue
for cell in self.topology.cells:
for spb in kpb.shards:
db_name = spb.db_name_override
if not db_name:
db_name = 'vt_%s_%s_%s' % (cell, kpb.name, spb.name)
db_name = 'vt_%s_%s' % (kpb.name, spb.name)
self.populate_shard_with_random_data(db_name)
def populate_shard_with_random_data(self, db_name):

Просмотреть файл

@ -26,17 +26,20 @@ import json
import os
import struct
import subprocess
import time
import urllib
import unittest
from google.protobuf import text_format
from vtproto import topodata_pb2
from vtproto import vttest_pb2
from vtdb import dbexceptions
from vtdb import proto3_encoding
from vtdb import vtgate_client
from vtdb import vtgate_cursor
from vtdb import dbexceptions
import utils
import environment
@ -96,6 +99,11 @@ class TestMysqlctl(unittest.TestCase):
'using protocol: %s.\n'
'Press enter to continue.' % (vtgate_addr, protocol))
# Remember the current timestamp after we sleep for a bit, so we
# can use it for UpdateStream later.
time.sleep(2)
before_insert = long(time.time())
# Connect to vtgate.
conn = vtgate_client.connect(protocol, vtgate_addr, conn_timeout)
@ -138,6 +146,7 @@ class TestMysqlctl(unittest.TestCase):
cursor.begin()
for i in xrange(id_start, id_start+rowcount):
bind_variables['id'] = i
bind_variables['keyspace_id'] = get_keyspace_id(i)
cursor.execute(insert, bind_variables)
cursor.commit()
cursor.close()
@ -165,6 +174,55 @@ class TestMysqlctl(unittest.TestCase):
self.assertEqual(result[0][1], 'test 123')
cursor.close()
# Try to get the update stream from the connection. This makes
# sure that part works as well.
count = 0
for (event, _) in conn.update_stream(
'test_keyspace', topodata_pb2.MASTER,
timestamp=before_insert,
shard='-80'):
for statement in event.statements:
if statement.table_name == 'test_table':
count += 1
if count == rowcount + 1:
# We're getting the initial value, plus the 500 updates.
break
# Insert a sentinel value into the second shard.
row_id = 0x8100000000000000
keyspace_id = get_keyspace_id(row_id)
cursor = conn.cursor(
tablet_type='master', keyspace='test_keyspace',
keyspace_ids=[pack_kid(keyspace_id)],
writable=True)
cursor.begin()
bind_variables = {
'id': row_id,
'msg': 'test %s' % row_id,
'keyspace_id': keyspace_id,
}
cursor.execute(insert, bind_variables)
cursor.commit()
cursor.close()
# Try to connect to an update stream on the other shard.
# We may get some random update stream events, but we should not get any
# event that's related to the first shard. Only events related to
# the Insert we just did.
found = False
for (event, _) in conn.update_stream(
'test_keyspace', topodata_pb2.MASTER,
timestamp=before_insert,
shard='80-'):
for statement in event.statements:
self.assertEqual(statement.table_name, 'test_table')
fields, rows = proto3_encoding.convert_stream_event_statement(statement)
self.assertEqual(fields[0], 'id')
self.assertEqual(rows[0][0], row_id)
found = True
if found:
break
# Clean up the connection
conn.close()

Просмотреть файл

@ -1,5 +1,5 @@
CREATE TABLE test_table (
`id` BIGINT(20) NOT NULL,
`id` BIGINT(20) UNSIGNED NOT NULL,
`msg` VARCHAR(64),
`keyspace_id` BIGINT(20) UNSIGNED NOT NULL,
PRIMARY KEY (id)

Просмотреть файл

@ -1,5 +1,5 @@
CREATE TABLE test_table (
`id` BIGINT(20) NOT NULL,
`id` BIGINT(20) UNSIGNED NOT NULL,
`msg` VARCHAR(64),
`keyspace_id` BIGINT(20) UNSIGNED NOT NULL,
PRIMARY KEY (id)