VStream From: POC supporting 'vstream * from table' with where and limit

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
This commit is contained in:
Rohit Nayak 2020-09-02 18:19:09 +02:00
Родитель c8e044f3a5
Коммит 3316399b4f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: BA0A4E9168156524
14 изменённых файлов: 4872 добавлений и 4447 удалений

Просмотреть файл

@ -572,6 +572,9 @@ func (hc *HealthCheckImpl) GetHealthyTabletStats(target *query.Target) []*Tablet
var result []*TabletHealth
hc.mu.Lock()
defer hc.mu.Unlock()
if target.Shard == "" {
target.Shard = "0"
}
return append(result, hc.healthy[hc.keyFromTarget(target)]...)
}

Просмотреть файл

@ -56,6 +56,7 @@ const (
StmtSavepoint
StmtSRollback
StmtRelease
StmtVStream
)
//ASTToStatementType returns a StatementType from an AST stmt
@ -138,6 +139,8 @@ func Preview(sql string) StatementType {
return StmtSelect
case "stream":
return StmtStream
case "vstream":
return StmtVStream
case "insert":
return StmtInsert
case "replace":
@ -192,6 +195,8 @@ func (s StatementType) String() string {
return "SELECT"
case StmtStream:
return "STREAM"
case StmtVStream:
return "VSTREAM"
case StmtInsert:
return "INSERT"
case StmtReplace:

Просмотреть файл

@ -86,6 +86,15 @@ type (
Lock string
}
// VStream represents a VSTREAM statement of the form
// "vstream <select_expression> from <table> [where ...] [limit ...]".
VStream struct {
// Comments holds the optional comments that follow the VSTREAM keyword.
Comments Comments
// SelectExpr is the single select expression (for example "*").
SelectExpr SelectExpr
// Table is the table to stream from.
Table TableName
// Where is the optional WHERE clause.
Where *Where
// Limit is the optional LIMIT clause.
Limit *Limit
}
// Stream represents a STREAM statement.
Stream struct {
Comments Comments
@ -270,6 +279,7 @@ type (
func (*Union) iStatement() {}
func (*Select) iStatement() {}
func (*Stream) iStatement() {}
// iStatement is a no-op marker method declared on *VStream, alongside the same method on the other statement node types.
func (*VStream) iStatement() {}
func (*Insert) iStatement() {}
func (*Update) iStatement() {}
func (*Delete) iStatement() {}
@ -921,6 +931,12 @@ func (node *UnionSelect) Format(buf *TrackedBuffer) {
buf.astPrintf(node, " %s %v", node.Type, node.Statement)
}
// Format formats the node as
// "vstream <comments><select expr> from <table>[ where ...][ limit ...]".
//
// The WHERE and LIMIT clauses are part of the parsed statement, so they are
// emitted as well; otherwise a formatted statement would silently lose them.
// NOTE(review): this relies on *Where and *Limit formatting as an empty string
// when nil (as Select.Format does for the same fields) — confirm.
func (node *VStream) Format(buf *TrackedBuffer) {
	buf.astPrintf(node, "vstream %v%v from %v%v%v",
		node.Comments, node.SelectExpr, node.Table, node.Where, node.Limit)
}
// Format formats the node.
func (node *Stream) Format(buf *TrackedBuffer) {
buf.astPrintf(node, "stream %v%v from %v",

Просмотреть файл

@ -1588,6 +1588,8 @@ var (
input: "delete from t partition (p0) where a = 1",
}, {
input: "stream * from t",
}, {
input: "vstream * from t",
}, {
input: "stream /* comment */ * from t",
}, {

Просмотреть файл

@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"runtime/debug"
"sync"
"vitess.io/vitess/go/vt/log"
@ -93,6 +94,7 @@ func Parse(sql string) (Statement, error) {
return nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, tokenizer.LastError.Error())
}
if tokenizer.ParseTree == nil {
log.Infof("Empty Statement: %s", debug.Stack())
return nil, ErrEmpty
}
return tokenizer.ParseTree, nil
@ -106,6 +108,8 @@ func ParseStrictDDL(sql string) (Statement, error) {
return nil, tokenizer.LastError
}
if tokenizer.ParseTree == nil {
log.Infof("Empty Statement DDL: %s", debug.Stack())
return nil, ErrEmpty
}
return tokenizer.ParseTree, nil

Просмотреть файл

@ -805,6 +805,26 @@ func replaceUseDBName(newNode, parent SQLNode) {
parent.(*Use).DBName = newNode.(TableIdent)
}
// replaceVStreamComments stores newNode as the Comments of the *VStream parent.
func replaceVStreamComments(newNode, parent SQLNode) {
	vs := parent.(*VStream)
	vs.Comments = newNode.(Comments)
}
// replaceVStreamLimit stores newNode as the Limit of the *VStream parent.
func replaceVStreamLimit(newNode, parent SQLNode) {
	vs := parent.(*VStream)
	vs.Limit = newNode.(*Limit)
}
// replaceVStreamSelectExpr stores newNode as the SelectExpr of the *VStream parent.
func replaceVStreamSelectExpr(newNode, parent SQLNode) {
	vs := parent.(*VStream)
	vs.SelectExpr = newNode.(SelectExpr)
}
// replaceVStreamTable stores newNode as the Table of the *VStream parent.
func replaceVStreamTable(newNode, parent SQLNode) {
	vs := parent.(*VStream)
	vs.Table = newNode.(TableName)
}
// replaceVStreamWhere stores newNode as the Where of the *VStream parent.
func replaceVStreamWhere(newNode, parent SQLNode) {
	vs := parent.(*VStream)
	vs.Where = newNode.(*Where)
}
type replaceValTupleItems int
func (r *replaceValTupleItems) replace(newNode, container SQLNode) {
@ -1348,6 +1368,13 @@ func (a *application) apply(parent, node SQLNode, replacer replacerFunc) {
case *Use:
a.apply(node, n.DBName, replaceUseDBName)
case *VStream:
a.apply(node, n.Comments, replaceVStreamComments)
a.apply(node, n.Limit, replaceVStreamLimit)
a.apply(node, n.SelectExpr, replaceVStreamSelectExpr)
a.apply(node, n.Table, replaceVStreamTable)
a.apply(node, n.Where, replaceVStreamWhere)
case ValTuple:
replacer := replaceValTupleItems(0)
replacerRef := &replacer

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -121,7 +121,7 @@ func skipToEnd(yylex interface{}) {
%token LEX_ERROR
%left <bytes> UNION
%token <bytes> SELECT STREAM INSERT UPDATE DELETE FROM WHERE GROUP HAVING ORDER BY LIMIT OFFSET FOR
%token <bytes> SELECT STREAM VSTREAM INSERT UPDATE DELETE FROM WHERE GROUP HAVING ORDER BY LIMIT OFFSET FOR
%token <bytes> ALL DISTINCT AS EXISTS ASC DESC INTO DUPLICATE KEY DEFAULT SET LOCK UNLOCK KEYS DO
%token <bytes> DISTINCTROW
%token <bytes> VALUES LAST_INSERT_ID
@ -220,7 +220,7 @@ func skipToEnd(yylex interface{}) {
%type <statement> command
%type <selStmt> simple_select select_statement base_select union_rhs
%type <statement> explain_statement explainable_statement
%type <statement> stream_statement insert_statement update_statement delete_statement set_statement set_transaction_statement
%type <statement> stream_statement vstream_statement insert_statement update_statement delete_statement set_statement set_transaction_statement
%type <statement> create_statement alter_statement rename_statement drop_statement truncate_statement flush_statement do_statement
%type <ddl> create_table_prefix rename_list
%type <statement> analyze_statement show_statement use_statement other_statement
@ -348,6 +348,7 @@ command:
$$ = $1
}
| stream_statement
| vstream_statement
| insert_statement
| update_statement
| delete_statement
@ -454,6 +455,12 @@ stream_statement:
$$ = &Stream{Comments: Comments($2), SelectExpr: $3, Table: $5}
}
// vstream_statement parses "VSTREAM [comments] <select_expression> FROM <table_name> [WHERE ...] [LIMIT ...]"
// into a *VStream node. The optional where expression ($6) is wrapped with NewWhere(WhereStr, ...).
vstream_statement:
// 1 2 3 4 5 6 7
VSTREAM comment_opt select_expression FROM table_name where_expression_opt limit_opt
{
$$ = &VStream{Comments: Comments($2), SelectExpr: $3, Table: $5, Where: NewWhere(WhereStr, $6), Limit: $7}
}
// base_select is an unparenthesized SELECT with no order by clause or beyond.
base_select:
// 1 2 3 4 5 6 7 8

Просмотреть файл

@ -356,6 +356,7 @@ var keywords = map[string]int{
"stored": UNUSED,
"straight_join": STRAIGHT_JOIN,
"stream": STREAM,
"vstream": VSTREAM,
"table": TABLE,
"tables": TABLES,
"terminated": UNUSED,

Просмотреть файл

@ -31,6 +31,8 @@ import (
"golang.org/x/net/context"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/cache"
@ -53,6 +55,9 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
// vtgateHealthCheck is the healthcheck used by vtgate; it backs the "vstream * from" functionality.
// It is package-level mutable state: NewTabletGateway assigns it, the executor test
// environment overrides it with a fake healthcheck, and startVStream reads it when
// building a gateway for a stream.
var vtgateHealthCheck discovery.HealthCheck
var (
errNoKeyspace = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no keyspace in database name specified. Supported database name format (items in <> are optional): keyspace<:shard><@type> or keyspace<[range]><@type>")
defaultTabletType topodatapb.TabletType
@ -943,7 +948,6 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver)
vcursor.SetIgnoreMaxMemoryRows(true)
switch stmtType {
case sqlparser.StmtStream:
// this is a stream statement for messaging
@ -958,6 +962,9 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession
// These statements don't populate plan.Instructions. We want to make sure we don't try to
// dereference nil Instructions which would panic.
fallthrough
case sqlparser.StmtVStream:
log.Infof("handleVStream called with target %v", target)
return e.handleVStream(ctx, sql, target, callback, vcursor, logStats)
default:
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported statement type for OLAP: %s", stmtType)
}

Просмотреть файл

@ -263,7 +263,8 @@ var unshardedVSchema = `
"sequence": "user_seq"
}
},
"simple": {}
"simple": {},
"t1": {}
}
}
`
@ -375,6 +376,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
*GatewayImplementation = GatewayImplementationDiscovery
cell := "aa"
hc := discovery.NewFakeHealthCheck()
vtgateHealthCheck = hc
s := createSandbox("TestExecutor")
s.VSchema = executorVSchema
serv := newSandboxForCells([]string{cell})

Просмотреть файл

@ -0,0 +1,189 @@
/*
Copyright 2020 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vtgate
import (
"context"
"fmt"
"io"
"reflect"
"strconv"
"strings"
"time"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
)
// handleVStream executes a "vstream ... from <table>" statement.
//
// It parses sql, checks that the result is a *sqlparser.VStream, resolves the
// table through vcursor.FindTable to obtain its keyspace, and then delegates
// to startVStream for target.Shard. Results are delivered through callback.
//
// logStats.PlanTime, logStats.ExecuteTime and logStats.Error are filled in;
// every error returned is also recorded in logStats.Error.
func (e *Executor) handleVStream(ctx context.Context, sql string, target querypb.Target, callback func(*sqltypes.Result) error, vcursor *vcursorImpl, logStats *LogStats) error {
	stmt, err := sqlparser.Parse(sql)
	if err != nil {
		logStats.Error = err
		return err
	}
	vstreamStmt, ok := stmt.(*sqlparser.VStream)
	if !ok {
		// Build the error first so that logStats records the same error that is
		// returned (err is nil at this point because parsing succeeded).
		err = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unrecognized VSTREAM statement: %v", sql)
		logStats.Error = err
		return err
	}

	table, _, _, _, err := vcursor.FindTable(vstreamStmt.Table)
	if err != nil {
		logStats.Error = err
		return err
	}

	execStart := time.Now()
	logStats.PlanTime = execStart.Sub(logStats.StartTime)

	err = e.startVStream(ctx, table.Keyspace.Name, target.Shard, nil, vstreamStmt, callback)
	logStats.Error = err
	logStats.ExecuteTime = time.Since(execStart)
	return err
}
// getVStreamStartPos extracts the starting position from the WHERE clause of a
// VSTREAM statement.
//
// The only supported form is "pos > <literal>" (the column name is matched
// case-insensitively). It returns the literal's raw value as the position, or
// an empty string when the statement has no WHERE clause. Any other shape —
// a different expression type or operator, a left operand that is not the
// "pos" column, or a right operand that is not a literal — yields an error
// instead of silently producing an empty position.
func getVStreamStartPos(stmt *sqlparser.VStream) (string, error) {
	log.Infof("in getVStreamStartPos with where %v", stmt.Where)
	if stmt.Where == nil {
		return "", nil
	}

	cmp, ok := stmt.Where.Expr.(*sqlparser.ComparisonExpr)
	if !ok || cmp.Operator != sqlparser.GreaterThanStr {
		return "", fmt.Errorf("where can only be of type 'pos > <value>'")
	}

	col, ok := cmp.Left.(*sqlparser.ColName)
	if !ok || strings.ToLower(col.Name.String()) != "pos" {
		return "", fmt.Errorf("can only use pos in vstream where clause")
	}

	val, ok := cmp.Right.(*sqlparser.SQLVal)
	if !ok {
		return "", fmt.Errorf("vstream position must be a literal value, got %v", reflect.TypeOf(cmp.Right))
	}
	return string(val.Val), nil
}
// startVStream streams row events for the table named in stmt from the given
// keyspace/shard and forwards them to callback as sqltypes.Result batches.
//
// The start position comes from the statement's WHERE clause (see
// getVStreamStartPos) and is empty when there is none. The LIMIT clause, when
// it is a literal, caps the total number of rows delivered across all batches;
// the default cap is 10000. A limit that cannot be parsed as an integer is
// reported as an error. A limit of 0 returns io.EOF without streaming.
//
// The stream ends once the cap is reached: the send function returns io.EOF to
// the underlying vstream. Row changes without an "After" image are skipped
// because there is no row to build from them.
//
// keyRange is accepted but currently unused.
// NOTE(review): the outcome of vs.stream is not inspected, exactly as before,
// and a new TabletGateway is built per call from the package-level
// vtgateHealthCheck — confirm both are intended.
func (e *Executor) startVStream(ctx context.Context, keyspace string, shard string, keyRange *topodatapb.KeyRange, stmt *sqlparser.VStream, callback func(*sqltypes.Result) error) error {
	tableName := stmt.Table.Name.CompliantName()
	var pos string
	var err error
	limit := 10000
	if stmt.Where != nil {
		pos, err = getVStreamStartPos(stmt)
		if err != nil {
			return err
		}
	}
	if stmt.Limit != nil {
		if count, ok := stmt.Limit.Rowcount.(*sqlparser.SQLVal); ok {
			limit, err = strconv.Atoi(string(count.Val))
			if err != nil {
				return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid vstream limit %q: %v", string(count.Val), err)
			}
		}
	}
	log.Infof("startVStream for %s.%s.%s, position %s, limit %d", keyspace, shard, tableName, pos, limit)

	vgtid := &binlogdata.VGtid{
		ShardGtids: []*binlogdata.ShardGtid{{
			Keyspace: keyspace,
			Shard:    shard,
			Gtid:     pos,
		}},
	}
	log.Infof("VGTID is %v", vgtid)
	filter := &binlogdata.Filter{
		Rules: []*binlogdata.Rule{{
			Match:  tableName,
			Filter: fmt.Sprintf("select * from %s", tableName),
		}},
	}
	if limit == 0 {
		return io.EOF
	}

	// lastFields remembers the most recent FIELD event so that ROW events that
	// arrive in a later batch can still be decoded.
	var lastFields []*querypb.Field
	// totalRows counts rows already delivered to callback in earlier batches.
	totalRows := 0
	send := func(evs []*binlogdata.VEvent) error {
		result := &sqltypes.Result{
			Rows: [][]sqltypes.Value{},
		}
		batchRows := 0
		log.Infof("Events got: %v", evs)
		for _, ev := range evs {
			if totalRows+batchRows >= limit {
				break
			}
			switch ev.Type {
			case binlogdata.VEventType_FIELD:
				result.Fields = ev.FieldEvent.Fields
				lastFields = result.Fields
			case binlogdata.VEventType_ROW:
				if result.Fields == nil {
					result.Fields = lastFields
				}
				for _, change := range ev.RowEvent.RowChanges {
					if change.After == nil {
						// No after-image: nothing to convert into a row.
						continue
					}
					result.Rows = append(result.Rows, sqltypes.MakeRowTrusted(result.Fields, change.After))
					batchRows++
					if totalRows+batchRows >= limit {
						break
					}
				}
				result.RowsAffected = uint64(batchRows)
			default:
			}
		}
		if batchRows == 0 {
			return nil
		}
		log.Infof("VStream Sending result to callback:%v", result)
		if err := callback(result); err != nil {
			return err
		}
		totalRows += batchRows
		if totalRows >= limit {
			// Signal the stream to stop now that the row cap has been reached.
			return io.EOF
		}
		return nil
	}

	gw := NewTabletGateway(ctx, vtgateHealthCheck /*discovery.Healthcheck*/, e.serv, e.cell)
	srvResolver := srvtopo.NewResolver(e.serv, gw, e.cell)

	vs := &vstream{
		vgtid:      vgtid,
		tabletType: topodatapb.TabletType_MASTER,
		filter:     filter,
		send:       send,
		resolver:   srvResolver,
	}
	vs.stream(ctx)
	return nil
}

Просмотреть файл

@ -0,0 +1,132 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vtgate
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
_ "vitess.io/vitess/go/vt/vtgate/vindexes"
)
// TestVStreamZZZ feeds one GTID, FIELD, ROW (two row changes) and COMMIT event
// sequence into the lookup sandbox connection and checks that running
// "vstream * from t1 where pos > 'current' limit 2" through the executor
// returns both rows with the fields announced by the FIELD event.
func TestVStreamZZZ(t *testing.T) {
	fieldEvent := &binlogdatapb.VEvent{
		Type: binlogdatapb.VEventType_FIELD,
		FieldEvent: &binlogdatapb.FieldEvent{
			TableName: "t1",
			Fields: []*querypb.Field{
				{Name: "id", Type: querypb.Type_INT64},
				{Name: "val", Type: querypb.Type_VARCHAR},
			},
		},
	}
	rowEvent := &binlogdatapb.VEvent{
		Type: binlogdatapb.VEventType_ROW,
		RowEvent: &binlogdatapb.RowEvent{
			TableName: "t1",
			RowChanges: []*binlogdatapb.RowChange{
				{After: &querypb.Row{Lengths: []int64{1, 3}, Values: []byte("1abc")}},
				{After: &querypb.Row{Lengths: []int64{1, 5}, Values: []byte("2defgh")}},
			},
		},
	}
	events := []*binlogdatapb.VEvent{
		{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"},
		fieldEvent,
		rowEvent,
		{Type: binlogdatapb.VEventType_COMMIT},
	}

	executor, _, _, sbclookup := createExecutorEnv()
	sbclookup.AddVStreamEvents(events, nil)

	logChan := QueryLogger.Subscribe("Test")
	defer QueryLogger.Unsubscribe(logChan)

	// TODO(review): the original note here read `pos > ""/"c, LIMIT` — presumably
	// more position/limit variants still need coverage; confirm.
	sql := "vstream * from t1 where pos > 'current' limit 2"
	result, err := vstreamEvents(executor, sql)
	require.NoError(t, err)

	wantFields := []*querypb.Field{
		{Name: "id", Type: sqltypes.Int64},
		{Name: "val", Type: sqltypes.VarChar},
	}
	wantRows := [][]sqltypes.Value{
		{sqltypes.NewInt64(1), sqltypes.NewVarChar("abc")},
		{sqltypes.NewInt64(2), sqltypes.NewVarChar("defgh")},
	}
	want := &sqltypes.Result{
		Fields:       wantFields,
		RowsAffected: 2,
		InsertID:     0,
		Rows:         wantRows,
	}
	if result.Equal(want) {
		return
	}
	t.Errorf("result: %+v, want %+v", result, want)
}
// vstreamEvents runs sql through executor.StreamExecute against the
// TestUnsharded/0 master target in cell "aa", using a context that times out
// after 10ms, and merges everything the callback received into one result.
//
// The merged result takes Fields and RowsAffected from the first streamed
// result and concatenates the Rows of all of them. It returns (nil, err) when
// StreamExecute fails and (nil, nil) when nothing was streamed.
func vstreamEvents(executor *Executor, sql string) (qr *sqltypes.Result, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	target := querypb.Target{
		Keyspace:   "TestUnsharded",
		Shard:      "0",
		TabletType: topodatapb.TabletType_MASTER,
		Cell:       "aa",
	}

	// Buffered so the callback can hand results over before they are drained below.
	results := make(chan *sqltypes.Result, 100)
	collect := func(res *sqltypes.Result) error {
		results <- res
		return nil
	}

	err = executor.StreamExecute(ctx, "TestVStream", NewSafeSession(masterSession), sql, nil, target, collect)
	close(results)
	if err != nil {
		return nil, err
	}

	for res := range results {
		if qr == nil {
			// First streamed result: it supplies the fields and the affected-row count.
			qr = &sqltypes.Result{Fields: res.Fields, RowsAffected: res.RowsAffected}
		}
		qr.Rows = append(qr.Rows, res.Rows...)
	}
	return qr, nil
}

Просмотреть файл

@ -97,7 +97,9 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop
}
}
hc = createHealthCheck(ctx, *HealthCheckRetryDelay, *HealthCheckTimeout, topoServer, localCell, *CellsToWatch)
}
vtgateHealthCheck = hc
gw := &TabletGateway{
hc: hc,
srvTopoServer: serv,