Merge pull request #1276 from alainjobart/resharding

Now vttablet will run a RebuildKeyspace in its cell if the SrvKeyspace object doesn't exist
This commit is contained in:
Alain Jobart 2015-11-03 16:58:41 -08:00
Родитель 2ee2b34b62 94477326c5
Коммит cb00443e17
68 изменённых файлов: 971 добавлений и 602 удалений

Просмотреть файл

@ -154,7 +154,7 @@ func initTabletMap(ts topo.Server, topology string, mysqld mysqlctl.MysqlDaemon,
// Rebuild the SrvKeyspace objects, we we can support range-based
// sharding queries.
wr := wrangler.New(logutil.NewConsoleLogger(), ts, nil, 30*time.Second /*lockTimeout*/)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, nil)
for keyspace := range keyspaceMap {
if err := wr.RebuildKeyspaceGraph(ctx, keyspace, nil, true); err != nil {
log.Fatalf("cannot rebuild %v: %v", keyspace, err)

Просмотреть файл

@ -26,7 +26,6 @@ import (
var (
waitTime = flag.Duration("wait-time", 24*time.Hour, "time to wait on an action")
lockWaitTimeout = flag.Duration("lock-wait-timeout", time.Minute, "time to wait for a lock before starting an action")
)
func init() {
@ -82,7 +81,7 @@ func main() {
defer topo.CloseServers()
ctx, cancel := context.WithTimeout(context.Background(), *waitTime)
wr := wrangler.New(logutil.NewConsoleLogger(), topoServer, tmclient.NewTabletManagerClient(), *lockWaitTimeout)
wr := wrangler.New(logutil.NewConsoleLogger(), topoServer, tmclient.NewTabletManagerClient())
installSignalHandlers(cancel)
for _, f := range initFuncs {

Просмотреть файл

@ -22,7 +22,6 @@ import (
var (
actionTimeout = flag.Duration("action_timeout", time.Hour, "timeout for the total command")
dialTimeout = flag.Duration("dial_timeout", 30*time.Second, "time to wait for the dial phase")
lockWaitTimeout = flag.Duration("lock_wait_timeout", 10*time.Second, "time to wait for a topology server lock")
server = flag.String("server", "", "server to use for connection")
)
@ -33,7 +32,7 @@ func main() {
err := vtctlclient.RunCommandAndWait(
context.Background(), *server, flag.Args(),
*dialTimeout, *actionTimeout, *lockWaitTimeout,
*dialTimeout, *actionTimeout,
func(e *logutil.LoggerEvent) {
switch e.Level {
case logutil.LOGGER_INFO:

Просмотреть файл

@ -11,7 +11,6 @@ import (
"github.com/youtube/vitess/go/acl"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
@ -22,7 +21,6 @@ import (
var (
actionTimeout = flag.Duration("action_timeout", wrangler.DefaultActionTimeout, "time to wait for an action before resorting to force")
lockTimeout = flag.Duration("lock_timeout", actionnode.DefaultLockTimeout, "lock time for wrangler/topo operations")
)
// ActionResult contains the result of an action. If Error, the aciton failed.
@ -102,7 +100,7 @@ func (ar *ActionRepository) ApplyKeyspaceAction(ctx context.Context, actionName,
}
ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), *lockTimeout)
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
output, err := action(ctx, wr, keyspace, r)
cancel()
if err != nil {
@ -129,7 +127,7 @@ func (ar *ActionRepository) ApplyShardAction(ctx context.Context, actionName, ke
}
ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), *lockTimeout)
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
output, err := action(ctx, wr, keyspace, shard, r)
cancel()
if err != nil {
@ -163,7 +161,7 @@ func (ar *ActionRepository) ApplyTabletAction(ctx context.Context, actionName st
// run the action
ctx, cancel := context.WithTimeout(ctx, *actionTimeout)
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), *lockTimeout)
wr := wrangler.New(logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
output, err := action.method(ctx, wr, tabletAlias, r)
cancel()
if err != nil {

Просмотреть файл

@ -11,7 +11,6 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"
"golang.org/x/net/context"
@ -66,7 +65,7 @@ func TestAPI(t *testing.T) {
KeyRange: &pb.KeyRange{Start: nil, End: []byte{0x80}},
PortMap: map[string]int32{"vt": 200},
})
topotools.RebuildShard(ctx, logutil.NewConsoleLogger(), ts, "ks1", "-80", cells, 10*time.Second)
topotools.RebuildShard(ctx, logutil.NewConsoleLogger(), ts, "ks1", "-80", cells)
// Populate fake actions.
actionRepo.RegisterKeyspaceAction("TestKeyspaceAction",

Просмотреть файл

@ -80,7 +80,7 @@ func (s *streamHealthTabletServer) BroadcastHealth(terTimestamp int64, stats *pb
func TestTabletData(t *testing.T) {
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
if err := ts.CreateKeyspace(context.Background(), "ks", &pbt.Keyspace{
ShardingColumnName: "keyspace_id",

Просмотреть файл

@ -12,7 +12,6 @@ import (
"github.com/youtube/vitess/go/vt/janitor"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/wrangler"
@ -20,7 +19,6 @@ import (
var (
sleepTime = flag.Duration("sleep_time", 3*time.Minute, "how long to sleep between janitor runs")
lockTimeout = flag.Duration("lock_timeout", actionnode.DefaultLockTimeout, "lock time for wrangler/topo operations")
keyspace = flag.String("keyspace", "", "keyspace to manage")
shard = flag.String("shard", "", "shard to manage")
dryRunModules flagutil.StringListValue
@ -53,7 +51,7 @@ func main() {
ts := topo.GetServer()
scheduler, err := janitor.New(*keyspace, *shard, ts, wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), *lockTimeout), *sleepTime)
scheduler, err := janitor.New(*keyspace, *shard, ts, wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()), *sleepTime)
if err != nil {
log.Errorf("janitor.New: %v", err)
exit.Return(1)

Просмотреть файл

@ -16,7 +16,6 @@ import (
"github.com/youtube/vitess/go/vt/tableacl"
"github.com/youtube/vitess/go/vt/tableacl/simpleacl"
"github.com/youtube/vitess/go/vt/tabletmanager"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
@ -33,7 +32,6 @@ var (
tableAclConfig = flag.String("table-acl-config", "", "path to table access checker config file")
tabletPath = flag.String("tablet-path", "", "tablet alias")
overridesFile = flag.String("schema-override", "", "schema overrides file")
lockTimeout = flag.Duration("lock_timeout", actionnode.DefaultLockTimeout, "lock time for wrangler/topo operations")
agent *tabletmanager.ActionAgent
)
@ -108,7 +106,7 @@ func main() {
if servenv.GRPCPort != nil {
gRPCPort = int32(*servenv.GRPCPort)
}
agent, err = tabletmanager.NewActionAgent(context.Background(), mysqld, qsc, tabletAlias, dbcfgs, mycnf, int32(*servenv.Port), gRPCPort, *overridesFile, *lockTimeout)
agent, err = tabletmanager.NewActionAgent(context.Background(), mysqld, qsc, tabletAlias, dbcfgs, mycnf, int32(*servenv.Port), gRPCPort, *overridesFile)
if err != nil {
log.Error(err)
exit.Return(1)

Просмотреть файл

@ -60,7 +60,7 @@ func main() {
ts := topo.GetServer()
defer topo.CloseServers()
wi = worker.NewInstance(ts, *cell, 30*time.Second, *commandDisplayInterval)
wi = worker.NewInstance(ts, *cell, *commandDisplayInterval)
wi.InstallSignalHandlers()
wi.InitStatusHandling()

Просмотреть файл

@ -25,7 +25,6 @@ func ExecuteVtctl(ctx context.Context, server string, args []string) (string, er
// TODO(mberlin): Should these values be configurable as flags?
30*time.Second, // dialTimeout
time.Hour, // actionTimeout
10*time.Second, // lockWaitTimeout
CreateLoggerEventToBufferFunction(&output))
return output.String(), err

Просмотреть файл

@ -29,7 +29,6 @@ var _ = math.Inf
type ExecuteVtctlCommandRequest struct {
Args []string `protobuf:"bytes,1,rep,name=args" json:"args,omitempty"`
ActionTimeout int64 `protobuf:"varint,2,opt,name=action_timeout" json:"action_timeout,omitempty"`
LockTimeout int64 `protobuf:"varint,3,opt,name=lock_timeout" json:"lock_timeout,omitempty"`
}
func (m *ExecuteVtctlCommandRequest) Reset() { *m = ExecuteVtctlCommandRequest{} }

Просмотреть файл

@ -94,12 +94,19 @@ func (m *Session_ShardSession) GetTarget() *query.Target {
return nil
}
// ExecuteRequest is the payload to Execute
// ExecuteRequest is the payload to Execute.
type ExecuteRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,3,opt,name=query" json:"query,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,4,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
// not_in_transaction is deprecated and should not be used.
NotInTransaction bool `protobuf:"varint,5,opt,name=not_in_transaction" json:"not_in_transaction,omitempty"`
}
@ -128,10 +135,15 @@ func (m *ExecuteRequest) GetQuery() *query.BoundQuery {
return nil
}
// ExecuteResponse is the returned value from Execute
// ExecuteResponse is the returned value from Execute.
type ExecuteResponse struct {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
Error *vtrpc.RPCError `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
// session is the updated session information (only returned inside a transaction).
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// result contains the query result, only set if error is unset.
Result *query.QueryResult `protobuf:"bytes,3,opt,name=result" json:"result,omitempty"`
}
@ -160,14 +172,23 @@ func (m *ExecuteResponse) GetResult() *query.QueryResult {
return nil
}
// ExecuteShardsRequest is the payload to ExecuteShards
// ExecuteShardsRequest is the payload to ExecuteShards.
type ExecuteShardsRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,3,opt,name=query" json:"query,omitempty"`
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,4,opt,name=keyspace" json:"keyspace,omitempty"`
// shards to target the query to. A DML can only target one shard.
Shards []string `protobuf:"bytes,5,rep,name=shards" json:"shards,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,6,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
// not_in_transaction is deprecated and should not be used.
NotInTransaction bool `protobuf:"varint,7,opt,name=not_in_transaction" json:"not_in_transaction,omitempty"`
}
@ -196,10 +217,15 @@ func (m *ExecuteShardsRequest) GetQuery() *query.BoundQuery {
return nil
}
// ExecuteShardsResponse is the returned value from ExecuteShards
// ExecuteShardsResponse is the returned value from ExecuteShards.
type ExecuteShardsResponse struct {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
Error *vtrpc.RPCError `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
// session is the updated session information (only returned inside a transaction).
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// result contains the query result, only set if error is unset.
Result *query.QueryResult `protobuf:"bytes,3,opt,name=result" json:"result,omitempty"`
}
@ -228,14 +254,24 @@ func (m *ExecuteShardsResponse) GetResult() *query.QueryResult {
return nil
}
// ExecuteKeyspaceIdsRequest is the payload to ExecuteKeyspaceIds
// ExecuteKeyspaceIdsRequest is the payload to ExecuteKeyspaceIds.
type ExecuteKeyspaceIdsRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,3,opt,name=query" json:"query,omitempty"`
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,4,opt,name=keyspace" json:"keyspace,omitempty"`
// keyspace_ids contains the list of keyspace_ids affected by this query.
// Will be used to find the shards to send the query to.
KeyspaceIds [][]byte `protobuf:"bytes,5,rep,name=keyspace_ids,proto3" json:"keyspace_ids,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,6,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
// not_in_transaction is deprecated and should not be used.
NotInTransaction bool `protobuf:"varint,7,opt,name=not_in_transaction" json:"not_in_transaction,omitempty"`
}
@ -264,10 +300,15 @@ func (m *ExecuteKeyspaceIdsRequest) GetQuery() *query.BoundQuery {
return nil
}
// ExecuteKeyspaceIdsResponse is the returned value from ExecuteKeyspaceIds
// ExecuteKeyspaceIdsResponse is the returned value from ExecuteKeyspaceIds.
type ExecuteKeyspaceIdsResponse struct {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
Error *vtrpc.RPCError `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
// session is the updated session information (only returned inside a transaction).
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// result contains the query result, only set if error is unset.
Result *query.QueryResult `protobuf:"bytes,3,opt,name=result" json:"result,omitempty"`
}
@ -296,14 +337,24 @@ func (m *ExecuteKeyspaceIdsResponse) GetResult() *query.QueryResult {
return nil
}
// ExecuteKeyRangesRequest is the payload to ExecuteKeyRanges
// ExecuteKeyRangesRequest is the payload to ExecuteKeyRanges.
type ExecuteKeyRangesRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,3,opt,name=query" json:"query,omitempty"`
// keyspace to target the query to
Keyspace string `protobuf:"bytes,4,opt,name=keyspace" json:"keyspace,omitempty"`
// key_ranges contains the list of key ranges affected by this query.
// Will be used to find the shards to send the query to.
KeyRanges []*topodata.KeyRange `protobuf:"bytes,5,rep,name=key_ranges" json:"key_ranges,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,6,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
// not_in_transaction is deprecated and should not be used.
NotInTransaction bool `protobuf:"varint,7,opt,name=not_in_transaction" json:"not_in_transaction,omitempty"`
}
@ -339,10 +390,15 @@ func (m *ExecuteKeyRangesRequest) GetKeyRanges() []*topodata.KeyRange {
return nil
}
// ExecuteKeyRangesResponse is the returned value from ExecuteKeyRanges
// ExecuteKeyRangesResponse is the returned value from ExecuteKeyRanges.
type ExecuteKeyRangesResponse struct {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
Error *vtrpc.RPCError `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
// session is the updated session information (only returned inside a transaction).
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// result contains the query result, only set if error is unset.
Result *query.QueryResult `protobuf:"bytes,3,opt,name=result" json:"result,omitempty"`
}
@ -371,15 +427,26 @@ func (m *ExecuteKeyRangesResponse) GetResult() *query.QueryResult {
return nil
}
// ExecuteEntityIdsRequest is the payload to ExecuteEntityIds
// ExecuteEntityIdsRequest is the payload to ExecuteEntityIds.
type ExecuteEntityIdsRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,3,opt,name=query" json:"query,omitempty"`
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,4,opt,name=keyspace" json:"keyspace,omitempty"`
// entity_column_name is the column name to use.
EntityColumnName string `protobuf:"bytes,5,opt,name=entity_column_name" json:"entity_column_name,omitempty"`
// entity_keyspace_ids are pairs of entity_column_name values
// associated with its corresponding keyspace_id.
EntityKeyspaceIds []*ExecuteEntityIdsRequest_EntityId `protobuf:"bytes,6,rep,name=entity_keyspace_ids" json:"entity_keyspace_ids,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,7,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
// not_in_transaction is deprecated and should not be used.
NotInTransaction bool `protobuf:"varint,8,opt,name=not_in_transaction" json:"not_in_transaction,omitempty"`
}
@ -416,8 +483,11 @@ func (m *ExecuteEntityIdsRequest) GetEntityKeyspaceIds() []*ExecuteEntityIdsRequ
}
type ExecuteEntityIdsRequest_EntityId struct {
// xid_type is the type of the entity's value. Can be NULL.
XidType query.Type `protobuf:"varint,1,opt,name=xid_type,enum=query.Type" json:"xid_type,omitempty"`
// xid_value is the value for the entity. Not set if xid_type is NULL.
XidValue []byte `protobuf:"bytes,2,opt,name=xid_value,proto3" json:"xid_value,omitempty"`
// keyspace_id is the associated keyspace_id for the entity.
KeyspaceId []byte `protobuf:"bytes,3,opt,name=keyspace_id,proto3" json:"keyspace_id,omitempty"`
}
@ -425,10 +495,15 @@ func (m *ExecuteEntityIdsRequest_EntityId) Reset() { *m = ExecuteEntityI
func (m *ExecuteEntityIdsRequest_EntityId) String() string { return proto.CompactTextString(m) }
func (*ExecuteEntityIdsRequest_EntityId) ProtoMessage() {}
// ExecuteEntityIdsResponse is the returned value from ExecuteEntityIds
// ExecuteEntityIdsResponse is the returned value from ExecuteEntityIds.
type ExecuteEntityIdsResponse struct {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
Error *vtrpc.RPCError `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
// session is the updated session information (only returned inside a transaction).
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// result contains the query result, only set if error is unset.
Result *query.QueryResult `protobuf:"bytes,3,opt,name=result" json:"result,omitempty"`
}
@ -461,8 +536,11 @@ func (m *ExecuteEntityIdsResponse) GetResult() *query.QueryResult {
// specified list of shards. This is used in a list for
// ExecuteBatchShardsRequest.
type BoundShardQuery struct {
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,1,opt,name=query" json:"query,omitempty"`
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,2,opt,name=keyspace" json:"keyspace,omitempty"`
// shards to target the query to. A DML can only target one shard.
Shards []string `protobuf:"bytes,3,rep,name=shards" json:"shards,omitempty"`
}
@ -479,10 +557,19 @@ func (m *BoundShardQuery) GetQuery() *query.BoundQuery {
// ExecuteBatchShardsRequest is the payload to ExecuteBatchShards
type ExecuteBatchShardsRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// queries carries all the queries to execute.
Queries []*BoundShardQuery `protobuf:"bytes,3,rep,name=queries" json:"queries,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,4,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
// as_transaction will execute the queries in this batch in a single transaction per shard, created for this purpose.
// (this can be seen as adding a 'begin' before and 'commit' after the queries).
// Only makes sense if tablet_type is master. If set, the Session is ignored.
AsTransaction bool `protobuf:"varint,5,opt,name=as_transaction" json:"as_transaction,omitempty"`
}
@ -511,10 +598,15 @@ func (m *ExecuteBatchShardsRequest) GetQueries() []*BoundShardQuery {
return nil
}
// ExecuteBatchShardsResponse is the returned value from ExecuteBatchShards
// ExecuteBatchShardsResponse is the returned value from ExecuteBatchShards.
type ExecuteBatchShardsResponse struct {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
Error *vtrpc.RPCError `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
// session is the updated session information (only returned inside a transaction).
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// result contains the query result, only set if error is unset.
Results []*query.QueryResult `protobuf:"bytes,3,rep,name=results" json:"results,omitempty"`
}
@ -547,8 +639,12 @@ func (m *ExecuteBatchShardsResponse) GetResults() []*query.QueryResult {
// specified list of keyspace ids. This is used in a list for
// ExecuteBatchKeyspaceIdsRequest.
type BoundKeyspaceIdQuery struct {
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,1,opt,name=query" json:"query,omitempty"`
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,2,opt,name=keyspace" json:"keyspace,omitempty"`
// keyspace_ids contains the list of keyspace_ids affected by this query.
// Will be used to find the shards to send the query to.
KeyspaceIds [][]byte `protobuf:"bytes,3,rep,name=keyspace_ids,proto3" json:"keyspace_ids,omitempty"`
}
@ -563,12 +659,20 @@ func (m *BoundKeyspaceIdQuery) GetQuery() *query.BoundQuery {
return nil
}
// ExecuteBatchKeyspaceIdsRequest is the payload to ExecuteBatchKeyspaceId
// ExecuteBatchKeyspaceIdsRequest is the payload to ExecuteBatchKeyspaceId.
type ExecuteBatchKeyspaceIdsRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
Queries []*BoundKeyspaceIdQuery `protobuf:"bytes,3,rep,name=queries" json:"queries,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,4,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
// as_transaction will execute the queries in this batch in a single transaction per shard, created for this purpose.
// (this can be seen as adding a 'begin' before and 'commit' after the queries).
// Only makes sense if tablet_type is master. If set, the Session is ignored.
AsTransaction bool `protobuf:"varint,5,opt,name=as_transaction" json:"as_transaction,omitempty"`
}
@ -597,10 +701,15 @@ func (m *ExecuteBatchKeyspaceIdsRequest) GetQueries() []*BoundKeyspaceIdQuery {
return nil
}
// ExecuteBatchKeyspaceIdsResponse is the returned value from ExecuteBatchKeyspaceId
// ExecuteBatchKeyspaceIdsResponse is the returned value from ExecuteBatchKeyspaceId.
type ExecuteBatchKeyspaceIdsResponse struct {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
Error *vtrpc.RPCError `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
// session is the updated session information (only returned inside a transaction).
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
// result contains the query result, only set if error is unset.
Results []*query.QueryResult `protobuf:"bytes,3,rep,name=results" json:"results,omitempty"`
}
@ -629,10 +738,14 @@ func (m *ExecuteBatchKeyspaceIdsResponse) GetResults() []*query.QueryResult {
return nil
}
// StreamExecuteRequest is the payload to StreamExecute
// StreamExecuteRequest is the payload to StreamExecute.
type StreamExecuteRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,2,opt,name=query" json:"query,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,3,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
}
@ -654,8 +767,11 @@ func (m *StreamExecuteRequest) GetQuery() *query.BoundQuery {
return nil
}
// StreamExecuteResponse is the returned value from StreamExecute
// StreamExecuteResponse is the returned value from StreamExecute.
type StreamExecuteResponse struct {
// result contains the result data.
// The first value contains only Fields information.
// The next values contain the actual rows, a few values per result.
Result *query.QueryResult `protobuf:"bytes,1,opt,name=result" json:"result,omitempty"`
}
@ -670,12 +786,18 @@ func (m *StreamExecuteResponse) GetResult() *query.QueryResult {
return nil
}
// StreamExecuteShardsRequest is the payload to StreamExecuteShards
// StreamExecuteShardsRequest is the payload to StreamExecuteShards.
type StreamExecuteShardsRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,2,opt,name=query" json:"query,omitempty"`
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,3,opt,name=keyspace" json:"keyspace,omitempty"`
// shards to target the query to.
Shards []string `protobuf:"bytes,4,rep,name=shards" json:"shards,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,5,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
}
@ -697,8 +819,11 @@ func (m *StreamExecuteShardsRequest) GetQuery() *query.BoundQuery {
return nil
}
// StreamExecuteShardsResponse is the returned value from StreamExecuteShards
// StreamExecuteShardsResponse is the returned value from StreamExecuteShards.
type StreamExecuteShardsResponse struct {
// result contains the result data.
// The first value contains only Fields information.
// The next values contain the actual rows, a few values per result.
Result *query.QueryResult `protobuf:"bytes,1,opt,name=result" json:"result,omitempty"`
}
@ -713,12 +838,19 @@ func (m *StreamExecuteShardsResponse) GetResult() *query.QueryResult {
return nil
}
// StreamExecuteKeyspaceIdsRequest is the payload to StreamExecuteKeyspaceIds
// StreamExecuteKeyspaceIdsRequest is the payload to StreamExecuteKeyspaceIds.
type StreamExecuteKeyspaceIdsRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,2,opt,name=query" json:"query,omitempty"`
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,3,opt,name=keyspace" json:"keyspace,omitempty"`
// keyspace_ids contains the list of keyspace_ids affected by this query.
// Will be used to find the shards to send the query to.
KeyspaceIds [][]byte `protobuf:"bytes,4,rep,name=keyspace_ids,proto3" json:"keyspace_ids,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,5,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
}
@ -740,8 +872,11 @@ func (m *StreamExecuteKeyspaceIdsRequest) GetQuery() *query.BoundQuery {
return nil
}
// StreamExecuteKeyspaceIdsResponse is the returned value from StreamExecuteKeyspaceIds
// StreamExecuteKeyspaceIdsResponse is the returned value from StreamExecuteKeyspaceIds.
type StreamExecuteKeyspaceIdsResponse struct {
// result contains the result data.
// The first value contains only Fields information.
// The next values contain the actual rows, a few values per result.
Result *query.QueryResult `protobuf:"bytes,1,opt,name=result" json:"result,omitempty"`
}
@ -756,12 +891,19 @@ func (m *StreamExecuteKeyspaceIdsResponse) GetResult() *query.QueryResult {
return nil
}
// StreamExecuteKeyRangesRequest is the payload to StreamExecuteKeyRanges
// StreamExecuteKeyRangesRequest is the payload to StreamExecuteKeyRanges.
type StreamExecuteKeyRangesRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,2,opt,name=query" json:"query,omitempty"`
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,3,opt,name=keyspace" json:"keyspace,omitempty"`
// key_ranges contains the list of key ranges affected by this query.
// Will be used to find the shards to send the query to.
KeyRanges []*topodata.KeyRange `protobuf:"bytes,4,rep,name=key_ranges" json:"key_ranges,omitempty"`
// tablet_type is the type of tablets that this query is targeted to.
TabletType topodata.TabletType `protobuf:"varint,5,opt,name=tablet_type,enum=topodata.TabletType" json:"tablet_type,omitempty"`
}
@ -790,8 +932,11 @@ func (m *StreamExecuteKeyRangesRequest) GetKeyRanges() []*topodata.KeyRange {
return nil
}
// StreamExecuteKeyRangesResponse is the returned value from StreamExecuteKeyRanges
// StreamExecuteKeyRangesResponse is the returned value from StreamExecuteKeyRanges.
type StreamExecuteKeyRangesResponse struct {
// result contains the result data.
// The first value contains only Fields information.
// The next values contain the actual rows, a few values per result.
Result *query.QueryResult `protobuf:"bytes,1,opt,name=result" json:"result,omitempty"`
}
@ -806,8 +951,10 @@ func (m *StreamExecuteKeyRangesResponse) GetResult() *query.QueryResult {
return nil
}
// BeginRequest is the payload to Begin
// BeginRequest is the payload to Begin.
type BeginRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
}
@ -822,8 +969,9 @@ func (m *BeginRequest) GetCallerId() *vtrpc.CallerID {
return nil
}
// BeginResponse is the returned value from Begin
// BeginResponse is the returned value from Begin.
type BeginResponse struct {
// session is the initial session information to use for subsequent queries.
Session *Session `protobuf:"bytes,1,opt,name=session" json:"session,omitempty"`
}
@ -838,9 +986,12 @@ func (m *BeginResponse) GetSession() *Session {
return nil
}
// CommitRequest is the payload to Commit
// CommitRequest is the payload to Commit.
type CommitRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// session carries the current transaction data to commit.
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
}
@ -862,7 +1013,7 @@ func (m *CommitRequest) GetSession() *Session {
return nil
}
// CommitResponse is the returned value from Commit
// CommitResponse is the returned value from Commit.
type CommitResponse struct {
}
@ -870,9 +1021,12 @@ func (m *CommitResponse) Reset() { *m = CommitResponse{} }
func (m *CommitResponse) String() string { return proto.CompactTextString(m) }
func (*CommitResponse) ProtoMessage() {}
// RollbackRequest is the payload to Rollback
// RollbackRequest is the payload to Rollback.
type RollbackRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// session carries the current transaction data to rollback.
Session *Session `protobuf:"bytes,2,opt,name=session" json:"session,omitempty"`
}
@ -894,7 +1048,7 @@ func (m *RollbackRequest) GetSession() *Session {
return nil
}
// RollbackResponse is the returned value from Rollback
// RollbackResponse is the returned value from Rollback.
type RollbackResponse struct {
}
@ -902,12 +1056,18 @@ func (m *RollbackResponse) Reset() { *m = RollbackResponse{} }
func (m *RollbackResponse) String() string { return proto.CompactTextString(m) }
func (*RollbackResponse) ProtoMessage() {}
// SplitQueryRequest is the payload to SplitQuery
// SplitQueryRequest is the payload to SplitQuery.
type SplitQueryRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id" json:"caller_id,omitempty"`
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,2,opt,name=keyspace" json:"keyspace,omitempty"`
// query is the query and bind variables to produce splits for.
Query *query.BoundQuery `protobuf:"bytes,3,opt,name=query" json:"query,omitempty"`
// split_column is an optional hint on the column to use to split the query.
SplitColumn string `protobuf:"bytes,4,opt,name=split_column" json:"split_column,omitempty"`
// split_count describes how many splits we want for this query.
SplitCount int64 `protobuf:"varint,5,opt,name=split_count" json:"split_count,omitempty"`
}
@ -929,8 +1089,9 @@ func (m *SplitQueryRequest) GetQuery() *query.BoundQuery {
return nil
}
// SplitQueryResponse is the returned value from SplitQuery
// SplitQueryResponse is the returned value from SplitQuery.
type SplitQueryResponse struct {
// splits contains the queries to run to fetch the entire data set.
Splits []*SplitQueryResponse_Part `protobuf:"bytes,1,rep,name=splits" json:"splits,omitempty"`
}
@ -946,7 +1107,9 @@ func (m *SplitQueryResponse) GetSplits() []*SplitQueryResponse_Part {
}
type SplitQueryResponse_KeyRangePart struct {
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,1,opt,name=keyspace" json:"keyspace,omitempty"`
// key ranges to target the query to.
KeyRanges []*topodata.KeyRange `protobuf:"bytes,2,rep,name=key_ranges" json:"key_ranges,omitempty"`
}
@ -962,7 +1125,9 @@ func (m *SplitQueryResponse_KeyRangePart) GetKeyRanges() []*topodata.KeyRange {
}
type SplitQueryResponse_ShardPart struct {
// keyspace to target the query to.
Keyspace string `protobuf:"bytes,1,opt,name=keyspace" json:"keyspace,omitempty"`
// shards to target the query to.
Shards []string `protobuf:"bytes,2,rep,name=shards" json:"shards,omitempty"`
}
@ -971,9 +1136,13 @@ func (m *SplitQueryResponse_ShardPart) String() string { return proto.CompactTex
func (*SplitQueryResponse_ShardPart) ProtoMessage() {}
type SplitQueryResponse_Part struct {
// query is the query and bind variables to execute.
Query *query.BoundQuery `protobuf:"bytes,1,opt,name=query" json:"query,omitempty"`
// key_range_part is set if the query should be executed by ExecuteKeyRanges.
KeyRangePart *SplitQueryResponse_KeyRangePart `protobuf:"bytes,2,opt,name=key_range_part" json:"key_range_part,omitempty"`
// shard_part is set if the query should be executed by ExecuteShards.
ShardPart *SplitQueryResponse_ShardPart `protobuf:"bytes,3,opt,name=shard_part" json:"shard_part,omitempty"`
// size is the approximate number of rows this query will return.
Size int64 `protobuf:"varint,4,opt,name=size" json:"size,omitempty"`
}
@ -1002,8 +1171,9 @@ func (m *SplitQueryResponse_Part) GetShardPart() *SplitQueryResponse_ShardPart {
return nil
}
// GetSrvKeyspaceRequest is the payload to GetSrvKeyspace
// GetSrvKeyspaceRequest is the payload to GetSrvKeyspace.
type GetSrvKeyspaceRequest struct {
// keyspace name to fetch.
Keyspace string `protobuf:"bytes,1,opt,name=keyspace" json:"keyspace,omitempty"`
}
@ -1011,8 +1181,9 @@ func (m *GetSrvKeyspaceRequest) Reset() { *m = GetSrvKeyspaceRequest{} }
func (m *GetSrvKeyspaceRequest) String() string { return proto.CompactTextString(m) }
func (*GetSrvKeyspaceRequest) ProtoMessage() {}
// GetSrvKeyspaceResponse is the returned value from GetSrvKeyspace
// GetSrvKeyspaceResponse is the returned value from GetSrvKeyspace.
type GetSrvKeyspaceResponse struct {
// srv_keyspace is the topology object for the SrvKeyspace.
SrvKeyspace *topodata.SrvKeyspace `protobuf:"bytes,1,opt,name=srv_keyspace" json:"srv_keyspace,omitempty"`
}

Просмотреть файл

@ -8,6 +8,7 @@ package actionnode
// topology server.
import (
"flag"
"time"
log "github.com/golang/glog"
@ -20,6 +21,10 @@ var (
// DefaultLockTimeout is a good value to use as a default for
// locking a shard / keyspace.
DefaultLockTimeout = 30 * time.Second
// LockTimeout is the command line flag that introduces a shorter
// timeout for locking topology structures.
LockTimeout = flag.Duration("lock_timeout", DefaultLockTimeout, "timeout for acquiring topology locks")
)
// LockKeyspace will lock the keyspace in the topology server.
@ -27,6 +32,9 @@ var (
func (n *ActionNode) LockKeyspace(ctx context.Context, ts topo.Server, keyspace string) (lockPath string, err error) {
log.Infof("Locking keyspace %v for action %v", keyspace, n.Action)
ctx, cancel := context.WithTimeout(ctx, *LockTimeout)
defer cancel()
span := trace.NewSpanFromContext(ctx)
span.StartClient("TopoServer.LockKeyspaceForAction")
span.Annotate("action", n.Action)
@ -89,6 +97,9 @@ func (n *ActionNode) UnlockKeyspace(ctx context.Context, ts topo.Server, keyspac
func (n *ActionNode) LockShard(ctx context.Context, ts topo.Server, keyspace, shard string) (lockPath string, err error) {
log.Infof("Locking shard %v/%v for action %v", keyspace, shard, n.Action)
ctx, cancel := context.WithTimeout(ctx, *LockTimeout)
defer cancel()
span := trace.NewSpanFromContext(ctx)
span.StartClient("TopoServer.LockShardForAction")
span.Annotate("action", n.Action)
@ -153,6 +164,9 @@ func (n *ActionNode) UnlockShard(ctx context.Context, ts topo.Server, keyspace,
func (n *ActionNode) LockSrvShard(ctx context.Context, ts topo.Server, cell, keyspace, shard string) (lockPath string, err error) {
log.Infof("Locking serving shard %v/%v/%v for action %v", cell, keyspace, shard, n.Action)
ctx, cancel := context.WithTimeout(ctx, *LockTimeout)
defer cancel()
span := trace.NewSpanFromContext(ctx)
span.StartClient("TopoServer.LockSrvShardForAction")
span.Annotate("action", n.Action)

Просмотреть файл

@ -73,7 +73,6 @@ type ActionAgent struct {
DBConfigs dbconfigs.DBConfigs
SchemaOverrides []tabletserver.SchemaOverride
BinlogPlayerMap *BinlogPlayerMap
LockTimeout time.Duration
// exportStats is set only for production tablet.
exportStats bool
@ -158,7 +157,6 @@ func NewActionAgent(
mycnf *mysqlctl.Mycnf,
port, gRPCPort int32,
overridesFile string,
lockTimeout time.Duration,
) (agent *ActionAgent, err error) {
schemaOverrides := loadSchemaOverrides(overridesFile)
@ -173,7 +171,6 @@ func NewActionAgent(
MysqlDaemon: mysqld,
DBConfigs: dbcfgs,
SchemaOverrides: schemaOverrides,
LockTimeout: lockTimeout,
History: history.New(historyLength),
lastHealthMapCount: stats.NewInt("LastHealthMapCount"),
_healthy: fmt.Errorf("healthcheck not run yet"),
@ -526,11 +523,16 @@ func (agent *ActionAgent) Start(ctx context.Context, mysqlPort, vtPort, gRPCPort
return err
}
// and update our state
// update our state
oldTablet := &pb.Tablet{}
if err = agent.updateState(ctx, oldTablet, "Start"); err != nil {
log.Warningf("Initial updateState failed, will need a state change before running properly: %v", err)
}
// run a background task to rebuild the SrvKeyspace in our cell/keyspace
// if it doesn't exist yet
go agent.maybeRebuildKeyspace(tablet.Alias.Cell, tablet.Keyspace)
return nil
}

Просмотреть файл

@ -7,7 +7,6 @@ package tabletmanager
import (
"fmt"
"testing"
"time"
"github.com/youtube/vitess/go/history"
"github.com/youtube/vitess/go/stats"
@ -43,7 +42,6 @@ func TestInitTablet(t *testing.T) {
DBConfigs: dbconfigs.DBConfigs{},
SchemaOverrides: nil,
BinlogPlayerMap: nil,
LockTimeout: 10 * time.Second,
batchCtx: ctx,
History: history.New(historyLength),
lastHealthMapCount: new(stats.Int),

Просмотреть файл

@ -0,0 +1,37 @@
// Copyright 2014, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletmanager
import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topotools"
)
// maybeRebuildKeyspace handles the initial rebuild of SrvKeyspace if needed.
// This should be run as a background task: we use the batch context to check
// the SrvKeyspace object. If it is not there, we try to rebuild it
// for the current cell only.
func (agent *ActionAgent) maybeRebuildKeyspace(cell, keyspace string) {
_, err := agent.TopoServer.GetSrvKeyspace(agent.batchCtx, cell, keyspace)
switch err {
case nil:
// SrvKeyspace exists, we're done
log.Infof("SrvKeyspace(%v,%v) exists, not building it", cell, keyspace)
return
case topo.ErrNoNode:
log.Infof("SrvKeyspace(%v,%v) doesn't exist, rebuilding it", cell, keyspace)
// SrvKeyspace doesn't exist, we'll try to rebuild it
default:
log.Warningf("Cannot read SrvKeyspace(%v,%v) (may need to run 'vtctl RebuildKeyspaceGraph %v'), skipping rebuild: %v", cell, keyspace, keyspace, err)
return
}
if err := topotools.RebuildKeyspace(agent.batchCtx, logutil.NewConsoleLogger(), agent.TopoServer, keyspace, []string{cell}, false); err != nil {
log.Warningf("RebuildKeyspace(%v,%v) failed: %v, may need to run 'vtctl RebuildKeyspaceGraph %v')", cell, keyspace, err, keyspace)
}
}

Просмотреть файл

@ -0,0 +1,202 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package topotools
import (
"bytes"
"encoding/hex"
"fmt"
"sync"
"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// RebuildKeyspace rebuilds the serving graph data while locking out other changes.
func RebuildKeyspace(ctx context.Context, log logutil.Logger, ts topo.Server, keyspace string, cells []string, rebuildSrvShards bool) error {
node := actionnode.RebuildKeyspace()
lockPath, err := node.LockKeyspace(ctx, ts, keyspace)
if err != nil {
return err
}
err = rebuildKeyspace(ctx, log, ts, keyspace, cells, rebuildSrvShards)
return node.UnlockKeyspace(ctx, ts, keyspace, lockPath, err)
}
// findCellsForRebuild will find all the cells in the given keyspace
// and create an entry if the map for them
func findCellsForRebuild(ki *topo.KeyspaceInfo, shardMap map[string]*topo.ShardInfo, cells []string, srvKeyspaceMap map[string]*pb.SrvKeyspace) {
for _, si := range shardMap {
for _, cell := range si.Cells {
if !topo.InCellList(cell, cells) {
continue
}
if _, ok := srvKeyspaceMap[cell]; !ok {
srvKeyspaceMap[cell] = &pb.SrvKeyspace{
ShardingColumnName: ki.ShardingColumnName,
ShardingColumnType: ki.ShardingColumnType,
ServedFrom: ki.ComputeCellServedFrom(cell),
SplitShardCount: ki.SplitShardCount,
}
}
}
}
}
// rebuildKeyspace should only be used with an action lock on the keyspace
// - otherwise the consistency of the serving graph data can't be
// guaranteed.
//
// Take data from the global keyspace and rebuild the local serving
// copies in each cell.
func rebuildKeyspace(ctx context.Context, log logutil.Logger, ts topo.Server, keyspace string, cells []string, rebuildSrvShards bool) error {
log.Infof("rebuildKeyspace %v", keyspace)
ki, err := ts.GetKeyspace(ctx, keyspace)
if err != nil {
return err
}
var shardCache map[string]*topo.ShardInfo
if rebuildSrvShards {
shards, err := ts.GetShardNames(ctx, keyspace)
if err != nil {
return nil
}
// Rebuild all shards in parallel, save the shards
shardCache = make(map[string]*topo.ShardInfo)
wg := sync.WaitGroup{}
mu := sync.Mutex{}
rec := concurrency.FirstErrorRecorder{}
for _, shard := range shards {
wg.Add(1)
go func(shard string) {
if shardInfo, err := RebuildShard(ctx, log, ts, keyspace, shard, cells); err != nil {
rec.RecordError(fmt.Errorf("RebuildShard failed: %v/%v %v", keyspace, shard, err))
} else {
mu.Lock()
shardCache[shard] = shardInfo
mu.Unlock()
}
wg.Done()
}(shard)
}
wg.Wait()
if rec.HasErrors() {
return rec.Error()
}
} else {
shardCache, err = ts.FindAllShardsInKeyspace(ctx, keyspace)
if err != nil {
return err
}
}
// Build the list of cells to work on: we get the union
// of all the Cells of all the Shards, limited to the provided cells.
//
// srvKeyspaceMap is a map:
// key: cell
// value: topo.SrvKeyspace object being built
srvKeyspaceMap := make(map[string]*pb.SrvKeyspace)
findCellsForRebuild(ki, shardCache, cells, srvKeyspaceMap)
// Then we add the cells from the keyspaces we might be 'ServedFrom'.
for _, ksf := range ki.ServedFroms {
servedFromShards, err := ts.FindAllShardsInKeyspace(ctx, ksf.Keyspace)
if err != nil {
return err
}
findCellsForRebuild(ki, servedFromShards, cells, srvKeyspaceMap)
}
// for each entry in the srvKeyspaceMap map, we do the following:
// - read the SrvShard structures for each shard / cell
// - if not present, build an empty one from global Shard
// - compute the union of the db types (replica, master, ...)
// - sort the shards in the list by range
// - check the ranges are compatible (no hole, covers everything)
for cell, srvKeyspace := range srvKeyspaceMap {
for _, si := range shardCache {
servedTypes := si.GetServedTypesPerCell(cell)
// for each type this shard is supposed to serve,
// add it to srvKeyspace.Partitions
for _, tabletType := range servedTypes {
partition := topoproto.SrvKeyspaceGetPartition(srvKeyspace, tabletType)
if partition == nil {
partition = &pb.SrvKeyspace_KeyspacePartition{
ServedType: tabletType,
}
srvKeyspace.Partitions = append(srvKeyspace.Partitions, partition)
}
partition.ShardReferences = append(partition.ShardReferences, &pb.ShardReference{
Name: si.ShardName(),
KeyRange: si.KeyRange,
})
}
}
if err := orderAndCheckPartitions(cell, srvKeyspace); err != nil {
return err
}
}
// and then finally save the keyspace objects
for cell, srvKeyspace := range srvKeyspaceMap {
log.Infof("updating keyspace serving graph in cell %v for %v", cell, keyspace)
if err := ts.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace); err != nil {
return fmt.Errorf("writing serving data failed: %v", err)
}
}
return nil
}
// orderAndCheckPartitions will re-order the partition list, and check
// it's correct.
func orderAndCheckPartitions(cell string, srvKeyspace *pb.SrvKeyspace) error {
// now check them all
for _, partition := range srvKeyspace.Partitions {
tabletType := partition.ServedType
topoproto.ShardReferenceArray(partition.ShardReferences).Sort()
// check the first Start is MinKey, the last End is MaxKey,
// and the values in between match: End[i] == Start[i+1]
first := partition.ShardReferences[0]
if first.KeyRange != nil && len(first.KeyRange.Start) != 0 {
return fmt.Errorf("keyspace partition for %v in cell %v does not start with min key", tabletType, cell)
}
last := partition.ShardReferences[len(partition.ShardReferences)-1]
if last.KeyRange != nil && len(last.KeyRange.End) != 0 {
return fmt.Errorf("keyspace partition for %v in cell %v does not end with max key", tabletType, cell)
}
for i := range partition.ShardReferences[0 : len(partition.ShardReferences)-1] {
fn := partition.ShardReferences[i].KeyRange == nil
sn := partition.ShardReferences[i+1].KeyRange == nil
if fn != sn {
return fmt.Errorf("shards with unconsistent KeyRanges for %v in cell %v at shard %v", tabletType, cell, i)
}
if fn {
// this is the custom sharding case, all KeyRanges must be nil
continue
}
if bytes.Compare(partition.ShardReferences[i].KeyRange.End, partition.ShardReferences[i+1].KeyRange.Start) != 0 {
return fmt.Errorf("non-contiguous KeyRange values for %v in cell %v at shard %v to %v: %v != %v", tabletType, cell, i, i+1, hex.EncodeToString(partition.ShardReferences[i].KeyRange.End), hex.EncodeToString(partition.ShardReferences[i+1].KeyRange.Start))
}
}
}
return nil
}

Просмотреть файл

@ -7,7 +7,6 @@ package topotools
import (
"fmt"
"sync"
"time"
"github.com/youtube/vitess/go/trace"
"github.com/youtube/vitess/go/vt/concurrency"
@ -25,7 +24,7 @@ import (
//
// This function will start each cell over from the beginning on ErrBadVersion,
// so it doesn't need a lock on the shard.
func RebuildShard(ctx context.Context, log logutil.Logger, ts topo.Server, keyspace, shard string, cells []string, lockTimeout time.Duration) (*topo.ShardInfo, error) {
func RebuildShard(ctx context.Context, log logutil.Logger, ts topo.Server, keyspace, shard string, cells []string) (*topo.ShardInfo, error) {
log.Infof("RebuildShard %v/%v", keyspace, shard)
span := trace.NewSpanFromContext(ctx)

Просмотреть файл

@ -8,7 +8,6 @@ import (
"fmt"
"strings"
"testing"
"time"
"golang.org/x/net/context"
@ -70,7 +69,7 @@ func TestRebuildShard(t *testing.T) {
replicaInfo := addTablet(ctx, t, ts, 2, cells[0], pb.TabletType_REPLICA)
// Do an initial rebuild.
if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells, time.Minute); err != nil {
if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells); err != nil {
t.Fatalf("RebuildShard: %v", err)
}
@ -95,7 +94,7 @@ func TestRebuildShard(t *testing.T) {
if err := ts.UpdateTablet(ctx, masterInfo); err != nil {
t.Fatalf("UpdateTablet: %v", err)
}
if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells, time.Minute); err != nil {
if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells); err != nil {
t.Fatalf("RebuildShard: %v", err)
}
@ -104,7 +103,7 @@ func TestRebuildShard(t *testing.T) {
if err := ts.UpdateTablet(ctx, replicaInfo); err != nil {
t.Fatalf("UpdateTablet: %v", err)
}
if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells, time.Minute); err != nil {
if _, err := RebuildShard(ctx, logger, ts, testKeyspace, testShard, cells); err != nil {
t.Fatalf("RebuildShard: %v", err)
}

Просмотреть файл

@ -31,7 +31,7 @@ func (f *FakeVtctlClient) FakeVtctlClientFactory(addr string, dialTimeout time.D
}
// ExecuteVtctlCommand is part of the vtctlclient interface.
func (f *FakeVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
func (f *FakeVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
return f.FakeLoggerEventStreamingClient.StreamResult(args)
}

Просмотреть файл

@ -17,8 +17,7 @@ import (
type ExecuteVtctlCommandArgs struct {
Args []string
// Use wrangler.DefaultActionTimeout and actionnode.DefaultLockTimeout
// for decent default values here.
// ActionTimeout can be defaulted to wrangler.DefaultActionTimeout
// for a decent default value.
ActionTimeout time.Duration
LockTimeout time.Duration
}

Просмотреть файл

@ -34,11 +34,10 @@ func goRPCVtctlClientFactory(addr string, dialTimeout time.Duration) (vtctlclien
// ExecuteVtctlCommand is part of the VtctlClient interface.
// Note the bson rpc version doesn't honor timeouts in the context
// (but the server side will honor the actionTimeout)
func (client *goRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
func (client *goRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
req := &gorpcproto.ExecuteVtctlCommandArgs{
Args: args,
ActionTimeout: actionTimeout,
LockTimeout: lockTimeout,
}
sr := make(chan *logutil.LoggerEvent, 10)
c := client.rpcClient.StreamGo("VtctlServer.ExecuteVtctlCommand", req, sr)

Просмотреть файл

@ -50,7 +50,7 @@ func (s *VtctlServer) ExecuteVtctlCommand(ctx context.Context, query *gorpcproto
}()
// create the wrangler
wr := wrangler.New(logger, s.ts, tmclient.NewTabletManagerClient(), query.LockTimeout)
wr := wrangler.New(logger, s.ts, tmclient.NewTabletManagerClient())
// FIXME(alainjobart) use a single context, copy the source info from it
ctx, cancel := context.WithTimeout(context.TODO(), query.ActionTimeout)

Просмотреть файл

@ -38,11 +38,10 @@ func gRPCVtctlClientFactory(addr string, dialTimeout time.Duration) (vtctlclient
}
// ExecuteVtctlCommand is part of the VtctlClient interface
func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
query := &pb.ExecuteVtctlCommandRequest{
Args: args,
ActionTimeout: int64(actionTimeout.Nanoseconds()),
LockTimeout: int64(lockTimeout.Nanoseconds()),
}
stream, err := client.c.ExecuteVtctlCommand(ctx, query)

Просмотреть файл

@ -10,7 +10,6 @@ package grpcvtctlserver
import (
"sync"
"time"
"google.golang.org/grpc"
@ -60,7 +59,7 @@ func (s *VtctlServer) ExecuteVtctlCommand(args *pb.ExecuteVtctlCommandRequest, s
}()
// create the wrangler
wr := wrangler.New(logger, s.ts, tmclient.NewTabletManagerClient(), time.Duration(args.LockTimeout))
wr := wrangler.New(logger, s.ts, tmclient.NewTabletManagerClient())
// execute the command
err = vtctl.RunCommand(stream.Context(), wr, args.Args)

Просмотреть файл

@ -25,7 +25,7 @@ type ErrFunc func() error
type VtctlClient interface {
// ExecuteVtctlCommand will execute the command remotely
// NOTE: ErrFunc should only be checked after the returned channel was closed to avoid races.
ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, ErrFunc, error)
ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutil.LoggerEvent, ErrFunc, error)
// Close will terminate the connection. This object won't be
// used after this.

Просмотреть файл

@ -15,7 +15,7 @@ import (
// RunCommandAndWait executes a single command on a given vtctld and blocks until the command did return or timed out.
// Output from vtctld is streamed as logutil.LoggerEvent messages which have to be consumed by the caller who has to specify a "recv" function.
func RunCommandAndWait(ctx context.Context, server string, args []string, dialTimeout, actionTimeout, lockWaitTimeout time.Duration, recv func(*logutil.LoggerEvent)) error {
func RunCommandAndWait(ctx context.Context, server string, args []string, dialTimeout, actionTimeout time.Duration, recv func(*logutil.LoggerEvent)) error {
if recv == nil {
return errors.New("No function closure for LoggerEvent stream specified")
}
@ -29,7 +29,7 @@ func RunCommandAndWait(ctx context.Context, server string, args []string, dialTi
// run the command
ctx, cancel := context.WithTimeout(context.Background(), actionTimeout)
defer cancel()
c, errFunc, err := client.ExecuteVtctlCommand(ctx, args, actionTimeout, lockWaitTimeout)
c, errFunc, err := client.ExecuteVtctlCommand(ctx, args, actionTimeout)
if err != nil {
return fmt.Errorf("Cannot execute remote command: %v", err)
}

Просмотреть файл

@ -57,7 +57,7 @@ func TestSuite(t *testing.T, ts topo.Server, client vtctlclient.VtctlClient) {
}
// run a command that's gonna return something on the log channel
logs, errFunc, err := client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell1"}, 30*time.Second, 10*time.Second)
logs, errFunc, err := client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell1"}, 30*time.Second)
if err != nil {
t.Fatalf("Remote error: %v", err)
}
@ -78,7 +78,7 @@ func TestSuite(t *testing.T, ts topo.Server, client vtctlclient.VtctlClient) {
}
// run a command that's gonna fail
logs, errFunc, err = client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell2"}, 30*time.Second, 10*time.Second)
logs, errFunc, err = client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell2"}, 30*time.Second)
if err != nil {
t.Fatalf("Remote error: %v", err)
}
@ -92,7 +92,7 @@ func TestSuite(t *testing.T, ts topo.Server, client vtctlclient.VtctlClient) {
}
// run a command that's gonna panic
logs, errFunc, err = client.ExecuteVtctlCommand(ctx, []string{"Panic"}, 30*time.Second, 10*time.Second)
logs, errFunc, err = client.ExecuteVtctlCommand(ctx, []string{"Panic"}, 30*time.Second)
if err != nil {
t.Fatalf("Remote error: %v", err)
}

Просмотреть файл

@ -45,12 +45,11 @@ type Instance struct {
topoServer topo.Server
cell string
lockTimeout time.Duration
commandDisplayInterval time.Duration
}
// NewInstance creates a new Instance.
func NewInstance(ts topo.Server, cell string, lockTimeout, commandDisplayInterval time.Duration) *Instance {
func NewInstance(ts topo.Server, cell string, commandDisplayInterval time.Duration) *Instance {
wi := &Instance{topoServer: ts, cell: cell, commandDisplayInterval: commandDisplayInterval}
// Note: setAndStartWorker() also adds a MemoryLogger for the webserver.
wi.wr = wi.CreateWrangler(logutil.NewConsoleLogger())
@ -59,7 +58,7 @@ func NewInstance(ts topo.Server, cell string, lockTimeout, commandDisplayInterva
// CreateWrangler creates a new wrangler using the instance specific configuration.
func (wi *Instance) CreateWrangler(logger logutil.Logger) *wrangler.Wrangler {
return wrangler.New(logger, wi.topoServer, tmclient.NewTabletManagerClient(), wi.lockTimeout)
return wrangler.New(logger, wi.topoServer, tmclient.NewTabletManagerClient())
}
// setAndStartWorker will set the current worker.

Просмотреть файл

@ -244,7 +244,7 @@ func TestSplitClonePopulateBlpCheckpoint(t *testing.T) {
func testSplitClone(t *testing.T, strategy string) {
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wi := NewInstance(ts, "cell1", time.Second, time.Second)
wi := NewInstance(ts, "cell1", time.Second)
if err := ts.CreateKeyspace(context.Background(), "ks", &pbt.Keyspace{
ShardingColumnName: "keyspace_id",

Просмотреть файл

@ -150,7 +150,7 @@ func (sq *sourceTabletServer) StreamExecute(ctx context.Context, target *pb.Targ
func TestSplitDiff(t *testing.T) {
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wi := NewInstance(ts, "cell1", time.Second, time.Second)
wi := NewInstance(ts, "cell1", time.Second)
ctx := context.Background()
if err := ts.CreateKeyspace(context.Background(), "ks", &pbt.Keyspace{
@ -195,7 +195,7 @@ func TestSplitDiff(t *testing.T) {
// We need to use FakeTabletManagerClient because we don't
// have a good way to fake the binlog player yet, which is
// necessary for synchronizing replication.
wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient())
excludedTable := "excludedTable1"
gwrk, err := commandSplitDiff(wi, wr, subFlags, []string{
"-exclude_tables", excludedTable,

Просмотреть файл

@ -229,7 +229,7 @@ func TestVerticalSplitClonePopulateBlpCheckpoint(t *testing.T) {
func testVerticalSplitClone(t *testing.T, strategy string) {
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wi := NewInstance(ts, "cell1", time.Second, time.Second)
wi := NewInstance(ts, "cell1", time.Second)
sourceMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 0,
pbt.TabletType_MASTER, db, testlib.TabletKeyspaceShard(t, "source_ks", "0"))

Просмотреть файл

@ -85,7 +85,7 @@ func (sq *verticalDiffTabletServer) StreamExecute(ctx context.Context, target *p
func TestVerticalSplitDiff(t *testing.T) {
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wi := NewInstance(ts, "cell1", time.Second, time.Second)
wi := NewInstance(ts, "cell1", time.Second)
ctx := context.Background()
sourceMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 0,
@ -139,7 +139,7 @@ func TestVerticalSplitDiff(t *testing.T) {
// We need to use FakeTabletManagerClient because we don't
// have a good way to fake the binlog player yet, which is
// necessary for synchronizing replication.
wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, faketmclient.NewFakeTabletManagerClient())
excludedTable := "excludedTable1"
subFlags := flag.NewFlagSet("VerticalSplitDiff", flag.ContinueOnError)
gwrk, err := commandVerticalSplitDiff(wi, wr, subFlags, []string{

Просмотреть файл

@ -30,7 +30,7 @@ func init() {
// CreateWorkerInstance returns a properly configured vtworker instance.
func CreateWorkerInstance(t *testing.T) *worker.Instance {
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
return worker.NewInstance(ts, "cell1", 30*time.Second, 1*time.Second)
return worker.NewInstance(ts, "cell1", 1*time.Second)
}
// TestSuite runs the test suite on the given vtworker and vtworkerclient

Просмотреть файл

@ -26,8 +26,6 @@ import (
// keyspace related methods for Wrangler
func (wr *Wrangler) lockKeyspace(ctx context.Context, keyspace string, actionNode *actionnode.ActionNode) (lockPath string, err error) {
ctx, cancel := context.WithTimeout(ctx, wr.lockTimeout)
defer cancel()
return actionNode.LockKeyspace(ctx, wr.ts, keyspace)
}

Просмотреть файл

@ -5,208 +5,23 @@
package wrangler
import (
"bytes"
"encoding/hex"
"fmt"
"sync"
"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/topotools"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// RebuildShardGraph rebuilds the serving and replication rollup data data while locking
// out other changes.
func (wr *Wrangler) RebuildShardGraph(ctx context.Context, keyspace, shard string, cells []string) (*topo.ShardInfo, error) {
return topotools.RebuildShard(ctx, wr.logger, wr.ts, keyspace, shard, cells, wr.lockTimeout)
return topotools.RebuildShard(ctx, wr.logger, wr.ts, keyspace, shard, cells)
}
// RebuildKeyspaceGraph rebuilds the serving graph data while locking out other changes.
// If some shards were recently read / updated, pass them in the cache so
// we don't read them again (and possible get stale replicated data)
func (wr *Wrangler) RebuildKeyspaceGraph(ctx context.Context, keyspace string, cells []string, rebuildSrvShards bool) error {
actionNode := actionnode.RebuildKeyspace()
lockPath, err := wr.lockKeyspace(ctx, keyspace, actionNode)
if err != nil {
return err
}
err = wr.rebuildKeyspace(ctx, keyspace, cells, rebuildSrvShards)
return wr.unlockKeyspace(ctx, keyspace, actionNode, lockPath, err)
}
// findCellsForRebuild will find all the cells in the given keyspace
// and create an entry if the map for them
func (wr *Wrangler) findCellsForRebuild(ki *topo.KeyspaceInfo, shardMap map[string]*topo.ShardInfo, cells []string, srvKeyspaceMap map[string]*pb.SrvKeyspace) {
for _, si := range shardMap {
for _, cell := range si.Cells {
if !topo.InCellList(cell, cells) {
continue
}
if _, ok := srvKeyspaceMap[cell]; !ok {
srvKeyspaceMap[cell] = &pb.SrvKeyspace{
ShardingColumnName: ki.ShardingColumnName,
ShardingColumnType: ki.ShardingColumnType,
ServedFrom: ki.ComputeCellServedFrom(cell),
SplitShardCount: ki.SplitShardCount,
}
}
}
}
}
// This function should only be used with an action lock on the keyspace
// - otherwise the consistency of the serving graph data can't be
// guaranteed.
//
// Take data from the global keyspace and rebuild the local serving
// copies in each cell.
func (wr *Wrangler) rebuildKeyspace(ctx context.Context, keyspace string, cells []string, rebuildSrvShards bool) error {
wr.logger.Infof("rebuildKeyspace %v", keyspace)
ki, err := wr.ts.GetKeyspace(ctx, keyspace)
if err != nil {
return err
}
var shardCache map[string]*topo.ShardInfo
if rebuildSrvShards {
shards, err := wr.ts.GetShardNames(ctx, keyspace)
if err != nil {
return nil
}
// Rebuild all shards in parallel, save the shards
shardCache = make(map[string]*topo.ShardInfo)
wg := sync.WaitGroup{}
mu := sync.Mutex{}
rec := concurrency.FirstErrorRecorder{}
for _, shard := range shards {
wg.Add(1)
go func(shard string) {
if shardInfo, err := wr.RebuildShardGraph(ctx, keyspace, shard, cells); err != nil {
rec.RecordError(fmt.Errorf("RebuildShardGraph failed: %v/%v %v", keyspace, shard, err))
} else {
mu.Lock()
shardCache[shard] = shardInfo
mu.Unlock()
}
wg.Done()
}(shard)
}
wg.Wait()
if rec.HasErrors() {
return rec.Error()
}
} else {
shardCache, err = wr.ts.FindAllShardsInKeyspace(ctx, keyspace)
if err != nil {
return err
}
}
// Build the list of cells to work on: we get the union
// of all the Cells of all the Shards, limited to the provided cells.
//
// srvKeyspaceMap is a map:
// key: cell
// value: topo.SrvKeyspace object being built
srvKeyspaceMap := make(map[string]*pb.SrvKeyspace)
wr.findCellsForRebuild(ki, shardCache, cells, srvKeyspaceMap)
// Then we add the cells from the keyspaces we might be 'ServedFrom'.
for _, ksf := range ki.ServedFroms {
servedFromShards, err := wr.ts.FindAllShardsInKeyspace(ctx, ksf.Keyspace)
if err != nil {
return err
}
wr.findCellsForRebuild(ki, servedFromShards, cells, srvKeyspaceMap)
}
// for each entry in the srvKeyspaceMap map, we do the following:
// - read the SrvShard structures for each shard / cell
// - if not present, build an empty one from global Shard
// - compute the union of the db types (replica, master, ...)
// - sort the shards in the list by range
// - check the ranges are compatible (no hole, covers everything)
for cell, srvKeyspace := range srvKeyspaceMap {
for _, si := range shardCache {
servedTypes := si.GetServedTypesPerCell(cell)
// for each type this shard is supposed to serve,
// add it to srvKeyspace.Partitions
for _, tabletType := range servedTypes {
partition := topoproto.SrvKeyspaceGetPartition(srvKeyspace, tabletType)
if partition == nil {
partition = &pb.SrvKeyspace_KeyspacePartition{
ServedType: tabletType,
}
srvKeyspace.Partitions = append(srvKeyspace.Partitions, partition)
}
partition.ShardReferences = append(partition.ShardReferences, &pb.ShardReference{
Name: si.ShardName(),
KeyRange: si.KeyRange,
})
}
}
if err := wr.orderAndCheckPartitions(cell, srvKeyspace); err != nil {
return err
}
}
// and then finally save the keyspace objects
for cell, srvKeyspace := range srvKeyspaceMap {
wr.logger.Infof("updating keyspace serving graph in cell %v for %v", cell, keyspace)
if err := wr.ts.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace); err != nil {
return fmt.Errorf("writing serving data failed: %v", err)
}
}
return nil
}
// orderAndCheckPartitions will re-order the partition list, and check
// it's correct.
func (wr *Wrangler) orderAndCheckPartitions(cell string, srvKeyspace *pb.SrvKeyspace) error {
// now check them all
for _, partition := range srvKeyspace.Partitions {
tabletType := partition.ServedType
topoproto.ShardReferenceArray(partition.ShardReferences).Sort()
// check the first Start is MinKey, the last End is MaxKey,
// and the values in between match: End[i] == Start[i+1]
first := partition.ShardReferences[0]
if first.KeyRange != nil && len(first.KeyRange.Start) != 0 {
return fmt.Errorf("keyspace partition for %v in cell %v does not start with min key", tabletType, cell)
}
last := partition.ShardReferences[len(partition.ShardReferences)-1]
if last.KeyRange != nil && len(last.KeyRange.End) != 0 {
return fmt.Errorf("keyspace partition for %v in cell %v does not end with max key", tabletType, cell)
}
for i := range partition.ShardReferences[0 : len(partition.ShardReferences)-1] {
fn := partition.ShardReferences[i].KeyRange == nil
sn := partition.ShardReferences[i+1].KeyRange == nil
if fn != sn {
return fmt.Errorf("shards with unconsistent KeyRanges for %v in cell %v at shard %v", tabletType, cell, i)
}
if fn {
// this is the custom sharding case, all KeyRanges must be nil
continue
}
if bytes.Compare(partition.ShardReferences[i].KeyRange.End, partition.ShardReferences[i+1].KeyRange.Start) != 0 {
return fmt.Errorf("non-contiguous KeyRange values for %v in cell %v at shard %v to %v: %v != %v", tabletType, cell, i, i+1, hex.EncodeToString(partition.ShardReferences[i].KeyRange.End), hex.EncodeToString(partition.ShardReferences[i+1].KeyRange.Start))
}
}
}
return nil
return topotools.RebuildKeyspace(ctx, wr.logger, wr.ts, keyspace, cells, rebuildSrvShards)
}
func strInList(sl []string, s string) bool {

Просмотреть файл

@ -18,8 +18,6 @@ import (
// shard related methods for Wrangler
func (wr *Wrangler) lockShard(ctx context.Context, keyspace, shard string, actionNode *actionnode.ActionNode) (lockPath string, err error) {
ctx, cancel := context.WithTimeout(ctx, wr.lockTimeout)
defer cancel()
return actionNode.LockShard(ctx, wr.ts, keyspace, shard)
}

Просмотреть файл

@ -10,7 +10,6 @@ import (
"os"
"path"
"testing"
"time"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/vt/logutil"
@ -33,7 +32,7 @@ func TestBackupRestore(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()

Просмотреть файл

@ -7,7 +7,6 @@ package testlib
import (
"fmt"
"testing"
"time"
"golang.org/x/net/context"
@ -103,7 +102,7 @@ func TestCopySchemaShard_UseShardAsSource(t *testing.T) {
func copySchema(t *testing.T, useShardAsSource bool) {
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()

Просмотреть файл

@ -25,7 +25,7 @@ import (
func TestEmergencyReparentShard(t *testing.T) {
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()
@ -146,7 +146,7 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
// Create a master, a couple good slaves
oldMaster := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)

Просмотреть файл

@ -29,7 +29,7 @@ func TestInitMasterShard(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()
@ -129,7 +129,7 @@ func TestInitMasterShardChecks(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
master := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)
@ -167,7 +167,7 @@ func TestInitMasterShardOneSlaveFails(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
// Create a master, a couple slaves
master := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)

Просмотреть файл

@ -7,7 +7,6 @@ package testlib
import (
"reflect"
"testing"
"time"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
@ -26,7 +25,7 @@ func TestMigrateServedFrom(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()

Просмотреть файл

@ -6,7 +6,6 @@ package testlib
import (
"testing"
"time"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
@ -36,7 +35,7 @@ func checkShardServedTypes(t *testing.T, ts topo.Server, shard string, expected
func TestMigrateServedTypes(t *testing.T) {
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()

Просмотреть файл

@ -7,7 +7,6 @@ package testlib
import (
"strings"
"testing"
"time"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
@ -26,7 +25,7 @@ func TestPermissions(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()

Просмотреть файл

@ -7,7 +7,6 @@ package testlib
import (
"fmt"
"testing"
"time"
"github.com/youtube/vitess/go/vt/logutil"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
@ -24,7 +23,7 @@ import (
func TestPlannedReparentShard(t *testing.T) {
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()

Просмотреть файл

@ -33,7 +33,7 @@ func TestTabletExternallyReparented(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()
@ -171,7 +171,7 @@ func TestTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
// Create an old master, a new master, two good slaves, one bad slave
oldMaster := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)
@ -220,7 +220,7 @@ func TestTabletExternallyReparentedContinueOnUnexpectedMaster(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
// Create an old master, a new master, two good slaves, one bad slave
oldMaster := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)
@ -263,7 +263,7 @@ func TestTabletExternallyReparentedFailedOldMaster(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
// Create an old master, a new master, and a good slave.
oldMaster := NewFakeTablet(t, wr, "cell1", 0, pb.TabletType_MASTER, db)

Просмотреть файл

@ -7,7 +7,6 @@ package testlib
import (
"fmt"
"testing"
"time"
"github.com/youtube/vitess/go/vt/logutil"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
@ -25,7 +24,7 @@ func TestShardReplicationStatuses(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
// create shard and tablets
if err := ts.CreateShard(ctx, "test_keyspace", "0"); err != nil {
@ -94,7 +93,7 @@ func TestReparentTablet(t *testing.T) {
ctx := context.Background()
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
// create shard and tablets
if err := ts.CreateShard(ctx, "test_keyspace", "0"); err != nil {

Просмотреть файл

@ -10,7 +10,6 @@ import (
"net/http"
"strings"
"testing"
"time"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
@ -52,7 +51,7 @@ func TestVersion(t *testing.T) {
// Initialize our environment
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()

Просмотреть файл

@ -72,10 +72,9 @@ func (vp *VtctlPipe) Close() {
// test logs, and returns the command error.
func (vp *VtctlPipe) Run(args []string) error {
actionTimeout := 30 * time.Second
lockTimeout := 10 * time.Second
ctx := context.Background()
c, errFunc, err := vp.client.ExecuteVtctlCommand(ctx, args, actionTimeout, lockTimeout)
c, errFunc, err := vp.client.ExecuteVtctlCommand(ctx, args, actionTimeout)
if err != nil {
return fmt.Errorf("VtctlPipe.Run() failed: %v", err)
}

Просмотреть файл

@ -80,7 +80,7 @@ func TestWaitForFilteredReplication_unhealthy(t *testing.T) {
func waitForFilteredReplication(t *testing.T, expectedErr string, initialStats *pbq.RealtimeStats, broadcastStatsFunc func() *pbq.RealtimeStats) {
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), time.Second)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()

Просмотреть файл

@ -7,8 +7,6 @@
package wrangler
import (
"time"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
@ -32,22 +30,14 @@ type Wrangler struct {
logger logutil.Logger
ts topo.Server
tmc tmclient.TabletManagerClient
lockTimeout time.Duration
}
// New creates a new Wrangler object.
//
// lockTimeout: how long should we wait for the initial lock to start
// a complex action? This is distinct from the context timeout because most
// of the time, we want to immediately know that our action will
// fail. However, automated action will need some time to arbitrate
// the locks.
func New(logger logutil.Logger, ts topo.Server, tmc tmclient.TabletManagerClient, lockTimeout time.Duration) *Wrangler {
func New(logger logutil.Logger, ts topo.Server, tmc tmclient.TabletManagerClient) *Wrangler {
return &Wrangler{
logger: logger,
ts: ts,
tmc: tmc,
lockTimeout: lockTimeout,
}
}

Просмотреть файл

@ -246,12 +246,13 @@ func (zkts *Server) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (
}
return nil, err
}
if len(data) == 0 {
return nil, topo.ErrNoNode
}
srvKeyspace := &pb.SrvKeyspace{}
if len(data) > 0 {
if err := json.Unmarshal([]byte(data), srvKeyspace); err != nil {
return nil, fmt.Errorf("SrvKeyspace unmarshal failed: %v %v", data, err)
}
}
return srvKeyspace, nil
}

Просмотреть файл

@ -13,9 +13,6 @@ namespace vtctldata {
/** @var int */
public $action_timeout = null;
/** @var int */
public $lock_timeout = null;
/** @var \Closure[] */
protected static $__extensions = array();
@ -40,14 +37,6 @@ namespace vtctldata {
$f->rule = \DrSlump\Protobuf::RULE_OPTIONAL;
$descriptor->addField($f);
// OPTIONAL INT64 lock_timeout = 3
$f = new \DrSlump\Protobuf\Field();
$f->number = 3;
$f->name = "lock_timeout";
$f->type = \DrSlump\Protobuf::TYPE_INT64;
$f->rule = \DrSlump\Protobuf::RULE_OPTIONAL;
$descriptor->addField($f);
foreach (self::$__extensions as $cb) {
$descriptor->addField($cb(), true);
}
@ -148,43 +137,6 @@ namespace vtctldata {
public function setActionTimeout( $value){
return $this->_set(2, $value);
}
/**
* Check if <lock_timeout> has a value
*
* @return boolean
*/
public function hasLockTimeout(){
return $this->_has(3);
}
/**
* Clear <lock_timeout> value
*
* @return \vtctldata\ExecuteVtctlCommandRequest
*/
public function clearLockTimeout(){
return $this->_clear(3);
}
/**
* Get <lock_timeout> value
*
* @return int
*/
public function getLockTimeout(){
return $this->_get(3);
}
/**
* Set <lock_timeout> value
*
* @param int $value
* @return \vtctldata\ExecuteVtctlCommandRequest
*/
public function setLockTimeout( $value){
return $this->_set(3, $value);
}
}
}

Просмотреть файл

@ -12,7 +12,6 @@ import "logutil.proto";
message ExecuteVtctlCommandRequest {
repeated string args = 1;
int64 action_timeout = 2;
int64 lock_timeout = 3;
}
// ExecuteVtctlCommandResponse is streamed back by ExecuteVtctlCommand.

Просмотреть файл

@ -23,97 +23,214 @@ message Session {
repeated ShardSession shard_sessions = 2;
}
// ExecuteRequest is the payload to Execute
// ExecuteRequest is the payload to Execute.
message ExecuteRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session session = 2;
// query is the query and bind variables to execute.
query.BoundQuery query = 3;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 4;
// not_in_transaction is deprecated and should not be used.
bool not_in_transaction = 5;
}
// ExecuteResponse is the returned value from Execute
// ExecuteResponse is the returned value from Execute.
message ExecuteResponse {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
vtrpc.RPCError error = 1;
// session is the updated session information (only returned inside a transaction).
Session session = 2;
// result contains the query result, only set if error is unset.
query.QueryResult result = 3;
}
// ExecuteShardsRequest is the payload to ExecuteShards
// ExecuteShardsRequest is the payload to ExecuteShards.
message ExecuteShardsRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session session = 2;
// query is the query and bind variables to execute.
query.BoundQuery query = 3;
// keyspace to target the query to.
string keyspace = 4;
// shards to target the query to. A DML can only target one shard.
repeated string shards = 5;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 6;
// not_in_transaction is deprecated and should not be used.
bool not_in_transaction = 7;
}
// ExecuteShardsResponse is the returned value from ExecuteShards
// ExecuteShardsResponse is the returned value from ExecuteShards.
message ExecuteShardsResponse {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
vtrpc.RPCError error = 1;
// session is the updated session information (only returned inside a transaction).
Session session = 2;
// result contains the query result, only set if error is unset.
query.QueryResult result = 3;
}
// ExecuteKeyspaceIdsRequest is the payload to ExecuteKeyspaceIds
// ExecuteKeyspaceIdsRequest is the payload to ExecuteKeyspaceIds.
message ExecuteKeyspaceIdsRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session session = 2;
// query is the query and bind variables to execute.
query.BoundQuery query = 3;
// keyspace to target the query to.
string keyspace = 4;
// keyspace_ids contains the list of keyspace_ids affected by this query.
// Will be used to find the shards to send the query to.
repeated bytes keyspace_ids = 5;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 6;
// not_in_transaction is deprecated and should not be used.
bool not_in_transaction = 7;
}
// ExecuteKeyspaceIdsResponse is the returned value from ExecuteKeyspaceIds
// ExecuteKeyspaceIdsResponse is the returned value from ExecuteKeyspaceIds.
message ExecuteKeyspaceIdsResponse {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
vtrpc.RPCError error = 1;
// session is the updated session information (only returned inside a transaction).
Session session = 2;
// result contains the query result, only set if error is unset.
query.QueryResult result = 3;
}
// ExecuteKeyRangesRequest is the payload to ExecuteKeyRanges
// ExecuteKeyRangesRequest is the payload to ExecuteKeyRanges.
message ExecuteKeyRangesRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session session = 2;
// query is the query and bind variables to execute.
query.BoundQuery query = 3;
// keyspace to target the query to
string keyspace = 4;
// key_ranges contains the list of key ranges affected by this query.
// Will be used to find the shards to send the query to.
repeated topodata.KeyRange key_ranges = 5;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 6;
// not_in_transaction is deprecated and should not be used.
bool not_in_transaction = 7;
}
// ExecuteKeyRangesResponse is the returned value from ExecuteKeyRanges
// ExecuteKeyRangesResponse is the returned value from ExecuteKeyRanges.
message ExecuteKeyRangesResponse {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
vtrpc.RPCError error = 1;
// session is the updated session information (only returned inside a transaction).
Session session = 2;
// result contains the query result, only set if error is unset.
query.QueryResult result = 3;
}
// ExecuteEntityIdsRequest is the payload to ExecuteEntityIds
// ExecuteEntityIdsRequest is the payload to ExecuteEntityIds.
message ExecuteEntityIdsRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session session = 2;
// query is the query and bind variables to execute.
query.BoundQuery query = 3;
// keyspace to target the query to.
string keyspace = 4;
// entity_column_name is the column name to use.
string entity_column_name = 5;
message EntityId {
// xid_type is the type of the entity's value. Can be NULL.
query.Type xid_type = 1;
// xid_value is the value for the entity. Not set if xid_type is NULL.
bytes xid_value = 2;
// keyspace_id is the associated keyspace_id for the entity.
bytes keyspace_id = 3;
}
// entity_keyspace_ids are pairs of entity_column_name values
// associated with its corresponding keyspace_id.
repeated EntityId entity_keyspace_ids = 6;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 7;
// not_in_transaction is deprecated and should not be used.
bool not_in_transaction = 8;
}
// ExecuteEntityIdsResponse is the returned value from ExecuteEntityIds
// ExecuteEntityIdsResponse is the returned value from ExecuteEntityIds.
message ExecuteEntityIdsResponse {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
vtrpc.RPCError error = 1;
// session is the updated session information (only returned inside a transaction).
Session session = 2;
// result contains the query result, only set if error is unset.
query.QueryResult result = 3;
}
@ -121,24 +238,49 @@ message ExecuteEntityIdsResponse {
// specified list of shards. This is used in a list for
// ExecuteBatchShardsRequest.
message BoundShardQuery {
// query is the query and bind variables to execute.
query.BoundQuery query = 1;
// keyspace to target the query to.
string keyspace = 2;
// shards to target the query to. A DML can only target one shard.
repeated string shards = 3;
}
// ExecuteBatchShardsRequest is the payload to ExecuteBatchShards
message ExecuteBatchShardsRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session session = 2;
// queries carries all the queries to execute.
repeated BoundShardQuery queries = 3;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 4;
// as_transaction will execute the queries in this batch in a single transaction per shard, created for this purpose.
// (this can be seen as adding a 'begin' before and 'commit' after the queries).
// Only makes sense if tablet_type is master. If set, the Session is ignored.
bool as_transaction = 5;
}
// ExecuteBatchShardsResponse is the returned value from ExecuteBatchShards
// ExecuteBatchShardsResponse is the returned value from ExecuteBatchShards.
message ExecuteBatchShardsResponse {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
vtrpc.RPCError error = 1;
// session is the updated session information (only returned inside a transaction).
Session session = 2;
// result contains the query result, only set if error is unset.
repeated query.QueryResult results = 3;
}
@ -146,145 +288,258 @@ message ExecuteBatchShardsResponse {
// specified list of keyspace ids. This is used in a list for
// ExecuteBatchKeyspaceIdsRequest.
message BoundKeyspaceIdQuery {
// query is the query and bind variables to execute.
query.BoundQuery query = 1;
// keyspace to target the query to.
string keyspace = 2;
// keyspace_ids contains the list of keyspace_ids affected by this query.
// Will be used to find the shards to send the query to.
repeated bytes keyspace_ids = 3;
}
// ExecuteBatchKeyspaceIdsRequest is the payload to ExecuteBatchKeyspaceId
// ExecuteBatchKeyspaceIdsRequest is the payload to ExecuteBatchKeyspaceId.
message ExecuteBatchKeyspaceIdsRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// session carries the current transaction data. It is returned by Begin.
// Do not fill it in if outside of a transaction.
Session session = 2;
repeated BoundKeyspaceIdQuery queries = 3;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 4;
// as_transaction will execute the queries in this batch in a single transaction per shard, created for this purpose.
// (this can be seen as adding a 'begin' before and 'commit' after the queries).
// Only makes sense if tablet_type is master. If set, the Session is ignored.
bool as_transaction = 5;
}
// ExecuteBatchKeyspaceIdsResponse is the returned value from ExecuteBatchKeyspaceId
// ExecuteBatchKeyspaceIdsResponse is the returned value from ExecuteBatchKeyspaceId.
message ExecuteBatchKeyspaceIdsResponse {
// error contains an application level error if necessary. Note the
// session may have changed, even when an error is returned (for
// instance if a database integrity error happened).
vtrpc.RPCError error = 1;
// session is the updated session information (only returned inside a transaction).
Session session = 2;
// result contains the query result, only set if error is unset.
repeated query.QueryResult results = 3;
}
// StreamExecuteRequest is the payload to StreamExecute
// StreamExecuteRequest is the payload to StreamExecute.
message StreamExecuteRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// query is the query and bind variables to execute.
query.BoundQuery query = 2;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 3;
}
// StreamExecuteResponse is the returned value from StreamExecute
// StreamExecuteResponse is the returned value from StreamExecute.
message StreamExecuteResponse {
// result contains the result data.
// The first value contains only Fields information.
// The next values contain the actual rows, a few values per result.
query.QueryResult result = 1;
}
// StreamExecuteShardsRequest is the payload to StreamExecuteShards
// StreamExecuteShardsRequest is the payload to StreamExecuteShards.
message StreamExecuteShardsRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// query is the query and bind variables to execute.
query.BoundQuery query = 2;
// keyspace to target the query to.
string keyspace = 3;
// shards to target the query to.
repeated string shards = 4;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 5;
}
// StreamExecuteShardsResponse is the returned value from StreamExecuteShards
// StreamExecuteShardsResponse is the returned value from StreamExecuteShards.
message StreamExecuteShardsResponse {
// result contains the result data.
// The first value contains only Fields information.
// The next values contain the actual rows, a few values per result.
query.QueryResult result = 1;
}
// StreamExecuteKeyspaceIdsRequest is the payload to StreamExecuteKeyspaceIds
// StreamExecuteKeyspaceIdsRequest is the payload to StreamExecuteKeyspaceIds.
message StreamExecuteKeyspaceIdsRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// query is the query and bind variables to execute.
query.BoundQuery query = 2;
// keyspace to target the query to.
string keyspace = 3;
// keyspace_ids contains the list of keyspace_ids affected by this query.
// Will be used to find the shards to send the query to.
repeated bytes keyspace_ids = 4;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 5;
}
// StreamExecuteKeyspaceIdsResponse is the returned value from StreamExecuteKeyspaceIds
// StreamExecuteKeyspaceIdsResponse is the returned value from StreamExecuteKeyspaceIds.
message StreamExecuteKeyspaceIdsResponse {
// result contains the result data.
// The first value contains only Fields information.
// The next values contain the actual rows, a few values per result.
query.QueryResult result = 1;
}
// StreamExecuteKeyRangesRequest is the payload to StreamExecuteKeyRanges
// StreamExecuteKeyRangesRequest is the payload to StreamExecuteKeyRanges.
message StreamExecuteKeyRangesRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// query is the query and bind variables to execute.
query.BoundQuery query = 2;
// keyspace to target the query to.
string keyspace = 3;
// key_ranges contains the list of key ranges affected by this query.
// Will be used to find the shards to send the query to.
repeated topodata.KeyRange key_ranges = 4;
// tablet_type is the type of tablets that this query is targeted to.
topodata.TabletType tablet_type = 5;
}
// StreamExecuteKeyRangesResponse is the returned value from StreamExecuteKeyRanges
// StreamExecuteKeyRangesResponse is the returned value from StreamExecuteKeyRanges.
message StreamExecuteKeyRangesResponse {
// result contains the result data.
// The first value contains only Fields information.
// The next values contain the actual rows, a few values per result.
query.QueryResult result = 1;
}
// BeginRequest is the payload to Begin
// BeginRequest is the payload to Begin.
message BeginRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
}
// BeginResponse is the returned value from Begin
// BeginResponse is the returned value from Begin.
message BeginResponse {
// session is the initial session information to use for subsequent queries.
Session session = 1;
}
// CommitRequest is the payload to Commit
// CommitRequest is the payload to Commit.
message CommitRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// session carries the current transaction data to commit.
Session session = 2;
}
// CommitResponse is the returned value from Commit
// CommitResponse is the returned value from Commit.
message CommitResponse {
}
// RollbackRequest is the payload to Rollback
// RollbackRequest is the payload to Rollback.
message RollbackRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// session carries the current transaction data to rollback.
Session session = 2;
}
// RollbackResponse is the returned value from Rollback
// RollbackResponse is the returned value from Rollback.
message RollbackResponse {
}
// SplitQueryRequest is the payload to SplitQuery
// SplitQueryRequest is the payload to SplitQuery.
message SplitQueryRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// keyspace to target the query to.
string keyspace = 2;
// query is the query and bind variables to produce splits for.
query.BoundQuery query = 3;
// split_column is an optional hint on the column to use to split the query.
string split_column = 4;
// split_count describes how many splits we want for this query.
int64 split_count = 5;
}
// SplitQueryResponse is the returned value from SplitQuery
// SplitQueryResponse is the returned value from SplitQuery.
message SplitQueryResponse {
message KeyRangePart {
// keyspace to target the query to.
string keyspace = 1;
// key ranges to target the query to.
repeated topodata.KeyRange key_ranges = 2;
}
message ShardPart {
// keyspace to target the query to.
string keyspace = 1;
// shards to target the query to.
repeated string shards = 2;
}
message Part {
// query is the query and bind variables to execute.
query.BoundQuery query = 1;
// key_range_part is set if the query should be executed by ExecuteKeyRanges.
KeyRangePart key_range_part = 2;
// shard_part is set if the query should be executed by ExecuteShards.
ShardPart shard_part = 3;
// size is the approximate number of rows this query will return.
int64 size = 4;
}
// splits contains the queries to run to fetch the entire data set.
repeated Part splits = 1;
}
// GetSrvKeyspaceRequest is the payload to GetSrvKeyspace
// GetSrvKeyspaceRequest is the payload to GetSrvKeyspace.
message GetSrvKeyspaceRequest {
// keyspace name to fetch.
string keyspace = 1;
}
// GetSrvKeyspaceResponse is the returned value from GetSrvKeyspace
// GetSrvKeyspaceResponse is the returned value from GetSrvKeyspace.
message GetSrvKeyspaceResponse {
// srv_keyspace is the topology object for the SrvKeyspace.
topodata.SrvKeyspace srv_keyspace = 1;
}

Просмотреть файл

@ -39,11 +39,10 @@ class GoRpcVtctlClient(vtctl_client.VtctlClient):
def is_closed(self):
return self.client.is_closed()
def execute_vtctl_command(self, args, action_timeout=30.0, lock_timeout=5.0):
def execute_vtctl_command(self, args, action_timeout=30.0):
req = {
'Args': args,
'ActionTimeout': long(action_timeout * 1e9),
'LockTimeout': long(lock_timeout * 1e9),
}
self.client.stream_call('VtctlServer.ExecuteVtctlCommand', req)
while True:

Просмотреть файл

@ -95,13 +95,12 @@ class VtctlClient(object):
"""
pass
def execute_vtctl_command(self, args, action_timeout=30.0, lock_timeout=5.0):
def execute_vtctl_command(self, args, action_timeout=30.0):
"""Executes a remote command on the vtctl server.
Args:
args: Command line to run.
action_timeout: total timeout for the action (float, in seconds).
lock_timeout: timeout for locking topology (float, in seconds).
Returns:
This is a generator method that yields Event objects.
@ -110,7 +109,7 @@ class VtctlClient(object):
def execute_vtctl_command(client, args, action_timeout=30.0,
lock_timeout=5.0, info_to_debug=False):
info_to_debug=False):
"""This is a helper method that executes a remote vtctl command.
It logs the output to the logging module, and returns the console output.
@ -119,7 +118,6 @@ def execute_vtctl_command(client, args, action_timeout=30.0,
client: VtctlClient object to use.
args: Command line to run.
action_timeout: total timeout for the action (float, in seconds).
lock_timeout: timeout for locking topology (float, in seconds).
info_to_debug: if set, changes the info messages to debug.
Returns:
@ -127,8 +125,7 @@ def execute_vtctl_command(client, args, action_timeout=30.0,
"""
console_result = ''
for e in client.execute_vtctl_command(args, action_timeout=action_timeout,
lock_timeout=lock_timeout):
for e in client.execute_vtctl_command(args, action_timeout=action_timeout):
if e.level == Event.INFO:
if info_to_debug:
logging.debug('%s', e.value)

Просмотреть файл

@ -18,7 +18,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='vtctldata.proto',
package='vtctldata',
syntax='proto3',
serialized_pb=b'\n\x0fvtctldata.proto\x12\tvtctldata\x1a\rlogutil.proto\"X\n\x1a\x45xecuteVtctlCommandRequest\x12\x0c\n\x04\x61rgs\x18\x01 \x03(\t\x12\x16\n\x0e\x61\x63tion_timeout\x18\x02 \x01(\x03\x12\x14\n\x0clock_timeout\x18\x03 \x01(\x03\"<\n\x1b\x45xecuteVtctlCommandResponse\x12\x1d\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x0e.logutil.Eventb\x06proto3'
serialized_pb=b'\n\x0fvtctldata.proto\x12\tvtctldata\x1a\rlogutil.proto\"B\n\x1a\x45xecuteVtctlCommandRequest\x12\x0c\n\x04\x61rgs\x18\x01 \x03(\t\x12\x16\n\x0e\x61\x63tion_timeout\x18\x02 \x01(\x03\"<\n\x1b\x45xecuteVtctlCommandResponse\x12\x1d\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x0e.logutil.Eventb\x06proto3'
,
dependencies=[logutil__pb2.DESCRIPTOR,])
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@ -47,13 +47,6 @@ _EXECUTEVTCTLCOMMANDREQUEST = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='lock_timeout', full_name='vtctldata.ExecuteVtctlCommandRequest.lock_timeout', index=2,
number=3, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
@ -67,7 +60,7 @@ _EXECUTEVTCTLCOMMANDREQUEST = _descriptor.Descriptor(
oneofs=[
],
serialized_start=45,
serialized_end=133,
serialized_end=111,
)
@ -97,8 +90,8 @@ _EXECUTEVTCTLCOMMANDRESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=135,
serialized_end=195,
serialized_start=113,
serialized_end=173,
)
_EXECUTEVTCTLCOMMANDRESPONSE.fields_by_name['event'].message_type = logutil__pb2._EVENT

Просмотреть файл

@ -12,10 +12,10 @@
import logging
import unittest
from mysql_flavor import mysql_flavor
import environment
import tablet
import utils
from mysql_flavor import mysql_flavor
from vtdb import keyrange_constants
from vtdb import update_stream
@ -57,8 +57,6 @@ def setUpModule():
utils.run_vtctl(['RebuildShardGraph', 'test_keyspace/0'])
utils.validate_topology()
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
for t in [src_master, src_replica, src_rdonly1, src_rdonly2]:
t.create_db('vt_test_keyspace')
t.start_vttablet(wait_for_state=None)
@ -94,7 +92,6 @@ def setUpModule():
utils.run_vtctl(['InitShardMaster', 'test_keyspace/-',
dst_master.tablet_alias], auto_log=True)
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
# copy the schema
utils.run_vtctl(['CopySchemaShard', src_replica.tablet_alias,

Просмотреть файл

@ -136,10 +136,10 @@ index by_msg (msg)
# _insert_startup_value inserts a value in the MySQL database before it
# is sharded
def _insert_startup_value(self, tablet, table, id, msg):
def _insert_startup_value(self, tablet, table, mid, msg):
tablet.mquery('vt_test_keyspace', [
'begin',
'insert into %s(id, msg) values(%d, "%s")' % (table, id, msg),
'insert into %s(id, msg) values(%d, "%s")' % (table, mid, msg),
'commit'
], write=True)
@ -159,7 +159,7 @@ index by_msg (msg)
# _insert_value inserts a value in the MySQL database along with the comments
# required for routing.
def _insert_value(self, tablet, table, id, msg, keyspace_id):
def _insert_value(self, tablet, table, mid, msg, keyspace_id):
k = utils.uint64_to_hex(keyspace_id)
tablet.mquery(
'vt_test_keyspace',
@ -167,40 +167,40 @@ index by_msg (msg)
'insert into %s(id, msg, keyspace_id) '
'values(%d, "%s", 0x%x) /* vtgate:: keyspace_id:%s */ '
'/* user_id:%d */' %
(table, id, msg, keyspace_id, k, id),
(table, mid, msg, keyspace_id, k, mid),
'commit'],
write=True)
def _get_value(self, tablet, table, id):
def _get_value(self, tablet, table, mid):
return tablet.mquery(
'vt_test_keyspace',
'select id, msg, keyspace_id from %s where id=%d' % (table, id))
'select id, msg, keyspace_id from %s where id=%d' % (table, mid))
def _check_value(self, tablet, table, id, msg, keyspace_id,
def _check_value(self, tablet, table, mid, msg, keyspace_id,
should_be_here=True):
result = self._get_value(tablet, table, id)
result = self._get_value(tablet, table, mid)
if keyspace_id_type == keyrange_constants.KIT_BYTES:
fmt = '%s'
keyspace_id = pack_keyspace_id(keyspace_id)
else:
fmt = '%x'
if should_be_here:
self.assertEqual(result, ((id, msg, keyspace_id),),
self.assertEqual(result, ((mid, msg, keyspace_id),),
('Bad row in tablet %s for id=%d, keyspace_id=' +
fmt + ', row=%s') % (tablet.tablet_alias, id,
fmt + ', row=%s') % (tablet.tablet_alias, mid,
keyspace_id, str(result)))
else:
self.assertEqual(len(result), 0,
('Extra row in tablet %s for id=%d, keyspace_id=' +
fmt + ': %s') % (tablet.tablet_alias, id, keyspace_id,
fmt + ': %s') % (tablet.tablet_alias, mid, keyspace_id,
str(result)))
# _is_value_present_and_correct tries to read a value.
# if it is there, it will check it is correct and return True if it is.
# if not correct, it will self.fail.
# if not there, it will return False.
def _is_value_present_and_correct(self, tablet, table, id, msg, keyspace_id):
result = self._get_value(tablet, table, id)
def _is_value_present_and_correct(self, tablet, table, mid, msg, keyspace_id):
result = self._get_value(tablet, table, mid)
if len(result) == 0:
return False
if keyspace_id_type == keyrange_constants.KIT_BYTES:
@ -208,9 +208,9 @@ index by_msg (msg)
keyspace_id = pack_keyspace_id(keyspace_id)
else:
fmt = '%x'
self.assertEqual(result, ((id, msg, keyspace_id),),
self.assertEqual(result, ((mid, msg, keyspace_id),),
('Bad row in tablet %s for id=%d, keyspace_id=' + fmt) % (
tablet.tablet_alias, id, keyspace_id))
tablet.tablet_alias, mid, keyspace_id))
return True
def _check_startup_values(self):
@ -286,8 +286,6 @@ index by_msg (msg)
shard_replica.init_tablet('replica', 'test_keyspace', '0')
shard_rdonly1.init_tablet('rdonly', 'test_keyspace', '0')
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
# create databases so vttablet can start behaving normally
for t in [shard_master, shard_replica, shard_rdonly1]:
t.create_db('vt_test_keyspace')

Просмотреть файл

@ -4,8 +4,7 @@
# Copyright 2015, Google Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file.
"""
This module allows you to bring up and tear down keyspaces.
"""This module allows you to bring up and tear down keyspaces.
"""
import os
@ -16,6 +15,7 @@ import utils
class TestEnv(object):
"""Main class for this module."""
def __init__(self):
self.tablet_map = {}
@ -40,8 +40,6 @@ class TestEnv(object):
procs.append(self._start_tablet(keyspace, shard, 'rdonly', i))
utils.wait_procs(procs)
utils.run_vtctl(['RebuildKeyspaceGraph', keyspace], auto_log=True)
for t in self.tablets:
t.create_db('vt_' + keyspace)
t.start_vttablet(
@ -57,8 +55,6 @@ class TestEnv(object):
# Force read-write even if there are no replicas.
utils.run_vtctl(['SetReadWrite', t.tablet_alias], auto_log=True)
utils.run_vtctl(['RebuildKeyspaceGraph', keyspace], auto_log=True)
for ddl in ddls:
fname = os.path.join(environment.tmproot, 'ddl.sql')
with open(fname, 'w') as f:

Просмотреть файл

@ -8,8 +8,8 @@ warnings.simplefilter('ignore')
import unittest
import environment
import utils
import tablet
import utils
master_tablet = tablet.Tablet()
replica_tablet = tablet.Tablet()
@ -30,8 +30,6 @@ def setUpModule():
master_tablet.init_tablet('master', 'test_keyspace', '0')
replica_tablet.init_tablet('replica', 'test_keyspace', '0')
utils.run_vtctl(['RebuildShardGraph', 'test_keyspace/0'])
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
master_tablet.create_db('vt_test_keyspace')
replica_tablet.create_db('vt_test_keyspace')

Просмотреть файл

@ -42,7 +42,6 @@ def setUpModule():
utils.run_vtctl(['CreateKeyspace', 'test_keyspace'])
master_tablet.init_tablet('master', 'test_keyspace', '0')
replica_tablet.init_tablet('replica', 'test_keyspace', '0')
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'])
utils.validate_topology()
master_tablet.populate('vt_test_keyspace', create_vt_insert_test)
@ -139,10 +138,10 @@ class RowCacheInvalidator(unittest.TestCase):
def test_cache_invalidation(self):
self._wait_for_replica()
invalidations = self.replica_stats()['Totals']['Invalidations']
invalidatorStats = self.replica_vars()
invalidator_stats = self.replica_vars()
logging.debug(
'Invalidations %d InvalidatorStats %s',
invalidations, invalidatorStats['RowcacheInvalidatorPosition'])
invalidations, invalidator_stats['RowcacheInvalidatorPosition'])
self.assertTrue(
invalidations > 0, 'Invalidations are not flowing through.')
@ -150,19 +149,19 @@ class RowCacheInvalidator(unittest.TestCase):
'select min(id) from vt_insert_test')
self.assertNotEqual(res[0][0], None,
'Cannot proceed, no rows in vt_insert_test')
id = int(res[0][0])
mid = int(res[0][0])
stats_dict = self.replica_stats()['vt_insert_test']
logging.debug('vt_insert_test stats %s', stats_dict)
misses = stats_dict['Misses']
hits = stats_dict['Hits']
replica_tablet.execute('select * from vt_insert_test where id=:id',
bindvars={'id': id})
bindvars={'id': mid})
stats_dict = self.replica_stats()['vt_insert_test']
self.assertEqual(stats_dict['Misses'] - misses, 1,
"This shouldn't have hit the cache")
replica_tablet.execute('select * from vt_insert_test where id=:id',
bindvars={'id': id})
bindvars={'id': mid})
stats_dict = self.replica_stats()['vt_insert_test']
self.assertEqual(stats_dict['Hits'] - hits, 1,
'This should have hit the cache')
@ -267,10 +266,10 @@ class RowCacheInvalidator(unittest.TestCase):
(inv_count2, 200), timeout, sleep_time=0.1)
# check and display some stats
invalidatorStats = self.replica_vars()
logging.debug('invalidatorStats %s',
invalidatorStats['RowcacheInvalidatorPosition'])
self.assertEqual(invalidatorStats['RowcacheInvalidatorState'], 'Running',
invalidator_stats = self.replica_vars()
logging.debug('invalidator_stats %s',
invalidator_stats['RowcacheInvalidatorPosition'])
self.assertEqual(invalidator_stats['RowcacheInvalidatorState'], 'Running',
'Row-cache invalidator should be enabled')
def test_cache_hit(self):

Просмотреть файл

@ -5,8 +5,8 @@ import unittest
import os
import environment
import utils
import tablet
import utils
shard_0_master = tablet.Tablet()
shard_0_replica1 = tablet.Tablet()
@ -41,8 +41,6 @@ def setUpModule():
shard_1_master.init_tablet('master', test_keyspace, '1')
shard_1_replica1.init_tablet('replica', test_keyspace, '1')
utils.run_vtctl(['RebuildKeyspaceGraph', test_keyspace], auto_log=True)
# run checks now before we start the tablets
utils.validate_topology()
@ -129,11 +127,11 @@ class TestSchema(unittest.TestCase):
tablets.remove(t)
utils.run_vtctl(['DeleteShard', 'test_keyspace/2'], auto_log=True)
def _check_tables(self, tablet, expectedCount):
def _check_tables(self, tablet, expected_count):
tables = tablet.mquery(db_name, 'show tables')
self.assertEqual(len(tables), expectedCount,
self.assertEqual(len(tables), expected_count,
'Unexpected table count on %s (not %d): got tables: %s' %
(tablet.tablet_alias, expectedCount, str(tables)))
(tablet.tablet_alias, expected_count, str(tables)))
def _check_db_not_created(self, tablet):
# Broadly catch all exceptions, since the exception being raised
@ -221,7 +219,7 @@ class TestSchema(unittest.TestCase):
self._check_tables(shard_0_master, 5)
self._check_tables(shard_1_master, 5)
def _setUp_tablets_shard_2(self):
def _setup_tablets_shard_2(self):
try:
_init_mysql(tablets_shard2)
finally:
@ -253,7 +251,7 @@ class TestSchema(unittest.TestCase):
def _test_vtctl_copyschemashard(self, source):
self._apply_initial_schema()
self._setUp_tablets_shard_2()
self._setup_tablets_shard_2()
# InitShardMaster creates the db, but there shouldn't be any tables yet.
self._check_tables(shard_2_master, 0)

Просмотреть файл

@ -150,12 +150,12 @@ def setUpModule():
]
utils.wait_procs(setup_procs)
utils.run_vtctl("CreateKeyspace test_keyspace")
utils.run_vtctl(["CreateKeyspace", "test_keyspace"])
shard_0_master.init_tablet("master", "test_keyspace", "0")
shard_0_slave.init_tablet("replica", "test_keyspace", "0")
utils.run_vtctl("RebuildKeyspaceGraph test_keyspace", auto_log=True)
utils.run_vtctl(["RebuildKeyspaceGraph", "test_keyspace"], auto_log=True)
# create databases so vttablet can start behaving normally
shard_0_master.create_db("vt_test_keyspace")

Просмотреть файл

@ -83,8 +83,6 @@ class TestSharded(unittest.TestCase):
shard_1_master.init_tablet('master', 'test_keyspace', '80-')
shard_1_replica.init_tablet('replica', 'test_keyspace', '80-')
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
# run checks now before we start the tablets
utils.validate_topology()
@ -152,7 +150,7 @@ class TestSharded(unittest.TestCase):
# we created the schema differently, so it should show
utils.run_vtctl(['ValidateSchemaShard', 'test_keyspace/-80'])
utils.run_vtctl(['ValidateSchemaShard', 'test_keyspace/80-'])
out, err = utils.run_vtctl(['ValidateSchemaKeyspace', 'test_keyspace'],
_, err = utils.run_vtctl(['ValidateSchemaKeyspace', 'test_keyspace'],
trap_output=True, raise_on_error=False)
if ('test_nj-0000062344 and test_nj-0000062346 disagree on schema '
'for table vt_select_test:\nCREATE TABLE' not in err or

Просмотреть файл

@ -79,8 +79,6 @@ def setUpModule():
replica_tablet.create_db('vt_test_keyspace')
replica_tablet.create_db('other_database')
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'])
utils.VtGate().start()
master_tablet.start_vttablet()
@ -277,8 +275,6 @@ class TestUpdateStream(unittest.TestCase):
)
logging.debug('run_test_stream_parity starting @ %s',
master_start_position)
master_txn_count = 0
replica_txn_count = 0
self._exec_vt_txn(self._populate_vt_a(15))
self._exec_vt_txn(self._populate_vt_b(14))
self._exec_vt_txn(['delete from vt_a'])
@ -288,14 +284,12 @@ class TestUpdateStream(unittest.TestCase):
for stream_event in master_conn.stream_update(master_start_position):
master_events.append(stream_event)
if stream_event.category == update_stream.StreamEvent.POS:
master_txn_count += 1
break
replica_events = []
replica_conn = self._get_replica_stream_conn()
for stream_event in replica_conn.stream_update(replica_start_position):
replica_events.append(stream_event)
if stream_event.category == update_stream.StreamEvent.POS:
replica_txn_count += 1
break
if len(master_events) != len(replica_events):
logging.debug(
@ -313,7 +307,6 @@ class TestUpdateStream(unittest.TestCase):
logging.debug('Test Writes: PASS')
def test_ddl(self):
global master_start_position
start_position = master_start_position
logging.debug('test_ddl: starting @ %s', start_position)
master_conn = self._get_master_stream_conn()

Просмотреть файл

@ -279,7 +279,6 @@ index by_msg (msg)
# rebuild destination keyspace to make sure there is a serving
# graph entry, even though there is no tablet yet.
utils.run_vtctl(['RebuildKeyspaceGraph', 'source_keyspace'], auto_log=True)
utils.run_vtctl(['RebuildKeyspaceGraph', 'destination_keyspace'],
auto_log=True)
self._check_srv_keyspace('ServedFrom(master): source_keyspace\n'
@ -291,7 +290,6 @@ index by_msg (msg)
destination_rdonly1.init_tablet('rdonly', 'destination_keyspace', '0')
destination_rdonly2.init_tablet('rdonly', 'destination_keyspace', '0')
utils.run_vtctl(['RebuildKeyspaceGraph', 'source_keyspace'], auto_log=True)
utils.run_vtctl(['RebuildKeyspaceGraph', 'destination_keyspace'],
auto_log=True)
self._check_srv_keyspace('ServedFrom(master): source_keyspace\n'