update_stream.py now uses only vtctl / vtgate.

Adding missing APIs to vtctl/s query.go.
This commit is contained in:
Alain Jobart 2015-08-17 15:49:12 -07:00
Родитель bf388091a2
Коммит 6d36cd24c4
3 изменённых файлов: 188 добавлений и 27 удалений

Просмотреть файл

@ -8,6 +8,7 @@ import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"strconv"
"strings" "strings"
"time" "time"
@ -50,6 +51,21 @@ func init() {
commandVtTabletExecute, commandVtTabletExecute,
"[-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-transaction_id <transaction_id>] [-tablet_type <tablet_type>] -keyspace <keyspace> -shard <shard> <tablet alias> <sql>", "[-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-transaction_id <transaction_id>] [-tablet_type <tablet_type>] -keyspace <keyspace> -shard <shard> <tablet alias> <sql>",
"Executes the given query on the given tablet."}) "Executes the given query on the given tablet."})
addCommand(queriesGroupName, command{
"VtTabletBegin",
commandVtTabletBegin,
"[-connect_timeout <connect timeout>] [-tablet_type <tablet_type>] -keyspace <keyspace> -shard <shard> <tablet alias>",
"Starts a transaction on the provided server."})
addCommand(queriesGroupName, command{
"VtTabletCommit",
commandVtTabletCommit,
"[-connect_timeout <connect timeout>] [-tablet_type <tablet_type>] -keyspace <keyspace> -shard <shard> <tablet alias> <transaction_id>",
"Commits a transaction on the provided server."})
addCommand(queriesGroupName, command{
"VtTabletRollback",
commandVtTabletRollback,
"[-connect_timeout <connect timeout>] [-tablet_type <tablet_type>] -keyspace <keyspace> -shard <shard> <tablet alias> <transaction_id>",
"Rollbacks a transaction on the provided server."})
addCommand(queriesGroupName, command{ addCommand(queriesGroupName, command{
"VtTabletStreamHealth", "VtTabletStreamHealth",
commandVtTabletStreamHealth, commandVtTabletStreamHealth,
@ -196,7 +212,7 @@ func commandVtTabletExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags
return err return err
} }
if subFlags.NArg() != 2 { if subFlags.NArg() != 2 {
return fmt.Errorf("the <tablet_alis> and <sql> arguments are required for the VtTabletExecute command") return fmt.Errorf("the <tablet_alias> and <sql> arguments are required for the VtTabletExecute command")
} }
tt, err := topo.ParseTabletType(*tabletType) tt, err := topo.ParseTabletType(*tabletType)
if err != nil { if err != nil {
@ -228,6 +244,132 @@ func commandVtTabletExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags
return printJSON(wr, qr) return printJSON(wr, qr)
} }
func commandVtTabletBegin(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
keyspace := subFlags.String("keyspace", "", "keyspace the tablet belongs to")
shard := subFlags.String("shard", "", "shard the tablet belongs to")
tabletType := subFlags.String("tablet_type", "unknown", "tablet type we expect from the tablet (use unknown to use sessionId)")
connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vttablet client")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("the <tablet_alias> argument is required for the VtTabletBegin command")
}
tt, err := topo.ParseTabletType(*tabletType)
if err != nil {
return err
}
tabletAlias, err := topo.ParseTabletAliasString(subFlags.Arg(0))
if err != nil {
return err
}
tabletInfo, err := wr.TopoServer().GetTablet(ctx, tabletAlias)
if err != nil {
return err
}
ep, err := topo.TabletEndPoint(tabletInfo.Tablet)
if err != nil {
return fmt.Errorf("cannot get EndPoint from tablet record: %v", err)
}
conn, err := tabletconn.GetDialer()(ctx, ep, *keyspace, *shard, tt, *connectTimeout)
if err != nil {
return fmt.Errorf("cannot connect to tablet %v: %v", tabletAlias, err)
}
defer conn.Close()
transactionID, err := conn.Begin(ctx)
if err != nil {
return fmt.Errorf("Begin failed: %v", err)
}
result := map[string]int64{
"transaction_id": transactionID,
}
return printJSON(wr, result)
}
func commandVtTabletCommit(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
keyspace := subFlags.String("keyspace", "", "keyspace the tablet belongs to")
shard := subFlags.String("shard", "", "shard the tablet belongs to")
tabletType := subFlags.String("tablet_type", "unknown", "tablet type we expect from the tablet (use unknown to use sessionId)")
connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vttablet client")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 2 {
return fmt.Errorf("the <tablet_alias> and <transaction_id> arguments are required for the VtTabletCommit command")
}
transactionID, err := strconv.ParseInt(subFlags.Arg(1), 10, 64)
if err != nil {
return err
}
tt, err := topo.ParseTabletType(*tabletType)
if err != nil {
return err
}
tabletAlias, err := topo.ParseTabletAliasString(subFlags.Arg(0))
if err != nil {
return err
}
tabletInfo, err := wr.TopoServer().GetTablet(ctx, tabletAlias)
if err != nil {
return err
}
ep, err := topo.TabletEndPoint(tabletInfo.Tablet)
if err != nil {
return fmt.Errorf("cannot get EndPoint from tablet record: %v", err)
}
conn, err := tabletconn.GetDialer()(ctx, ep, *keyspace, *shard, tt, *connectTimeout)
if err != nil {
return fmt.Errorf("cannot connect to tablet %v: %v", tabletAlias, err)
}
defer conn.Close()
return conn.Commit(ctx, transactionID)
}
func commandVtTabletRollback(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
keyspace := subFlags.String("keyspace", "", "keyspace the tablet belongs to")
shard := subFlags.String("shard", "", "shard the tablet belongs to")
tabletType := subFlags.String("tablet_type", "unknown", "tablet type we expect from the tablet (use unknown to use sessionId)")
connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vttablet client")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 2 {
return fmt.Errorf("the <tablet_alias> and <transaction_id> arguments are required for the VtTabletRollback command")
}
transactionID, err := strconv.ParseInt(subFlags.Arg(1), 10, 64)
if err != nil {
return err
}
tt, err := topo.ParseTabletType(*tabletType)
if err != nil {
return err
}
tabletAlias, err := topo.ParseTabletAliasString(subFlags.Arg(0))
if err != nil {
return err
}
tabletInfo, err := wr.TopoServer().GetTablet(ctx, tabletAlias)
if err != nil {
return err
}
ep, err := topo.TabletEndPoint(tabletInfo.Tablet)
if err != nil {
return fmt.Errorf("cannot get EndPoint from tablet record: %v", err)
}
conn, err := tabletconn.GetDialer()(ctx, ep, *keyspace, *shard, tt, *connectTimeout)
if err != nil {
return fmt.Errorf("cannot connect to tablet %v: %v", tabletAlias, err)
}
defer conn.Close()
return conn.Rollback(ctx, transactionID)
}
func commandVtTabletStreamHealth(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { func commandVtTabletStreamHealth(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
count := subFlags.Int("count", 1, "number of responses to wait for") count := subFlags.Int("count", 1, "number of responses to wait for")
connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vttablet client") connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vttablet client")

Просмотреть файл

@ -713,6 +713,42 @@ class Tablet(object):
args.extend([self.tablet_alias, sql]) args.extend([self.tablet_alias, sql])
return utils.run_vtctl_json(args, auto_log=auto_log) return utils.run_vtctl_json(args, auto_log=auto_log)
def begin(self, auto_log=True):
"""begin uses 'vtctl VtTabletBegin' to start a transaction.
"""
args = [
'VtTabletBegin',
'-keyspace', self.keyspace,
'-shard', self.shard,
self.tablet_alias,
]
result = utils.run_vtctl_json(args, auto_log=auto_log)
return result['transaction_id']
def commit(self, transaction_id, auto_log=True):
"""commit uses 'vtctl VtTabletCommit' to commit a transaction.
"""
args = [
'VtTabletCommit',
'-keyspace', self.keyspace,
'-shard', self.shard,
self.tablet_alias,
str(transaction_id),
]
return utils.run_vtctl(args, auto_log=auto_log)
def rollback(self, transaction_id, auto_log=True):
"""rollback uses 'vtctl VtTabletRollback' to rollback a transaction.
"""
args = [
'VtTabletRollback',
'-keyspace', self.keyspace,
'-shard', self.shard,
self.tablet_alias,
str(transaction_id),
]
return utils.run_vtctl(args, auto_log=auto_log)
def kill_tablets(tablets): def kill_tablets(tablets):
for t in tablets: for t in tablets:

Просмотреть файл

@ -6,22 +6,15 @@ import warnings
warnings.simplefilter('ignore') warnings.simplefilter('ignore')
import logging import logging
import os
import time
import traceback import traceback
import threading import threading
import unittest import unittest
import MySQLdb
import environment import environment
import tablet import tablet
import utils import utils
from vtdb import dbexceptions from vtdb import dbexceptions
from vtdb import topology
from vtdb import update_stream from vtdb import update_stream
from vtdb import vtclient
from zk import zkocc
from mysql_flavor import mysql_flavor from mysql_flavor import mysql_flavor
master_tablet = tablet.Tablet() master_tablet = tablet.Tablet()
@ -69,6 +62,9 @@ def setUpModule():
replica_tablet.init_mysql()] replica_tablet.init_mysql()]
utils.wait_procs(setup_procs) utils.wait_procs(setup_procs)
# start a vtctld so the vtctl insert commands are just RPCs, not forks
utils.Vtctld().start()
# Start up a master mysql and vttablet # Start up a master mysql and vttablet
logging.debug('Setting up tablets') logging.debug('Setting up tablets')
utils.run_vtctl(['CreateKeyspace', 'test_keyspace']) utils.run_vtctl(['CreateKeyspace', 'test_keyspace'])
@ -142,14 +138,6 @@ class TestUpdateStream(unittest.TestCase):
"insert into vt_b (eid, name, foo) values (%d, 'name %s', 'foo %s')" % "insert into vt_b (eid, name, foo) values (%d, 'name %s', 'foo %s')" %
(x, x, x) for x in xrange(count)] (x, x, x) for x in xrange(count)]
def setUp(self):
self.vtgate_client = zkocc.ZkOccConnection(utils.vtgate.addr(),
'test_nj', 30.0)
topology.read_topology(self.vtgate_client)
def tearDown(self):
self.vtgate_client.close()
def _get_master_stream_conn(self): def _get_master_stream_conn(self):
protocol, endpoint = master_tablet.update_stream_python_endpoint() protocol, endpoint = master_tablet.update_stream_python_endpoint()
return update_stream.connect(protocol, endpoint, 30) return update_stream.connect(protocol, endpoint, 30)
@ -240,17 +228,12 @@ class TestUpdateStream(unittest.TestCase):
self.fail("Update stream returned error '%s'" % str(e)) self.fail("Update stream returned error '%s'" % str(e))
logging.debug('Streamed %d transactions before exiting', txn_count) logging.debug('Streamed %d transactions before exiting', txn_count)
def _exec_vt_txn(self, query_list=None): def _exec_vt_txn(self, query_list):
if not query_list: tid = master_tablet.begin(auto_log=False)
return for query in query_list:
vtdb_conn = vtclient.VtOCCConnection(self.vtgate_client, master_tablet.execute(query, transaction_id=tid, auto_log=False)
'test_keyspace', '0', 'master', 30) master_tablet.commit(tid, auto_log=False)
vtdb_conn.connect() return
vtdb_cursor = vtdb_conn.cursor()
vtdb_conn.begin()
for q in query_list:
vtdb_cursor.execute(q, {})
vtdb_conn.commit()
def test_stream_parity(self): def test_stream_parity(self):
"""test_stream_parity checks the parity of streams received """test_stream_parity checks the parity of streams received