зеркало из https://github.com/github/vitess-gh.git
Update VTGate V2
This commit is contained in:
Родитель
1269153768
Коммит
5fc302d596
|
@ -19,8 +19,8 @@ func (entityId *EntityId) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
|
|||
bson.EncodeOptionalPrefix(buf, bson.Object, key)
|
||||
lenWriter := bson.NewLenWriter(buf)
|
||||
|
||||
bson.EncodeInterface(buf, "ExternalId", entityId.ExternalId)
|
||||
entityId.KeyspaceId.MarshalBson(buf, "KeyspaceId")
|
||||
bson.EncodeInterface(buf, "ExternalID", entityId.ExternalID)
|
||||
entityId.KeyspaceID.MarshalBson(buf, "KeyspaceID")
|
||||
|
||||
lenWriter.Close()
|
||||
}
|
||||
|
@ -39,10 +39,10 @@ func (entityId *EntityId) UnmarshalBson(buf *bytes.Buffer, kind byte) {
|
|||
|
||||
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
|
||||
switch bson.ReadCString(buf) {
|
||||
case "ExternalId":
|
||||
entityId.ExternalId = bson.DecodeInterface(buf, kind)
|
||||
case "KeyspaceId":
|
||||
entityId.KeyspaceId.UnmarshalBson(buf, kind)
|
||||
case "ExternalID":
|
||||
entityId.ExternalID = bson.DecodeInterface(buf, kind)
|
||||
case "KeyspaceID":
|
||||
entityId.KeyspaceID.UnmarshalBson(buf, kind)
|
||||
default:
|
||||
bson.Skip(buf, kind)
|
||||
}
|
||||
|
|
|
@ -33,9 +33,9 @@ func (entityIdsQuery *EntityIdsQuery) MarshalBson(buf *bytes2.ChunkedWriter, key
|
|||
bson.EncodeString(buf, "EntityColumnName", entityIdsQuery.EntityColumnName)
|
||||
// []EntityId
|
||||
{
|
||||
bson.EncodePrefix(buf, bson.Array, "EntityKeyspaceIds")
|
||||
bson.EncodePrefix(buf, bson.Array, "EntityKeyspaceIDs")
|
||||
lenWriter := bson.NewLenWriter(buf)
|
||||
for _i, _v2 := range entityIdsQuery.EntityKeyspaceIds {
|
||||
for _i, _v2 := range entityIdsQuery.EntityKeyspaceIDs {
|
||||
_v2.MarshalBson(buf, bson.Itoa(_i))
|
||||
}
|
||||
lenWriter.Close()
|
||||
|
@ -86,19 +86,19 @@ func (entityIdsQuery *EntityIdsQuery) UnmarshalBson(buf *bytes.Buffer, kind byte
|
|||
entityIdsQuery.Keyspace = bson.DecodeString(buf, kind)
|
||||
case "EntityColumnName":
|
||||
entityIdsQuery.EntityColumnName = bson.DecodeString(buf, kind)
|
||||
case "EntityKeyspaceIds":
|
||||
case "EntityKeyspaceIDs":
|
||||
// []EntityId
|
||||
if kind != bson.Null {
|
||||
if kind != bson.Array {
|
||||
panic(bson.NewBsonError("unexpected kind %v for entityIdsQuery.EntityKeyspaceIds", kind))
|
||||
panic(bson.NewBsonError("unexpected kind %v for entityIdsQuery.EntityKeyspaceIDs", kind))
|
||||
}
|
||||
bson.Next(buf, 4)
|
||||
entityIdsQuery.EntityKeyspaceIds = make([]EntityId, 0, 8)
|
||||
entityIdsQuery.EntityKeyspaceIDs = make([]EntityId, 0, 8)
|
||||
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
|
||||
bson.SkipIndex(buf)
|
||||
var _v2 EntityId
|
||||
_v2.UnmarshalBson(buf, kind)
|
||||
entityIdsQuery.EntityKeyspaceIds = append(entityIdsQuery.EntityKeyspaceIds, _v2)
|
||||
entityIdsQuery.EntityKeyspaceIDs = append(entityIdsQuery.EntityKeyspaceIDs, _v2)
|
||||
}
|
||||
}
|
||||
case "TabletType":
|
||||
|
|
|
@ -73,8 +73,8 @@ type KeyRangeQuery struct {
|
|||
|
||||
// EntityId represents a tuple of external_id and keyspace_id
|
||||
type EntityId struct {
|
||||
ExternalId interface{}
|
||||
KeyspaceId kproto.KeyspaceId
|
||||
ExternalID interface{}
|
||||
KeyspaceID kproto.KeyspaceId
|
||||
}
|
||||
|
||||
// EntityIdsQuery represents a query request for the specified KeyspaceId map.
|
||||
|
@ -83,7 +83,7 @@ type EntityIdsQuery struct {
|
|||
BindVariables map[string]interface{}
|
||||
Keyspace string
|
||||
EntityColumnName string
|
||||
EntityKeyspaceIds []EntityId
|
||||
EntityKeyspaceIDs []EntityId
|
||||
TabletType topo.TabletType
|
||||
Session *Session
|
||||
}
|
||||
|
|
|
@ -127,16 +127,16 @@ func (res *Resolver) ExecuteEntityIds(
|
|||
context interface{},
|
||||
query *proto.EntityIdsQuery,
|
||||
) (*mproto.QueryResult, error) {
|
||||
shardIdMap, err := mapEntityIdsToShards(
|
||||
shardIDMap, err := mapEntityIdsToShards(
|
||||
res.scatterConn.toposerv,
|
||||
res.scatterConn.cell,
|
||||
query.Keyspace,
|
||||
query.EntityKeyspaceIds,
|
||||
query.EntityKeyspaceIDs,
|
||||
query.TabletType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
shards, sqls, bindVars := buildEntityIds(shardIdMap, query.Sql, query.EntityColumnName, query.BindVariables)
|
||||
shards, sqls, bindVars := buildEntityIds(shardIDMap, query.Sql, query.EntityColumnName, query.BindVariables)
|
||||
for {
|
||||
qr, err := res.scatterConn.ExecuteEntityIds(
|
||||
context,
|
||||
|
@ -159,16 +159,16 @@ func (res *Resolver) ExecuteEntityIds(
|
|||
resharding = true
|
||||
}
|
||||
// check shards change for horizontal resharding
|
||||
newShardIdMap, err := mapEntityIdsToShards(
|
||||
newShardIDMap, err := mapEntityIdsToShards(
|
||||
res.scatterConn.toposerv,
|
||||
res.scatterConn.cell,
|
||||
query.Keyspace,
|
||||
query.EntityKeyspaceIds,
|
||||
query.EntityKeyspaceIDs,
|
||||
query.TabletType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newShards, newSqls, newBindVars := buildEntityIds(newShardIdMap, query.Sql, query.EntityColumnName, query.BindVariables)
|
||||
newShards, newSqls, newBindVars := buildEntityIds(newShardIDMap, query.Sql, query.EntityColumnName, query.BindVariables)
|
||||
if !StrsEquals(newShards, shards) {
|
||||
shards = newShards
|
||||
sqls = newSqls
|
||||
|
@ -347,11 +347,11 @@ func StrsEquals(a, b []string) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func buildEntityIds(shardIdMap map[string][]interface{}, qSql, entityColName string, qBindVars map[string]interface{}) ([]string, map[string]string, map[string]map[string]interface{}) {
|
||||
func buildEntityIds(shardIDMap map[string][]interface{}, qSql, entityColName string, qBindVars map[string]interface{}) ([]string, map[string]string, map[string]map[string]interface{}) {
|
||||
shards := make([]string, 0, 1)
|
||||
sqls := make(map[string]string)
|
||||
bindVars := make(map[string]map[string]interface{})
|
||||
for shard, ids := range shardIdMap {
|
||||
for shard, ids := range shardIDMap {
|
||||
var b bytes.Buffer
|
||||
b.Write([]byte(entityColName))
|
||||
b.Write([]byte(" in ("))
|
||||
|
|
|
@ -77,14 +77,14 @@ func TestResolverExecuteEntityIds(t *testing.T) {
|
|||
Sql: "query",
|
||||
Keyspace: "TestResolverExecuteEntityIds",
|
||||
EntityColumnName: "col",
|
||||
EntityKeyspaceIds: []proto.EntityId{
|
||||
EntityKeyspaceIDs: []proto.EntityId{
|
||||
proto.EntityId{
|
||||
ExternalId: 0,
|
||||
KeyspaceId: kid10,
|
||||
ExternalID: 0,
|
||||
KeyspaceID: kid10,
|
||||
},
|
||||
proto.EntityId{
|
||||
ExternalId: "1",
|
||||
KeyspaceId: kid25,
|
||||
ExternalID: "1",
|
||||
KeyspaceID: kid25,
|
||||
},
|
||||
},
|
||||
TabletType: topo.TYPE_MASTER,
|
||||
|
|
|
@ -64,11 +64,11 @@ func mapEntityIdsToShards(topoServ SrvTopoServer, cell, keyspace string, entityI
|
|||
}
|
||||
var shards = make(map[string][]interface{})
|
||||
for _, eid := range entityIds {
|
||||
shard, err := getShardForKeyspaceId(allShards, eid.KeyspaceId)
|
||||
shard, err := getShardForKeyspaceId(allShards, eid.KeyspaceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
shards[shard] = append(shards[shard], eid.ExternalId)
|
||||
shards[shard] = append(shards[shard], eid.ExternalID)
|
||||
}
|
||||
return shards, nil
|
||||
}
|
||||
|
|
|
@ -227,10 +227,10 @@ func TestVTGateExecuteEntityIds(t *testing.T) {
|
|||
Sql: "query",
|
||||
Keyspace: "TestVTGateExecuteEntityIds",
|
||||
EntityColumnName: "kid",
|
||||
EntityKeyspaceIds: []proto.EntityId{
|
||||
EntityKeyspaceIDs: []proto.EntityId{
|
||||
proto.EntityId{
|
||||
ExternalId: "id1",
|
||||
KeyspaceId: kid10,
|
||||
ExternalID: "id1",
|
||||
KeyspaceID: kid10,
|
||||
},
|
||||
},
|
||||
TabletType: topo.TYPE_MASTER,
|
||||
|
@ -280,7 +280,7 @@ func TestVTGateExecuteEntityIds(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("want nil, got %+v", err)
|
||||
}
|
||||
q.EntityKeyspaceIds = append(q.EntityKeyspaceIds, proto.EntityId{ExternalId: "id2", KeyspaceId: kid30})
|
||||
q.EntityKeyspaceIDs = append(q.EntityKeyspaceIDs, proto.EntityId{ExternalID: "id2", KeyspaceID: kid30})
|
||||
RpcVTGate.ExecuteEntityIds(nil, &q, qr)
|
||||
if qr.RowsAffected != 2 {
|
||||
t.Errorf("want 2, got %v", qr.RowsAffected)
|
||||
|
|
|
@ -189,15 +189,14 @@ class VTGateConnection(object):
|
|||
def _execute_entity_ids(self, sql, bind_variables, keyspace, tablet_type, entity_keyspace_id_map, entity_column_name):
|
||||
sql, new_binds = dbapi.prepare_query_bind_vars(sql, bind_variables)
|
||||
new_binds = field_types.convert_bind_vars(new_binds)
|
||||
new_entity_kid_map = dict()
|
||||
for k in entity_keyspace_id_map:
|
||||
new_entity_kid_map[str(k)] = str(entity_keyspace_id_map[k])
|
||||
req = {
|
||||
'Sql': sql,
|
||||
'BindVariables': new_binds,
|
||||
'Keyspace': keyspace,
|
||||
'TabletType': tablet_type,
|
||||
'EntityKeyspaceIdMap': new_entity_kid_map,
|
||||
'EntityKeyspaceIDs': [
|
||||
{'ExternalID': xid, 'KeyspaceID': str(kid)}
|
||||
for xid, kid in entity_keyspace_id_map.iteritems()],
|
||||
'EntityColumnName': entity_column_name,
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче