add SplitColumn field to SplitQuery

1. Add SplitColumn field to SplitQuery so that caller could hint SplitQuery endpoint
   which column is best for splitting query.
2. The split column must be indexed and this will be verified on the server side.
3. coding style fixes suggested by golint.
This commit is contained in:
Shengzhe Yao 2015-05-19 16:03:31 -07:00
Родитель e8bd9ebfbb
Коммит 199aa60881
10 изменённых файлов: 117 добавлений и 39 удалений

Просмотреть файл

@ -11,15 +11,20 @@ import (
mproto "github.com/youtube/vitess/go/mysql/proto"
)
// SessionParams is passed to GetSessionId. The server will
// double-check the keyspace and shard are what the tablet is serving.
type SessionParams struct {
Keyspace string
Shard string
}
// SessionInfo is returned by GetSessionId. Use the provided
// session_id in the Session object for any subsequent call.
type SessionInfo struct {
SessionId int64
}
// Query is the payload to Execute.
type Query struct {
Sql string
BindVariables map[string]interface{}
@ -56,6 +61,7 @@ func slimit(s string) string {
return s[:l]
}
// BoundQuery is one query in a QueryList.
type BoundQuery struct {
Sql string
BindVariables map[string]interface{}
@ -63,6 +69,7 @@ type BoundQuery struct {
//go:generate bsongen -file $GOFILE -type BoundQuery -o bound_query_bson.go
// QueryList is the payload to ExecuteBatch.
type QueryList struct {
Queries []BoundQuery
SessionId int64
@ -71,12 +78,14 @@ type QueryList struct {
//go:generate bsongen -file $GOFILE -type QueryList -o query_list_bson.go
// QueryResultList is the return type for ExecuteBatch.
type QueryResultList struct {
List []mproto.QueryResult
}
//go:generate bsongen -file $GOFILE -type QueryResultList -o query_result_list_bson.go
// Session is passed to all calls.
type Session struct {
SessionId int64
TransactionId int64
@ -84,17 +93,23 @@ type Session struct {
//go:generate bsongen -file $GOFILE -type Session -o session_bson.go
// TransactionInfo is returned by Begin. Use the provided
// transaction_id in the Session object for any subsequent call to be inside
// the transaction.
type TransactionInfo struct {
TransactionId int64
}
// SplitQueryRequest represents a request to split a Query into queries that
// each return a subset of the original query.
// TODO(anandhenry): Add SessionId to this struct.
// SplitColumn: preferred column to split. Server will pick a random PK column
// if this field is empty or returns an error if this field is not
// empty but not found in schema info or not be indexed.
type SplitQueryRequest struct {
Query BoundQuery
SplitCount int
SessionID int64
Query BoundQuery
SplitColumn string
SplitCount int
SessionID int64
}
// QuerySplit represents a split of SplitQueryRequest.Query. RowCount is only

Просмотреть файл

@ -17,26 +17,31 @@ import (
// one primary key and the leading primary key must be numeric, see
// QuerySplitter.splitBoundaries()
type QuerySplitter struct {
query *proto.BoundQuery
splitCount int
schemaInfo *SchemaInfo
sel *sqlparser.Select
tableName string
pkCol string
rowCount int64
query *proto.BoundQuery
splitCount int
schemaInfo *SchemaInfo
sel *sqlparser.Select
tableName string
splitColumn string
rowCount int64
}
// NewQuerySplitter creates a new QuerySplitter. query is the original query
// to split and splitCount is the desired number of splits. splitCount must
// be a positive int, if not it will be set to 1.
func NewQuerySplitter(query *proto.BoundQuery, splitCount int, schemaInfo *SchemaInfo) *QuerySplitter {
func NewQuerySplitter(
query *proto.BoundQuery,
splitColumn string,
splitCount int,
schemaInfo *SchemaInfo) *QuerySplitter {
if splitCount < 1 {
splitCount = 1
}
return &QuerySplitter{
query: query,
splitCount: splitCount,
schemaInfo: schemaInfo,
query: query,
splitCount: splitCount,
schemaInfo: schemaInfo,
splitColumn: splitColumn,
}
}
@ -74,7 +79,17 @@ func (qs *QuerySplitter) validateQuery() error {
if len(tableInfo.PKColumns) == 0 {
return fmt.Errorf("no primary keys")
}
qs.pkCol = tableInfo.GetPKColumn(0).Name
if qs.splitColumn != "" {
for _, index := range tableInfo.Indexes {
for _, column := range index.Columns {
if qs.splitColumn == column {
return nil
}
}
}
return fmt.Errorf("split column is not indexed or does not exist in table schema, SplitColumn: %s, TableInfo.Table: %v", qs.splitColumn, tableInfo.Table)
}
qs.splitColumn = tableInfo.GetPKColumn(0).Name
return nil
}
@ -130,9 +145,9 @@ func (qs *QuerySplitter) getWhereClause(start, end sqltypes.Value) *sqlparser.Wh
return qs.sel.Where
}
pk := &sqlparser.ColName{
Name: []byte(qs.pkCol),
Name: []byte(qs.splitColumn),
}
// pkCol >= start
// splitColumn >= start
if !start.IsNull() {
startClause = &sqlparser.ComparisonExpr{
Operator: sqlparser.AST_GE,
@ -140,7 +155,7 @@ func (qs *QuerySplitter) getWhereClause(start, end sqltypes.Value) *sqlparser.Wh
Right: sqlparser.NumVal((start).Raw()),
}
}
// pkCol < end
// splitColumn < end
if !end.IsNull() {
endClause = &sqlparser.ComparisonExpr{
Operator: sqlparser.AST_LT,
@ -154,7 +169,7 @@ func (qs *QuerySplitter) getWhereClause(start, end sqltypes.Value) *sqlparser.Wh
if endClause == nil {
clauses = startClause
} else {
// pkCol >= start AND pkCol < end
// splitColumn >= start AND splitColumn < end
clauses = &sqlparser.AndExpr{
Left: startClause,
Right: endClause,

Просмотреть файл

@ -3,6 +3,7 @@ package tabletserver
import (
"fmt"
"reflect"
"strings"
"testing"
mproto "github.com/youtube/vitess/go/mysql/proto"
@ -18,8 +19,14 @@ func getSchemaInfo() *SchemaInfo {
}
zero, _ := sqltypes.BuildValue(0)
table.AddColumn("id", "int", zero, "")
table.AddColumn("id2", "int", zero, "")
table.AddColumn("count", "int", zero, "")
table.PKColumns = []int{0}
primaryIndex := table.AddIndex("PRIMARY")
primaryIndex.AddColumn("id", 12345)
id2Index := table.AddIndex("idx_id2")
id2Index.AddColumn("id2", 1234)
tables := make(map[string]*TableInfo, 1)
tables["test_table"] = &TableInfo{Table: table}
@ -37,7 +44,7 @@ func getSchemaInfo() *SchemaInfo {
func TestValidateQuery(t *testing.T) {
schemaInfo := getSchemaInfo()
query := &proto.BoundQuery{}
splitter := NewQuerySplitter(query, 3, schemaInfo)
splitter := NewQuerySplitter(query, "", 3, schemaInfo)
query.Sql = "delete from test_table"
got := splitter.validateQuery()
@ -94,6 +101,31 @@ func TestValidateQuery(t *testing.T) {
if !reflect.DeepEqual(got, want) {
t.Errorf("valid query validation failed, got:%v, want:%v", got, want)
}
// column id2 is indexed
splitter = NewQuerySplitter(query, "id2", 3, schemaInfo)
query.Sql = "select * from test_table where count > :count"
got = splitter.validateQuery()
want = nil
if !reflect.DeepEqual(got, want) {
t.Errorf("valid query validation failed, got:%v, want:%v", got, want)
}
// column does not exist
splitter = NewQuerySplitter(query, "unknown_column", 3, schemaInfo)
got = splitter.validateQuery()
wantStr := "split column is not indexed or does not exist in table schema"
if !strings.Contains(got.Error(), wantStr) {
t.Errorf("unknown table validation failed, got:%v, want:%v", got, wantStr)
}
// column is not indexed
splitter = NewQuerySplitter(query, "count", 3, schemaInfo)
got = splitter.validateQuery()
wantStr = "split column is not indexed or does not exist in table schema"
if !strings.Contains(got.Error(), wantStr) {
t.Errorf("unknown table validation failed, got:%v, want:%v", got, wantStr)
}
}
func TestGetWhereClause(t *testing.T) {
@ -101,7 +133,7 @@ func TestGetWhereClause(t *testing.T) {
sql := "select * from test_table where count > :count"
statement, _ := sqlparser.Parse(sql)
splitter.sel, _ = statement.(*sqlparser.Select)
splitter.pkCol = "id"
splitter.splitColumn = "id"
// no boundary case, start = end = nil, should not change the where clause
nilValue := sqltypes.Value{}
@ -238,7 +270,7 @@ func TestSplitQuery(t *testing.T) {
query := &proto.BoundQuery{
Sql: "select * from test_table where count > :count",
}
splitter := NewQuerySplitter(query, 3, schemaInfo)
splitter := NewQuerySplitter(query, "", 3, schemaInfo)
splitter.validateQuery()
min, _ := sqltypes.BuildValue(0)
max, _ := sqltypes.BuildValue(300)

Просмотреть файл

@ -507,7 +507,7 @@ func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest
sq.endRequest()
}()
splitter := NewQuerySplitter(&(req.Query), req.SplitCount, sq.qe.schemaInfo)
splitter := NewQuerySplitter(&(req.Query), req.SplitColumn, req.SplitCount, sq.qe.schemaInfo)
err = splitter.validateQuery()
if err != nil {
return NewTabletError(ErrFail, "splitQuery: query validation error: %s, request: %#v", err, req)
@ -520,12 +520,12 @@ func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest
}
conn := qre.getConn(sq.qe.connPool)
defer conn.Recycle()
// TODO: For fetching pkMinMax, include where clauses on the
// TODO: For fetching MinMax, include where clauses on the
// primary key, if any, in the original query which might give a narrower
// range of PKs to work with.
minMaxSql := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v", splitter.pkCol, splitter.pkCol, splitter.tableName)
pkMinMax := qre.execSQL(conn, minMaxSql, true)
reply.Queries, err = splitter.split(pkMinMax)
// range of split column to work with.
minMaxSql := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v", splitter.splitColumn, splitter.splitColumn, splitter.tableName)
splitColumnMinMax := qre.execSQL(conn, minMaxSql, true)
reply.Queries, err = splitter.split(splitColumnMinMax)
if err != nil {
return NewTabletError(ErrFail, "splitQuery: query split error: %s, request: %#v", err, req)
}

Просмотреть файл

@ -3,23 +3,29 @@ package com.youtube.vitess.vtgate;
public class SplitQueryRequest {
private String sql;
private String keyspace;
private String splitColumn;
private int splitCount;
public SplitQueryRequest(String sql, String keyspace, int splitCount) {
public SplitQueryRequest(String sql, String keyspace, int splitCount, String splitColumn) {
this.sql = sql;
this.keyspace = keyspace;
this.splitCount = splitCount;
this.splitColumn = splitColumn;
}
public String getSql() {
return sql;
return this.sql;
}
public String getKeyspace() {
return keyspace;
return this.keyspace;
}
public int getSplitCount() {
return splitCount;
return this.splitCount;
}
public String getSplitColumn() {
return this.splitColumn;
}
}

Просмотреть файл

@ -114,9 +114,9 @@ public class VtGate {
* instances. Batch jobs or MapReduce jobs that needs to scan all rows can use these queries to
* parallelize full table scans.
*/
public Map<Query, Long> splitQuery(String keyspace, String sql, int splitCount)
public Map<Query, Long> splitQuery(String keyspace, String sql, int splitCount, String pkColumn)
throws ConnectionException, DatabaseException {
SplitQueryRequest req = new SplitQueryRequest(sql, keyspace, splitCount);
SplitQueryRequest req = new SplitQueryRequest(sql, keyspace, splitCount, pkColumn);
SplitQueryResponse response = client.splitQuery(req);
if (response.getError() != null) {
throw new DatabaseException(response.getError());

Просмотреть файл

@ -11,6 +11,7 @@ public class VitessConf {
public static final String INPUT_KEYSPACE = "vitess.vtgate.hadoop.keyspace";
public static final String INPUT_QUERY = "vitess.vtgate.hadoop.input_query";
public static final String SPLITS = "vitess.vtgate.hadoop.splits";
public static final String SPLIT_COLUMN = "vitess.vtgate.hadoop.splitcolumn";
public static final String HOSTS_DELIM = ",";
private Configuration conf;
@ -58,4 +59,12 @@ public class VitessConf {
public void setSplits(int splits) {
conf.setInt(SPLITS, splits);
}
public String getSplitColumn() {
return conf.get(SPLIT_COLUMN);
}
public void setSplitColumn(String splitColumn) {
conf.set(SPLIT_COLUMN, splitColumn);
}
}

Просмотреть файл

@ -33,7 +33,7 @@ public class VitessInputFormat extends InputFormat<NullWritable, RowWritable> {
VitessConf conf = new VitessConf(context.getConfiguration());
VtGate vtgate = VtGate.connect(conf.getHosts(), conf.getTimeoutMs());
Map<Query, Long> queries =
vtgate.splitQuery(conf.getKeyspace(), conf.getInputQuery(), conf.getSplits());
vtgate.splitQuery(conf.getKeyspace(), conf.getInputQuery(), conf.getSplits(), conf.getSplitColumn());
List<InputSplit> splits = new LinkedList<>();
for (Query query : queries.keySet()) {
Long size = queries.get(query);

Просмотреть файл

@ -185,6 +185,7 @@ public class Bsonify {
query.put("Sql", request.getSql());
BSONObject b = new BasicBSONObject();
b.put("Keyspace", request.getKeyspace());
b.put("SplitColumn", request.getSplitColumn());
b.put("Query", query);
b.put("SplitCount", request.getSplitCount());
return b;

Просмотреть файл

@ -306,7 +306,7 @@ public class VtGateIT {
Util.waitForTablet("rdonly", 40, 3, testEnv);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0);
Map<Query, Long> queries =
vtgate.splitQuery("test_keyspace", "select id,keyspace_id from vtgate_test", 1);
vtgate.splitQuery("test_keyspace", "select id,keyspace_id from vtgate_test", 1, "");
vtgate.close();
// Verify 2 splits, one per shard
@ -342,7 +342,7 @@ public class VtGateIT {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0);
int splitCount = 6;
Map<Query, Long> queries =
vtgate.splitQuery("test_keyspace", "select id,keyspace_id from vtgate_test", splitCount);
vtgate.splitQuery("test_keyspace", "select id,keyspace_id from vtgate_test", splitCount, "");
vtgate.close();
// Verify 6 splits, 3 per shard
@ -370,7 +370,7 @@ public class VtGateIT {
public void testSplitQueryInvalidTable() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0);
try {
vtgate.splitQuery("test_keyspace", "select id from invalid_table", 1);
vtgate.splitQuery("test_keyspace", "select id from invalid_table", 1, "");
Assert.fail("failed to raise connection exception");
} catch (ConnectionException e) {
Assert.assertTrue(