* feat: added query logging to safe session

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: added [explain format=vtexplain] parsing and AST

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: add planning of vtexplain queries

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: keep the logging even in the precense of autocommits

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: use table result instead of warnings, and move the logging to scattercon

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: log commit to vtexplain logger and added e2e test

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: vindex query to mark vindexExec in session

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* test: run only for gen4 planner

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: vtexplain logs with bindvars replaces, and begin being counted with the real query

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: add ACTUALLY_RUN_QUERIES as a comment directive to vtexplain and block dml queries if not specified

Signed-off-by: Manan Gupta <manan@planetscale.com>

* test: make sure we add the actually_runs_queries directive to inserts we want to inspect

Signed-off-by: Andres Taylor <andres@planetscale.com>

* docs: added info to release notes

Signed-off-by: Andres Taylor <andres@planetscale.com>

* typo: fix error message

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: ignore cases on comment directives

Signed-off-by: Andres Taylor <andres@planetscale.com>

* fix: CommentDirective now hides internals so that the lower casing of keys can be preserved everywhere

Signed-off-by: Andres Taylor <andres@planetscale.com>

* test: improve assertion

Signed-off-by: Andres Taylor <andres@planetscale.com>

* chore: clean up accidental change

Signed-off-by: Andres Taylor <andres@planetscale.com>

* refactor: clean up so more responsibility for vtexplain ends up in the engine primitive

Signed-off-by: Andres Taylor <andres@planetscale.com>

* fix: no need to Unquote multiple times

Signed-off-by: Andres Taylor <andres@planetscale.com>

* fix: rename comment directive

Signed-off-by: Andres Taylor <andres@planetscale.com>

Co-authored-by: Harshit Gangal <harshit@planetscale.com>
Co-authored-by: Manan Gupta <manan@planetscale.com>
This commit is contained in:
Andres Taylor 2022-08-02 11:41:06 +02:00 коммит произвёл GitHub
Родитель bb334264ce
Коммит 1fa1245b91
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
41 изменённых файлов: 7729 добавлений и 6989 удалений

Просмотреть файл

@ -9,6 +9,7 @@
- [VDiff2](#vdiff2)
- [Mysql Compatibility](#mysql-compatibility)
- [Durability Policy](#durability-policy)
- [New EXPLAIN format](#new-explain-format)
## Known Issues
@ -185,3 +186,9 @@ This is different from the existing `autocommit` parameter where the query is se
A new durability policy `cross_cell` is now supported. `cross_cell` durability policy only allows replica tablets from a different cell than the current primary to
send semi-sync ACKs. This ensures that any committed write exists in at least 2 tablets belonging to different cells.
### New EXPLAIN format
#### FORMAT=vtexplain
With this new `explain` format, you can get an output that is very similar to the command line `vtexplain` app, but from a running `vtgate`, through a MySQL query.

Просмотреть файл

@ -0,0 +1,155 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vtexplain
import (
"context"
_ "embed"
"flag"
"os"
"testing"
"vitess.io/vitess/go/vt/vtgate/planbuilder"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
)
var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
shardedKs = "ks"
shardedKsShards = []string{"-40", "40-80", "80-c0", "c0-"}
Cell = "test"
//go:embed schema.sql
shardedSchemaSQL string
//go:embed vschema.json
shardedVSchema string
)
func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
exitCode := func() int {
clusterInstance = cluster.NewCluster(Cell, "localhost")
defer clusterInstance.Teardown()
// Start topo server
err := clusterInstance.StartTopo()
if err != nil {
return 1
}
// Start keyspace
sKs := &cluster.Keyspace{
Name: shardedKs,
SchemaSQL: shardedSchemaSQL,
VSchema: shardedVSchema,
}
err = clusterInstance.StartKeyspace(*sKs, shardedKsShards, 0, false)
if err != nil {
return 1
}
// Start vtgate
clusterInstance.VtGatePlannerVersion = planbuilder.Gen4
err = clusterInstance.StartVtgate()
if err != nil {
return 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}()
os.Exit(exitCode)
}
func TestVtGateVtExplain(t *testing.T) {
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
utils.AssertContainsError(t, conn,
`explain format=vtexplain insert into user (id,lookup,lookup_unique) values (4,'apa','foo'),(5,'apa','bar'),(6,'monkey','nobar')`,
"vtexplain will actually run queries")
expected := `[[INT32(0) VARCHAR("ks") VARCHAR("-40") VARCHAR("begin")]` +
` [INT32(0) VARCHAR("ks") VARCHAR("-40") VARCHAR("insert into lookup(lookup, id, keyspace_id) values ('apa', 1, '\x16k@\xb4J\xbaK\xd6'), ('apa', 2, '\x06\xe7\xea\\\"Βp\x8f') on duplicate key update lookup = values(lookup), id = values(id), keyspace_id = values(keyspace_id)")]` +
` [INT32(1) VARCHAR("ks") VARCHAR("40-80") VARCHAR("begin")]` +
` [INT32(1) VARCHAR("ks") VARCHAR("40-80") VARCHAR("insert into lookup(lookup, id, keyspace_id) values ('monkey', 3, 'N\xb1\x90ɢ\xfa\x16\x9c') on duplicate key update lookup = values(lookup), id = values(id), keyspace_id = values(keyspace_id)")]` +
` [INT32(2) VARCHAR("ks") VARCHAR("-40") VARCHAR("commit")]` +
` [INT32(3) VARCHAR("ks") VARCHAR("40-80") VARCHAR("commit")]` +
` [INT32(4) VARCHAR("ks") VARCHAR("40-80") VARCHAR("begin")]` +
` [INT32(4) VARCHAR("ks") VARCHAR("40-80") VARCHAR("insert into lookup_unique(lookup_unique, keyspace_id) values ('monkey', 'N\xb1\x90ɢ\xfa\x16\x9c')")]` +
` [INT32(5) VARCHAR("ks") VARCHAR("-40") VARCHAR("begin")]` +
` [INT32(5) VARCHAR("ks") VARCHAR("-40") VARCHAR("insert into lookup_unique(lookup_unique, keyspace_id) values ('apa', '\x16k@\xb4J\xbaK\xd6'), ('bandar', '\x06\xe7\xea\\\"Βp\x8f')")]` +
` [INT32(6) VARCHAR("ks") VARCHAR("40-80") VARCHAR("commit")]` +
` [INT32(7) VARCHAR("ks") VARCHAR("-40") VARCHAR("commit")]` +
` [INT32(8) VARCHAR("ks") VARCHAR("40-80") VARCHAR("begin")]` +
` [INT32(8) VARCHAR("ks") VARCHAR("40-80") VARCHAR("insert into ` + "`user`" + `(id, lookup, lookup_unique) values (3, 'monkey', 'monkey')")]` +
` [INT32(9) VARCHAR("ks") VARCHAR("-40") VARCHAR("begin")]` +
` [INT32(9) VARCHAR("ks") VARCHAR("-40") VARCHAR("insert into ` + "`user`" + `(id, lookup, lookup_unique) values (1, 'apa', 'apa'), (2, 'apa', 'bandar')")]]`
utils.AssertMatchesNoOrder(t, conn, `explain /*vt+ EXECUTE_DML_QUERIES */ format=vtexplain insert into user (id,lookup,lookup_unique) values (1,'apa','apa'),(2,'apa','bandar'),(3,'monkey','monkey')`, expected)
expected = `[[INT32(0) VARCHAR("ks") VARCHAR("-40") VARCHAR("select lookup, keyspace_id from lookup where lookup in ('apa')")]` +
` [INT32(1) VARCHAR("ks") VARCHAR("-40") VARCHAR("select id from ` + "`user`" + ` where lookup = 'apa'")]]`
for _, mode := range []string{"oltp", "olap"} {
t.Run(mode, func(t *testing.T) {
utils.Exec(t, conn, "set workload = "+mode)
utils.AssertMatches(t, conn, `explain format=vtexplain select id from user where lookup = "apa"`, expected)
})
}
// transaction explicitly started to no commit in the end.
utils.Exec(t, conn, "begin")
expected = `[[INT32(0) VARCHAR("ks") VARCHAR("-40") VARCHAR("begin")]` +
` [INT32(0) VARCHAR("ks") VARCHAR("-40") VARCHAR("insert into lookup(lookup, id, keyspace_id) values ('apa', 4, '\xd2\xfd\x88g\xd5\\r-\xfe'), ('apa', 5, 'p\xbb\x02<\x81\f\xa8z') on duplicate key update lookup = values(lookup), id = values(id), keyspace_id = values(keyspace_id)")]` +
` [INT32(1) VARCHAR("ks") VARCHAR("40-80") VARCHAR("begin")]` +
` [INT32(1) VARCHAR("ks") VARCHAR("40-80") VARCHAR("insert into lookup(lookup, id, keyspace_id) values ('monkey', 6, '\xf0\x98H\\n\xc4ľq') on duplicate key update lookup = values(lookup), id = values(id), keyspace_id = values(keyspace_id)")]` +
` [INT32(2) VARCHAR("ks") VARCHAR("-40") VARCHAR("commit")]` +
` [INT32(3) VARCHAR("ks") VARCHAR("40-80") VARCHAR("commit")]` +
` [INT32(4) VARCHAR("ks") VARCHAR("-40") VARCHAR("begin")]` +
` [INT32(4) VARCHAR("ks") VARCHAR("-40") VARCHAR("insert into lookup_unique(lookup_unique, keyspace_id) values ('foo', '\xd2\xfd\x88g\xd5\\r-\xfe')")]` +
` [INT32(5) VARCHAR("ks") VARCHAR("80-c0") VARCHAR("begin")]` +
` [INT32(5) VARCHAR("ks") VARCHAR("80-c0") VARCHAR("insert into lookup_unique(lookup_unique, keyspace_id) values ('bar', 'p\xbb\x02<\x81\f\xa8z')")]` +
` [INT32(6) VARCHAR("ks") VARCHAR("c0-") VARCHAR("begin")]` +
` [INT32(6) VARCHAR("ks") VARCHAR("c0-") VARCHAR("insert into lookup_unique(lookup_unique, keyspace_id) values ('nobar', '\xf0\x98H\\n\xc4ľq')")]` +
` [INT32(7) VARCHAR("ks") VARCHAR("-40") VARCHAR("commit")]` +
` [INT32(8) VARCHAR("ks") VARCHAR("80-c0") VARCHAR("commit")]` +
` [INT32(9) VARCHAR("ks") VARCHAR("c0-") VARCHAR("commit")]` +
` [INT32(10) VARCHAR("ks") VARCHAR("40-80") VARCHAR("begin")]` +
` [INT32(10) VARCHAR("ks") VARCHAR("40-80") VARCHAR("insert into ` + "`user`" + `(id, lookup, lookup_unique) values (5, 'apa', 'bar')")]` +
` [INT32(11) VARCHAR("ks") VARCHAR("c0-") VARCHAR("begin")]` +
` [INT32(11) VARCHAR("ks") VARCHAR("c0-") VARCHAR("insert into ` + "`user`" + `(id, lookup, lookup_unique) values (4, 'apa', 'foo'), (6, 'monkey', 'nobar')")]]`
utils.AssertMatchesNoOrder(t, conn,
`explain /*vt+ EXECUTE_DML_QUERIES */ format=vtexplain insert into user (id,lookup,lookup_unique) values (4,'apa','foo'),(5,'apa','bar'),(6,'monkey','nobar')`,
expected)
utils.Exec(t, conn, "rollback")
}

Просмотреть файл

@ -0,0 +1,22 @@
create table user
(
id bigint,
lookup varchar(128),
lookup_unique varchar(128),
primary key (id)
) Engine = InnoDB;
create table lookup
(
lookup varchar(128),
id bigint,
keyspace_id varbinary(100),
primary key (id)
) Engine = InnoDB;
create table lookup_unique
(
lookup_unique varchar(128),
keyspace_id varbinary(100),
primary key (lookup_unique)
) Engine = InnoDB;

Просмотреть файл

@ -0,0 +1,68 @@
{
"sharded": true,
"vindexes": {
"hash_index": {
"type": "hash"
},
"md5_index": {
"type": "unicode_loose_md5"
},
"lookup_vdx": {
"type": "lookup",
"params": {
"table": "lookup",
"from": "lookup,id",
"to": "keyspace_id",
"autocommit": "true"
},
"owner": "user"
},
"lookup_unique_vdx": {
"type": "lookup_unique",
"params": {
"table": "lookup_unique",
"from": "lookup_unique",
"to": "keyspace_id",
"autocommit": "true"
},
"owner": "user"
}
},
"tables": {
"user": {
"column_vindexes": [
{
"column": "id",
"name": "hash_index"
},
{
"columns": [
"lookup",
"id"
],
"name": "lookup_vdx"
},
{
"column": "lookup_unique",
"name": "lookup_unique_vdx"
}
]
},
"lookup": {
"column_vindexes": [
{
"column": "lookup",
"name": "md5_index"
}
]
},
"lookup_unique": {
"column_vindexes": [
{
"column": "lookup_unique",
"name": "md5_index"
}
]
}
}
}

Просмотреть файл

@ -276,15 +276,11 @@ func OnlineDDLFromCommentedStatement(stmt sqlparser.Statement) (onlineDDL *Onlin
directives := comments.Directives()
decodeDirective := func(name string) (string, error) {
value, ok := directives[name]
value, ok := directives.GetString(name, "")
if !ok {
return "", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no value found for comment directive %s", name)
}
unquoted, err := strconv.Unquote(value)
if err != nil {
return "", err
}
b, err := hex.DecodeString(unquoted)
b, err := hex.DecodeString(value)
if err != nil {
return "", err
}

Просмотреть файл

@ -633,6 +633,7 @@ type (
ExplainStmt struct {
Type ExplainType
Statement Statement
Comments *ParsedComments
}
// ExplainTab represents the Explain table
@ -1272,6 +1273,11 @@ func (node *AlterTable) SetComments(comments Comments) {
node.Comments = comments.Parsed()
}
// SetComments implements DDLStatement.
func (node *ExplainStmt) SetComments(comments Comments) {
node.Comments = comments.Parsed()
}
// SetComments implements DDLStatement.
func (node *CreateTable) SetComments(comments Comments) {
node.Comments = comments.Parsed()
@ -1344,6 +1350,11 @@ func (node *AlterTable) GetParsedComments() *ParsedComments {
return node.Comments
}
// GetParsedComments implements DDLStatement.
func (node *ExplainStmt) GetParsedComments() *ParsedComments {
return node.Comments
}
// GetParsedComments implements DDLStatement.
func (node *CreateTable) GetParsedComments() *ParsedComments {
return node.Comments
@ -1924,7 +1935,7 @@ func (c Comments) Parsed() *ParsedComments {
type ParsedComments struct {
comments Comments
_directives CommentDirectives
_directives *CommentDirectives
}
// SelectExprs represents SELECT expressions.

Просмотреть файл

@ -1194,6 +1194,7 @@ func CloneRefOfExplainStmt(n *ExplainStmt) *ExplainStmt {
}
out := *n
out.Statement = CloneStatement(n.Statement)
out.Comments = CloneRefOfParsedComments(n.Comments)
return &out
}

Просмотреть файл

@ -2301,7 +2301,8 @@ func EqualsRefOfExplainStmt(a, b *ExplainStmt) bool {
return false
}
return a.Type == b.Type &&
EqualsStatement(a.Statement, b.Statement)
EqualsStatement(a.Statement, b.Statement) &&
EqualsRefOfParsedComments(a.Comments, b.Comments)
}
// EqualsRefOfExplainTab does deep equals between the two objects.

Просмотреть файл

@ -990,7 +990,7 @@ func (node *ExplainStmt) Format(buf *TrackedBuffer) {
default:
format = "format = " + node.Type.ToString() + " "
}
buf.astPrintf(node, "explain %s%v", format, node.Statement)
buf.astPrintf(node, "explain %v%s%v", node.Comments, format, node.Statement)
}
// Format formats the node.

Просмотреть файл

@ -1314,6 +1314,7 @@ func (node *ExplainStmt) formatFast(buf *TrackedBuffer) {
format = "format = " + node.Type.ToString() + " "
}
buf.WriteString("explain ")
node.Comments.formatFast(buf)
buf.WriteString(format)
node.Statement.formatFast(buf)
}

Просмотреть файл

@ -1611,6 +1611,8 @@ func (ty ExplainType) ToString() string {
return JSONStr
case VitessType:
return VitessStr
case VTExplainType:
return VTExplainStr
case TraditionalType:
return TraditionalStr
case AnalyzeType:

Просмотреть файл

@ -2490,6 +2490,11 @@ func (a *application) rewriteRefOfExplainStmt(parent SQLNode, node *ExplainStmt,
}) {
return false
}
if !a.rewriteRefOfParsedComments(node, node.Comments, func(newNode, parent SQLNode) {
parent.(*ExplainStmt).Comments = newNode.(*ParsedComments)
}) {
return false
}
if a.post != nil {
a.cur.replacer = replacer
a.cur.parent = parent

Просмотреть файл

@ -1430,6 +1430,9 @@ func VisitRefOfExplainStmt(in *ExplainStmt, f Visit) error {
if err := VisitStatement(in.Statement, f); err != nil {
return err
}
if err := VisitRefOfParsedComments(in.Comments, f); err != nil {
return err
}
return nil
}
func VisitRefOfExplainTab(in *ExplainTab, f Visit) error {

Просмотреть файл

@ -704,6 +704,33 @@ func (cached *ColumnTypeOptions) CachedSize(alloc bool) int64 {
size += cached.SRID.CachedSize(true)
return size
}
//go:nocheckptr
func (cached *CommentDirectives) CachedSize(alloc bool) int64 {
if cached == nil {
return int64(0)
}
size := int64(0)
if alloc {
size += int64(8)
}
// field m map[string]string
if cached.m != nil {
size += int64(48)
hmap := reflect.ValueOf(cached.m)
numBuckets := int(math.Pow(2, float64((*(*uint8)(unsafe.Pointer(hmap.Pointer() + uintptr(9)))))))
numOldBuckets := (*(*uint16)(unsafe.Pointer(hmap.Pointer() + uintptr(10))))
size += hack.RuntimeAllocSize(int64(numOldBuckets * 272))
if len(cached.m) > 0 || numBuckets > 1 {
size += hack.RuntimeAllocSize(int64(numBuckets * 272))
}
for k, v := range cached.m {
size += hack.RuntimeAllocSize(int64(len(k)))
size += hack.RuntimeAllocSize(int64(len(v)))
}
}
return size
}
func (cached *CommentOnly) CachedSize(alloc bool) int64 {
if cached == nil {
return int64(0)
@ -1166,12 +1193,14 @@ func (cached *ExplainStmt) CachedSize(alloc bool) int64 {
}
size := int64(0)
if alloc {
size += int64(24)
size += int64(32)
}
// field Statement vitess.io/vitess/go/vt/sqlparser.Statement
if cc, ok := cached.Statement.(cachedObject); ok {
size += cc.CachedSize(true)
}
// field Comments *vitess.io/vitess/go/vt/sqlparser.ParsedComments
size += cached.Comments.CachedSize(true)
return size
}
func (cached *ExplainTab) CachedSize(alloc bool) int64 {
@ -2654,8 +2683,6 @@ func (cached *ParenTableExpr) CachedSize(alloc bool) int64 {
}
return size
}
//go:nocheckptr
func (cached *ParsedComments) CachedSize(alloc bool) int64 {
if cached == nil {
return int64(0)
@ -2671,21 +2698,8 @@ func (cached *ParsedComments) CachedSize(alloc bool) int64 {
size += hack.RuntimeAllocSize(int64(len(elem)))
}
}
// field _directives vitess.io/vitess/go/vt/sqlparser.CommentDirectives
if cached._directives != nil {
size += int64(48)
hmap := reflect.ValueOf(cached._directives)
numBuckets := int(math.Pow(2, float64((*(*uint8)(unsafe.Pointer(hmap.Pointer() + uintptr(9)))))))
numOldBuckets := (*(*uint16)(unsafe.Pointer(hmap.Pointer() + uintptr(10))))
size += hack.RuntimeAllocSize(int64(numOldBuckets * 272))
if len(cached._directives) > 0 || numBuckets > 1 {
size += hack.RuntimeAllocSize(int64(numBuckets * 272))
}
for k, v := range cached._directives {
size += hack.RuntimeAllocSize(int64(len(k)))
size += hack.RuntimeAllocSize(int64(len(v)))
}
}
// field _directives *vitess.io/vitess/go/vt/sqlparser.CommentDirectives
size += cached._directives.CachedSize(true)
return size
}
func (cached *ParsedQuery) CachedSize(alloc bool) int64 {

Просмотреть файл

@ -42,6 +42,8 @@ const (
DirectiveAllowHashJoin = "ALLOW_HASH_JOIN"
// DirectiveQueryPlanner lets the user specify per query which planner should be used
DirectiveQueryPlanner = "PLANNER"
// DirectiveVtexplainRunDMLQueries tells explain format = vtexplain that it is okay to also run the query.
DirectiveVtexplainRunDMLQueries = "EXECUTE_DML_QUERIES"
)
func isNonSpace(r rune) bool {
@ -200,7 +202,9 @@ const commentDirectivePreamble = "/*vt+"
// CommentDirectives is the parsed representation for execution directives
// conveyed in query comments
type CommentDirectives map[string]string
type CommentDirectives struct {
m map[string]string
}
// Directives parses the comment list for any execution directives
// of the form:
@ -208,12 +212,12 @@ type CommentDirectives map[string]string
// /*vt+ OPTION_ONE=1 OPTION_TWO OPTION_THREE=abcd */
//
// It returns the map of the directive values or nil if there aren't any.
func (c *ParsedComments) Directives() CommentDirectives {
func (c *ParsedComments) Directives() *CommentDirectives {
if c == nil {
return nil
}
if c._directives == nil {
c._directives = make(CommentDirectives)
c._directives = &CommentDirectives{m: make(map[string]string)}
for _, commentStr := range c.comments {
if commentStr[0:5] != commentDirectivePreamble {
@ -228,7 +232,7 @@ func (c *ParsedComments) Directives() CommentDirectives {
if !ok {
val = "true"
}
c._directives[directive] = val
c._directives.m[strings.ToLower(directive)] = val
}
}
}
@ -254,12 +258,12 @@ func (c *ParsedComments) Prepend(comment string) Comments {
// IsSet checks the directive map for the named directive and returns
// true if the directive is set and has a true/false or 0/1 value
func (d CommentDirectives) IsSet(key string) bool {
func (d *CommentDirectives) IsSet(key string) bool {
if d == nil {
return false
}
val, ok := d[key]
if !ok {
val, found := d.m[strings.ToLower(key)]
if !found {
return false
}
// ParseBool handles "0", "1", "true", "false" and all similars
@ -268,15 +272,18 @@ func (d CommentDirectives) IsSet(key string) bool {
}
// GetString gets a directive value as string, with default value if not found
func (d CommentDirectives) GetString(key string, defaultVal string) string {
val, ok := d[key]
func (d *CommentDirectives) GetString(key string, defaultVal string) (string, bool) {
if d == nil {
return "", false
}
val, ok := d.m[strings.ToLower(key)]
if !ok {
return defaultVal
return defaultVal, false
}
if unquoted, err := strconv.Unquote(val); err == nil {
return unquoted
return unquoted, true
}
return val
return val, true
}
// MultiShardAutocommitDirective returns true if multishard autocommit directive is set to true in query.
@ -362,10 +369,3 @@ func AllowScatterDirective(stmt Statement) bool {
}
return comments != nil && comments.Directives().IsSet(DirectiveAllowScatter)
}
func CommentsForStatement(stmt Statement) Comments {
if commented, ok := stmt.(Commented); ok {
return commented.GetParsedComments().comments
}
return nil
}

Просмотреть файл

@ -21,6 +21,8 @@ import (
"reflect"
"testing"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
)
@ -262,58 +264,58 @@ func TestExtractMysqlComment(t *testing.T) {
func TestExtractCommentDirectives(t *testing.T) {
var testCases = []struct {
input string
vals CommentDirectives
vals map[string]string
}{{
input: "",
vals: nil,
}, {
input: "/* not a vt comment */",
vals: CommentDirectives{},
vals: map[string]string{},
}, {
input: "/*vt+ */",
vals: CommentDirectives{},
vals: map[string]string{},
}, {
input: "/*vt+ SINGLE_OPTION */",
vals: CommentDirectives{
"SINGLE_OPTION": "true",
vals: map[string]string{
"single_option": "true",
},
}, {
input: "/*vt+ ONE_OPT TWO_OPT */",
vals: CommentDirectives{
"ONE_OPT": "true",
"TWO_OPT": "true",
vals: map[string]string{
"one_opt": "true",
"two_opt": "true",
},
}, {
input: "/*vt+ ONE_OPT */ /* other comment */ /*vt+ TWO_OPT */",
vals: CommentDirectives{
"ONE_OPT": "true",
"TWO_OPT": "true",
vals: map[string]string{
"one_opt": "true",
"two_opt": "true",
},
}, {
input: "/*vt+ ONE_OPT=abc TWO_OPT=def */",
vals: CommentDirectives{
"ONE_OPT": "abc",
"TWO_OPT": "def",
vals: map[string]string{
"one_opt": "abc",
"two_opt": "def",
},
}, {
input: "/*vt+ ONE_OPT=true TWO_OPT=false */",
vals: CommentDirectives{
"ONE_OPT": "true",
"TWO_OPT": "false",
vals: map[string]string{
"one_opt": "true",
"two_opt": "false",
},
}, {
input: "/*vt+ ONE_OPT=true TWO_OPT=\"false\" */",
vals: CommentDirectives{
"ONE_OPT": "true",
"TWO_OPT": "\"false\"",
vals: map[string]string{
"one_opt": "true",
"two_opt": "\"false\"",
},
}, {
input: "/*vt+ RANGE_OPT=[a:b] ANOTHER ANOTHER_WITH_VALEQ=val= AND_ONE_WITH_EQ== */",
vals: CommentDirectives{
"RANGE_OPT": "[a:b]",
"ANOTHER": "true",
"ANOTHER_WITH_VALEQ": "val=",
"AND_ONE_WITH_EQ": "=",
vals: map[string]string{
"range_opt": "[a:b]",
"another": "true",
"another_with_valeq": "val=",
"and_one_with_eq": "=",
},
}}
@ -359,7 +361,11 @@ func TestExtractCommentDirectives(t *testing.T) {
}
vals := comments.Directives()
if !reflect.DeepEqual(vals, testCase.vals) {
if vals == nil {
require.Nil(t, vals)
return
}
if !reflect.DeepEqual(vals.m, testCase.vals) {
t.Errorf("test input: '%v', got vals %T:\n%+v, want %T\n%+v", testCase.input, vals, vals, testCase.vals, testCase.vals)
}
})
@ -367,38 +373,21 @@ func TestExtractCommentDirectives(t *testing.T) {
})
}
d := CommentDirectives{
"ONE_OPT": "true",
"TWO_OPT": "false",
d := &CommentDirectives{m: map[string]string{
"one_opt": "true",
"two_opt": "false",
"three": "1",
"four": "2",
"five": "0",
"six": "true",
}
}}
if !d.IsSet("ONE_OPT") {
t.Errorf("d.IsSet(ONE_OPT) should be true")
}
if d.IsSet("TWO_OPT") {
t.Errorf("d.IsSet(TWO_OPT) should be false")
}
if !d.IsSet("three") {
t.Errorf("d.IsSet(three) should be true")
}
if d.IsSet("four") {
t.Errorf("d.IsSet(four) should be false")
}
if d.IsSet("five") {
t.Errorf("d.IsSet(five) should be false")
}
if !d.IsSet("six") {
t.Errorf("d.IsSet(six) should be false")
}
assert.True(t, d.IsSet("ONE_OPT"), "d.IsSet(ONE_OPT)")
assert.False(t, d.IsSet("TWO_OPT"), "d.IsSet(TWO_OPT)")
assert.True(t, d.IsSet("three"), "d.IsSet(three)")
assert.False(t, d.IsSet("four"), "d.IsSet(four)")
assert.False(t, d.IsSet("five"), "d.IsSet(five)")
assert.True(t, d.IsSet("six"), "d.IsSet(six)")
}
func TestSkipQueryPlanCacheDirective(t *testing.T) {

Просмотреть файл

@ -248,6 +248,7 @@ const (
VitessStr = "vitess"
TraditionalStr = "traditional"
AnalyzeStr = "analyze"
VTExplainStr = "vtexplain"
// Lock Types
ReadStr = "read"
@ -720,6 +721,7 @@ const (
TreeType
JSONType
VitessType
VTExplainType
TraditionalType
AnalyzeType
)

Просмотреть файл

@ -664,6 +664,7 @@ var keywords = []keyword{
{"vitess_throttled_apps", VITESS_THROTTLED_APPS},
{"vschema", VSCHEMA},
{"vstream", VSTREAM},
{"vtexplain", VTEXPLAIN},
{"warnings", WARNINGS},
{"wait_for_executed_gtid_set", WAIT_FOR_EXECUTED_GTID_SET},
{"wait_until_sql_thread_after_gtids", WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS},

Просмотреть файл

@ -2059,6 +2059,9 @@ var (
}, {
input: "describe select * from t",
output: "explain select * from t",
}, {
input: "describe /*vt+ execute_dml_queries */ select * from t",
output: "explain /*vt+ execute_dml_queries */ select * from t",
}, {
input: "desc select * from t",
output: "explain select * from t",
@ -2081,11 +2084,16 @@ var (
input: "explain format = tree select * from t",
}, {
input: "explain format = json select * from t",
}, {
input: "explain format = vtexplain select * from t",
}, {
input: "explain format = vitess select * from t",
}, {
input: "describe format = vitess select * from t",
output: "explain format = vitess select * from t",
}, {
input: "describe format = vtexplain select * from t",
output: "explain format = vtexplain select * from t",
}, {
input: "explain delete from t",
}, {

13610
go/vt/sqlparser/sql.go сгенерированный

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -367,7 +367,7 @@ func bindVariable(yylex yyLexer, bvar string) {
%token <str> GTID_SUBSET GTID_SUBTRACT WAIT_FOR_EXECUTED_GTID_SET WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS
// Explain tokens
%token <str> FORMAT TREE VITESS TRADITIONAL
%token <str> FORMAT TREE VITESS TRADITIONAL VTEXPLAIN
// Lock type tokens
%token <str> LOCAL LOW_PRIORITY
@ -4234,6 +4234,10 @@ explain_format_opt:
{
$$ = VitessType
}
| FORMAT '=' VTEXPLAIN
{
$$ = VTExplainType
}
| FORMAT '=' TRADITIONAL
{
$$ = TraditionalType
@ -4289,13 +4293,13 @@ wild_opt:
}
explain_statement:
explain_synonyms table_name wild_opt
explain_synonyms comment_opt table_name wild_opt
{
$$ = &ExplainTab{Table: $2, Wild: $3}
$$ = &ExplainTab{Table: $3, Wild: $4}
}
| explain_synonyms explain_format_opt explainable_statement
| explain_synonyms comment_opt explain_format_opt explainable_statement
{
$$ = &ExplainStmt{Type: $2, Statement: $3}
$$ = &ExplainStmt{Type: $3, Statement: $4, Comments: Comments($2).Parsed()}
}
other_statement:

Просмотреть файл

@ -1179,6 +1179,20 @@ func (cached *VStream) CachedSize(alloc bool) int64 {
size += hack.RuntimeAllocSize(int64(len(cached.Position)))
return size
}
func (cached *VTExplain) CachedSize(alloc bool) int64 {
if cached == nil {
return int64(0)
}
size := int64(0)
if alloc {
size += int64(16)
}
// field Input vitess.io/vitess/go/vt/vtgate/engine.Primitive
if cc, ok := cached.Input.(cachedObject); ok {
size += cc.CachedSize(true)
}
return size
}
func (cached *VindexFunc) CachedSize(alloc bool) int64 {
if cached == nil {
return int64(0)

Просмотреть файл

@ -55,22 +55,22 @@ func (t *noopVCursor) StreamExecutePrimitiveStandalone(ctx context.Context, prim
}
func (t *noopVCursor) AnyAdvisoryLockTaken() bool {
//TODO implement me
// TODO implement me
panic("implement me")
}
func (t *noopVCursor) AddAdvisoryLock(name string) {
//TODO implement me
// TODO implement me
panic("implement me")
}
func (t *noopVCursor) RemoveAdvisoryLock(name string) {
//TODO implement me
// TODO implement me
panic("implement me")
}
func (t *noopVCursor) ReleaseLock(context.Context) error {
//TODO implement me
// TODO implement me
panic("implement me")
}
@ -192,7 +192,7 @@ func (t *noopVCursor) SetUDV(key string, value any) error {
}
func (t *noopVCursor) SetSysVar(name string, expr string) {
//panic("implement me")
// panic("implement me")
}
func (t *noopVCursor) InReservedConn() bool {
@ -686,6 +686,15 @@ func (f *loggingVCursor) CanUseSetVar() bool {
return useSetVar
}
func (t *noopVCursor) VtExplainLogging() {}
func (t *noopVCursor) DisableLogging() {}
func (t *noopVCursor) GetVTExplainLogs() []ExecuteEntry {
return nil
}
func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) {
return nil, nil
}
func expectResult(t *testing.T, msg string, result, want *sqltypes.Result) {
t.Helper()
if !reflect.DeepEqual(result, want) {

Просмотреть файл

@ -169,6 +169,9 @@ type (
AddAdvisoryLock(name string)
// RemoveAdvisoryLock removes advisory lock from the session
RemoveAdvisoryLock(name string)
VtExplainLogging()
GetVTExplainLogs() []ExecuteEntry
}
// Match is used to check if a Primitive matches

Просмотреть файл

@ -0,0 +1,125 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package engine
import (
"context"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
type (
ExecuteEntry struct {
ID int
Keyspace string
Shard string
TabletType topodatapb.TabletType
Cell string
Query string
}
VTExplain struct {
Input Primitive
}
)
var _ Primitive = (*VTExplain)(nil)
// RouteType implements the Primitive interface
func (v *VTExplain) RouteType() string {
return v.Input.RouteType()
}
// GetKeyspaceName implements the Primitive interface
func (v *VTExplain) GetKeyspaceName() string {
return v.Input.GetKeyspaceName()
}
// GetTableName implements the Primitive interface
func (v *VTExplain) GetTableName() string {
return v.Input.GetTableName()
}
// GetFields implements the Primitive interface
func (v *VTExplain) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return v.Input.GetFields(ctx, vcursor, bindVars)
}
// NeedsTransaction implements the Primitive interface
func (v *VTExplain) NeedsTransaction() bool {
return v.Input.NeedsTransaction()
}
// TryExecute implements the Primitive interface
func (v *VTExplain) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
vcursor.Session().VtExplainLogging()
_, err := vcursor.ExecutePrimitive(ctx, v.Input, bindVars, wantfields)
if err != nil {
return nil, err
}
result := convertToVTExplainResult(vcursor.Session().GetVTExplainLogs())
return result, nil
}
// TryStreamExecute implements the Primitive interface
func (v *VTExplain) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
vcursor.Session().VtExplainLogging()
err := vcursor.StreamExecutePrimitive(ctx, v.Input, bindVars, wantfields, func(result *sqltypes.Result) error {
return nil
})
if err != nil {
return err
}
result := convertToVTExplainResult(vcursor.Session().GetVTExplainLogs())
return callback(result)
}
func convertToVTExplainResult(logs []ExecuteEntry) *sqltypes.Result {
fields := []*querypb.Field{{
Name: "#", Type: sqltypes.Int32,
}, {
Name: "keyspace", Type: sqltypes.VarChar,
}, {
Name: "shard", Type: sqltypes.VarChar,
}, {
Name: "query", Type: sqltypes.VarChar,
}}
qr := &sqltypes.Result{
Fields: fields,
}
for _, line := range logs {
qr.Rows = append(qr.Rows, sqltypes.Row{
sqltypes.NewInt32(int32(line.ID)),
sqltypes.NewVarChar(line.Keyspace),
sqltypes.NewVarChar(line.Shard),
sqltypes.NewVarChar(line.Query),
})
}
return qr
}
// Inputs implements the Primitive interface
func (v *VTExplain) Inputs() []Primitive {
return []Primitive{v.Input}
}
func (v *VTExplain) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "VTEXPLAIN",
}
}

Просмотреть файл

@ -2503,6 +2503,31 @@ func TestExecutorDescHash(t *testing.T) {
require.NoError(t, err)
}
func TestExecutorVtExplain(t *testing.T) {
executor, _, _, sbclookup := createExecutorEnv()
session := NewAutocommitSession(&vtgatepb.Session{})
sbclookup.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("name|user_id", "varchar|int64"), "apa|1", "apa|2"),
})
qr, err := executor.Execute(ctx, "TestExecutorVtExplain", session, "explain format=vtexplain select * from user where name = 'apa'", nil)
require.NoError(t, err)
txt := fmt.Sprintf("%v\n", qr.Rows)
lookupQuery := "select `name`, user_id from name_user_map where `name` in"
require.Contains(t, txt, lookupQuery)
// Test the streaming side as well
var results []sqltypes.Row
session = NewAutocommitSession(&vtgatepb.Session{})
err = executor.StreamExecute(ctx, "TestExecutorVtExplain", session, "explain format=vtexplain select * from user where name = 'apa'", nil, func(result *sqltypes.Result) error {
results = append(results, result.Rows...)
return nil
})
require.NoError(t, err)
txt = fmt.Sprintf("%v\n", results)
require.Contains(t, txt, lookupQuery)
}
func exec(executor *Executor, session *SafeSession, sql string) (*sqltypes.Result, error) {
return executor.Execute(context.Background(), "TestExecute", session, sql, nil)
}

Просмотреть файл

@ -190,7 +190,7 @@ func getPlannerFromQueryHint(stmt sqlparser.Statement) (plancontext.PlannerVersi
}
d := cm.GetParsedComments().Directives()
val, ok := d[sqlparser.DirectiveQueryPlanner]
val, ok := d.GetString(sqlparser.DirectiveQueryPlanner, "")
if !ok {
return plancontext.PlannerVersion(0), false
}

Просмотреть файл

@ -36,11 +36,15 @@ func buildExplainPlan(stmt sqlparser.Explain, reservedVars *sqlparser.ReservedVa
case *sqlparser.ExplainTab:
return explainTabPlan(explain, vschema)
case *sqlparser.ExplainStmt:
if explain.Type == sqlparser.VitessType {
switch explain.Type {
case sqlparser.VitessType:
return buildVitessTypePlan(explain, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
}
case sqlparser.VTExplainType:
return buildVTExplainTypePlan(explain, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
default:
return buildOtherReadAndAdmin(sqlparser.String(explain), vschema)
}
}
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected explain type: %T", stmt)
}
@ -111,6 +115,23 @@ func buildVitessTypePlan(explain *sqlparser.ExplainStmt, reservedVars *sqlparser
return newPlanResult(engine.NewRowsPrimitive(rows, fields)), nil
}
func buildVTExplainTypePlan(explain *sqlparser.ExplainStmt, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) {
input, err := createInstructionFor(sqlparser.String(explain.Statement), explain.Statement, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
if err != nil {
return nil, err
}
switch input.primitive.(type) {
case *engine.Insert, *engine.Delete, *engine.Update:
directives := explain.GetParsedComments().Directives()
if directives.IsSet(sqlparser.DirectiveVtexplainRunDMLQueries) {
break
}
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "explain format = vtexplain will actually run queries. `/*vt+ %s */` must be set to run DML queries in vtexplain. Example: `explain /*vt+ %s */ format = vtexplain delete from t1`", sqlparser.DirectiveVtexplainRunDMLQueries, sqlparser.DirectiveVtexplainRunDMLQueries)
}
return &planResult{primitive: &engine.VTExplain{Input: input.primitive}, tables: input.tables}, nil
}
func extractQuery(m map[string]any) string {
queryObj, ok := m["Query"]
if !ok {

Просмотреть файл

@ -444,27 +444,24 @@ func planOrderByOnUnion(ctx *plancontext.PlanningContext, plan logicalPlan, unio
}
func pushCommentDirectivesOnPlan(plan logicalPlan, stmt sqlparser.Statement) (logicalPlan, error) {
var directives sqlparser.CommentDirectives
var directives *sqlparser.CommentDirectives
cmt, ok := stmt.(sqlparser.Commented)
if ok {
directives = cmt.GetParsedComments().Directives()
} else {
directives = make(sqlparser.CommentDirectives)
}
scatterAsWarns := directives.IsSet(sqlparser.DirectiveScatterErrorsAsWarnings)
queryTimeout := queryTimeout(directives)
timeout := queryTimeout(directives)
if scatterAsWarns || queryTimeout > 0 {
if scatterAsWarns || timeout > 0 {
_, _ = visit(plan, func(logicalPlan logicalPlan) (bool, logicalPlan, error) {
switch plan := logicalPlan.(type) {
case *routeGen4:
plan.eroute.ScatterErrorsAsWarnings = scatterAsWarns
plan.eroute.QueryTimeout = queryTimeout
plan.eroute.QueryTimeout = timeout
}
return true, logicalPlan, nil
})
}
}
return plan, nil
}

Просмотреть файл

@ -850,11 +850,10 @@ func (rb *route) exprIsValue(expr sqlparser.Expr) bool {
}
// queryTimeout returns DirectiveQueryTimeout value if set, otherwise returns 0.
func queryTimeout(d sqlparser.CommentDirectives) int {
if val, ok := d[sqlparser.DirectiveQueryTimeout]; ok {
func queryTimeout(d *sqlparser.CommentDirectives) int {
val, _ := d.GetString(sqlparser.DirectiveQueryTimeout, "0")
if intVal, err := strconv.Atoi(val); err == nil {
return intVal
}
}
return 0
}

Просмотреть файл

@ -119,7 +119,7 @@ func buildShowBasicPlan(show *sqlparser.ShowBasic, vschema plancontext.VSchema)
case sqlparser.VitessTarget:
return buildShowTargetPlan(vschema)
case sqlparser.VschemaTables:
return buildVschemaTablesPlan(show, vschema)
return buildVschemaTablesPlan(vschema)
case sqlparser.VschemaVindexes:
return buildVschemaVindexesPlan(show, vschema)
}
@ -229,11 +229,11 @@ func buildDBPlan(show *sqlparser.ShowBasic, vschema plancontext.VSchema) (engine
filter = regexp.MustCompile(".*")
}
//rows := make([][]sqltypes.Value, 0, len(ks)+4)
// rows := make([][]sqltypes.Value, 0, len(ks)+4)
var rows [][]sqltypes.Value
if show.Command == sqlparser.Database {
//Hard code default databases
// Hard code default databases
ks = append(ks, &vindexes.Keyspace{Name: "information_schema"},
&vindexes.Keyspace{Name: "mysql"},
&vindexes.Keyspace{Name: "sys"},
@ -645,7 +645,7 @@ func buildEnginesPlan() (engine.Primitive, error) {
buildVarCharFields("Engine", "Support", "Comment", "Transactions", "XA", "Savepoints")), nil
}
func buildVschemaTablesPlan(show *sqlparser.ShowBasic, vschema plancontext.VSchema) (engine.Primitive, error) {
func buildVschemaTablesPlan(vschema plancontext.VSchema) (engine.Primitive, error) {
vs := vschema.GetVSchema()
ks, err := vschema.DefaultKeyspace()
if err != nil {

Просмотреть файл

@ -5544,3 +5544,72 @@ Gen4 plan same as above
]
}
Gen4 plan same as above
# explain dml without any directive should fail
"explain format=vtexplain delete from user"
"explain format = vtexplain will actually run queries. `/*vt+ EXECUTE_DML_QUERIES */` must be set to run DML queries in vtexplain. Example: `explain /*vt+ EXECUTE_DML_QUERIES */ format = vtexplain delete from t1`"
Gen4 plan same as above
# explain dml with actually_run_query directive
"explain /*vt+ execute_dml_queries */ format=vtexplain delete from user"
{
"QueryType": "EXPLAIN",
"Original": "explain /*vt+ execute_dml_queries */ format=vtexplain delete from user",
"Instructions": {
"OperatorType": "VTEXPLAIN",
"Inputs": [
{
"OperatorType": "Delete",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"TargetTabletType": "PRIMARY",
"KsidLength": 1,
"KsidVindex": "user_index",
"MultiShardAutocommit": false,
"OwnedVindexQuery": "select Id, `Name`, Costly from `user` for update",
"Query": "delete from `user`",
"Table": "user"
}
]
},
"TablesUsed": [
"user.user"
]
}
Gen4 plan same as above
# explain dml with actually_run_query directive - 2
"explain /*vt+ eXECUTE_DML_QUERIES */ format=vtexplain delete from user"
{
"QueryType": "EXPLAIN",
"Original": "explain /*vt+ eXECUTE_DML_QUERIES */ format=vtexplain delete from user",
"Instructions": {
"OperatorType": "VTEXPLAIN",
"Inputs": [
{
"OperatorType": "Delete",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"TargetTabletType": "PRIMARY",
"KsidLength": 1,
"KsidVindex": "user_index",
"MultiShardAutocommit": false,
"OwnedVindexQuery": "select Id, `Name`, Costly from `user` for update",
"Query": "delete from `user`",
"Table": "user"
}
]
},
"TablesUsed": [
"user.user"
]
}
Gen4 plan same as above

Просмотреть файл

@ -80,3 +80,48 @@ Gen4 plan same as above
}
}
Gen4 plan same as above
"explain format=vtexplain select * from user"
{
"QueryType": "EXPLAIN",
"Original": "explain format=vtexplain select * from user",
"Instructions": {
"OperatorType": "VTEXPLAIN",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Unsharded",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select * from `user` where 1 != 1",
"Query": "select * from `user`",
"Table": "`user`"
}
]
}
}
{
"QueryType": "EXPLAIN",
"Original": "explain format=vtexplain select * from user",
"Instructions": {
"OperatorType": "VTEXPLAIN",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Unsharded",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select * from `user` where 1 != 1",
"Query": "select * from `user`",
"Table": "`user`"
}
]
},
"TablesUsed": [
"main.user"
]
}

Просмотреть файл

@ -22,6 +22,10 @@ import (
"sync"
"time"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/engine"
"google.golang.org/protobuf/proto"
"vitess.io/vitess/go/vt/vterrors"
@ -32,11 +36,12 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
// SafeSession is a mutex-protected version of the Session.
// It is thread-safe if each thread only accesses one shard.
// (the use pattern is 'Find', if not found, then 'AppendOrUpdate',
// for a single shard)
type SafeSession struct {
type (
// SafeSession is a mutex-protected version of the Session.
// It is thread-safe if each thread only accesses one shard.
// (the use pattern is 'Find', if not found, then 'AppendOrUpdate',
// for a single shard)
SafeSession struct {
mu sync.Mutex
mustRollback bool
autocommitState autocommitState
@ -56,26 +61,45 @@ type SafeSession struct {
// as the query that started a new transaction on the shard belong to a vindex.
queryFromVindex bool
*vtgatepb.Session
}
logging *executeLogger
// autocommitState keeps track of whether a single round-trip
// commit to vttablet is possible. It starts as autocommitable
// if we started a transaction because of the autocommit flag
// being set. Otherwise, it starts as notAutocommitable.
// If execute is recursively called using the same session,
// like from a vindex, we will already be in a transaction,
// and this should cause the state to become notAutocommitable.
//
// SafeSession lets you request a commit token, which will
// be issued if the state is autocommitable,
// implying that no intermediate transactions were started.
// If so, the state transitions to autocommited, which is terminal.
// If the token is successfully issued, the caller has to perform
// the commit. If a token cannot be issued, then a traditional
// commit has to be performed at the outermost level where
// the autocommitable transition happened.
type autocommitState int
*vtgatepb.Session
}
executeLogger struct {
mu sync.Mutex
entries []engine.ExecuteEntry
lastID int
}
// autocommitState keeps track of whether a single round-trip
// commit to vttablet is possible. It starts as autocommitable
// if we started a transaction because of the autocommit flag
// being set. Otherwise, it starts as notAutocommitable.
// If execute is recursively called using the same session,
// like from a vindex, we will already be in a transaction,
// and this should cause the state to become notAutocommitable.
//
// SafeSession lets you request a commit token, which will
// be issued if the state is autocommitable,
// implying that no intermediate transactions were started.
// If so, the state transitions to autocommited, which is terminal.
// If the token is successfully issued, the caller has to perform
// the commit. If a token cannot be issued, then a traditional
// commit has to be performed at the outermost level where
// the autocommitable transition happened.
autocommitState int
// savepointState keeps track of whether savepoints need to be inserted
// before running the query. This will help us prevent rolling back the
// entire transaction in case of partial failures, and be closer to MySQL
// compatibility, by only reverting the changes from the failed statement
// If execute is recursively called using the same session,
// like from a vindex, we should not override the savePointState.
// It is set the first time and is then permanent for the remainder of the query
// execution. It should not be affected later by transactions starting or not.
savepointState int
)
const (
notAutocommittable = autocommitState(iota)
@ -83,16 +107,6 @@ const (
autocommitted
)
// savepointState keeps track of whether savepoints need to be inserted
// before running the query. This will help us prevent rolling back the
// entire transaction in case of partial failures, and be closer to MySQL
// compatibility, by only reverting the changes from the failed statement
// If execute is recursively called using the same session,
// like from a vindex, we should not override the savePointState.
// It is set the first time and is then permanent for the remainder of the query
// execution. It should not be affected later by transactions starting or not.
type savepointState int
const (
savepointStateNotSet = savepointState(iota)
// savepointNotNeeded - savepoint is not required
@ -828,3 +842,59 @@ func (session *SafeSession) ClearAdvisoryLock() {
session.AdvisoryLock = nil
}
func (session *SafeSession) EnableLogging() {
session.mu.Lock()
defer session.mu.Unlock()
session.logging = &executeLogger{}
}
func (l *executeLogger) log(target *querypb.Target, query string, begin bool, bv map[string]*querypb.BindVariable) {
if l == nil {
return
}
l.mu.Lock()
defer l.mu.Unlock()
id := l.lastID
l.lastID++
if begin {
l.entries = append(l.entries, engine.ExecuteEntry{
ID: id,
Keyspace: target.Keyspace,
Shard: target.Shard,
TabletType: target.TabletType,
Cell: target.Cell,
Query: "begin",
})
}
ast, err := sqlparser.Parse(query)
if err != nil {
panic("query not able to parse. this should not happen")
}
pq := sqlparser.NewParsedQuery(ast)
if bv == nil {
bv = map[string]*querypb.BindVariable{}
}
q, err := pq.GenerateQuery(bv, nil)
if err != nil {
panic("query not able to generate query. this should not happen")
}
l.entries = append(l.entries, engine.ExecuteEntry{
ID: id,
Keyspace: target.Keyspace,
Shard: target.Shard,
TabletType: target.TabletType,
Cell: target.Cell,
Query: q,
})
}
func (l *executeLogger) GetLogs() []engine.ExecuteEntry {
l.mu.Lock()
defer l.mu.Unlock()
result := make([]engine.ExecuteEntry, len(l.entries))
copy(result, l.entries)
return result
}

Просмотреть файл

@ -252,6 +252,8 @@ func (stc *ScatterConn) ExecuteMultiShard(
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected actionNeeded on query execution: %v", info.actionNeeded)
}
session.logging.log(rs.Target, queries[i].Sql, info.actionNeeded == begin || info.actionNeeded == reserveBegin, queries[i].BindVariables)
// We need to new shard info irrespective of the error.
newInfo := info.updateTransactionAndReservedID(transactionID, reservedID, alias)
if err != nil {
@ -447,6 +449,8 @@ func (stc *ScatterConn) StreamExecuteMulti(
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected actionNeeded on query execution: %v", info.actionNeeded)
}
session.logging.log(rs.Target, query, info.actionNeeded == begin || info.actionNeeded == reserveBegin, bindVars[i])
// We need to new shard info irrespective of the error.
newInfo := info.updateTransactionAndReservedID(transactionID, reservedID, alias)
if err != nil {
@ -838,9 +842,9 @@ func lockInfo(target *querypb.Target, session *SafeSession, lockFuncType sqlpars
// TODO: after release 14.0, uncomment this line.
// This commented for backward compatiblity as there is a specific check in vttablet for lock functions,
// to always be on reserved connection.
//if lockFuncType != sqlparser.GetLock {
// if lockFuncType != sqlparser.GetLock {
// return info, nil
//}
// }
if info.reservedID == 0 {
info.actionNeeded = reserve
}

Просмотреть файл

@ -76,6 +76,7 @@ func (txc *TxConn) Commit(ctx context.Context, session *SafeSession) error {
case vtgatepb.TransactionMode_UNSPECIFIED:
twopc = txc.mode == vtgatepb.TransactionMode_TWOPC
}
if twopc {
return txc.commit2PC(ctx, session)
}
@ -89,7 +90,7 @@ func (txc *TxConn) queryService(alias *topodatapb.TabletAlias) (queryservice.Que
return txc.tabletGateway.QueryServiceByAlias(alias, nil)
}
func (txc *TxConn) commitShard(ctx context.Context, s *vtgatepb.Session_ShardSession) error {
func (txc *TxConn) commitShard(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error {
if s.TransactionId == 0 {
return nil
}
@ -105,24 +106,25 @@ func (txc *TxConn) commitShard(ctx context.Context, s *vtgatepb.Session_ShardSes
}
s.TransactionId = 0
s.ReservedId = reservedID
logging.log(s.Target, "commit", false, nil)
return nil
}
func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error {
if err := txc.runSessions(ctx, session.PreSessions, txc.commitShard); err != nil {
if err := txc.runSessions(ctx, session.PreSessions, session.logging, txc.commitShard); err != nil {
_ = txc.Release(ctx, session)
return err
}
// Retain backward compatibility on commit order for the normal session.
for _, shardSession := range session.ShardSessions {
if err := txc.commitShard(ctx, shardSession); err != nil {
if err := txc.commitShard(ctx, shardSession, session.logging); err != nil {
_ = txc.Release(ctx, session)
return err
}
}
if err := txc.runSessions(ctx, session.PostSessions, txc.commitShard); err != nil {
if err := txc.runSessions(ctx, session.PostSessions, session.logging, txc.commitShard); err != nil {
// If last commit fails, there will be nothing to rollback.
session.RecordWarning(&querypb.QueryWarning{Message: fmt.Sprintf("post-operation transaction had an error: %v", err)})
// With reserved connection we should release them.
@ -158,7 +160,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
return err
}
err = txc.runSessions(ctx, session.ShardSessions[1:], func(ctx context.Context, s *vtgatepb.Session_ShardSession) error {
err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error {
return txc.tabletGateway.Prepare(ctx, s.Target, s.TransactionId, dtid)
})
if err != nil {
@ -176,7 +178,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
return err
}
err = txc.runSessions(ctx, session.ShardSessions[1:], func(ctx context.Context, s *vtgatepb.Session_ShardSession) error {
err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error {
return txc.tabletGateway.CommitPrepared(ctx, s.Target, dtid)
})
if err != nil {
@ -196,7 +198,7 @@ func (txc *TxConn) Rollback(ctx context.Context, session *SafeSession) error {
allsessions := append(session.PreSessions, session.ShardSessions...)
allsessions = append(allsessions, session.PostSessions...)
err := txc.runSessions(ctx, allsessions, func(ctx context.Context, s *vtgatepb.Session_ShardSession) error {
err := txc.runSessions(ctx, allsessions, session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error {
if s.TransactionId == 0 {
return nil
}
@ -210,6 +212,7 @@ func (txc *TxConn) Rollback(ctx context.Context, session *SafeSession) error {
}
s.TransactionId = 0
s.ReservedId = reservedID
logging.log(s.Target, "rollback", false, nil)
return nil
})
if err != nil {
@ -231,7 +234,7 @@ func (txc *TxConn) Release(ctx context.Context, session *SafeSession) error {
allsessions := append(session.PreSessions, session.ShardSessions...)
allsessions = append(allsessions, session.PostSessions...)
return txc.runSessions(ctx, allsessions, func(ctx context.Context, s *vtgatepb.Session_ShardSession) error {
return txc.runSessions(ctx, allsessions, session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error {
if s.ReservedId == 0 && s.TransactionId == 0 {
return nil
}
@ -281,7 +284,7 @@ func (txc *TxConn) ReleaseAll(ctx context.Context, session *SafeSession) error {
allsessions = append(allsessions, session.LockSession)
}
return txc.runSessions(ctx, allsessions, func(ctx context.Context, s *vtgatepb.Session_ShardSession) error {
return txc.runSessions(ctx, allsessions, session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, loggging *executeLogger) error {
if s.ReservedId == 0 && s.TransactionId == 0 {
return nil
}
@ -361,11 +364,11 @@ func (txc *TxConn) resumeCommit(ctx context.Context, target *querypb.Target, tra
return txc.tabletGateway.ConcludeTransaction(ctx, target, transaction.Dtid)
}
// runSessions executes the action for all shardSessions in parallel and returns a consolildated error.
func (txc *TxConn) runSessions(ctx context.Context, shardSessions []*vtgatepb.Session_ShardSession, action func(context.Context, *vtgatepb.Session_ShardSession) error) error {
// runSessions executes the action for all shardSessions in parallel and returns a consolidated error.
func (txc *TxConn) runSessions(ctx context.Context, shardSessions []*vtgatepb.Session_ShardSession, logging *executeLogger, action func(context.Context, *vtgatepb.Session_ShardSession, *executeLogger) error) error {
// Fastpath.
if len(shardSessions) == 1 {
return action(ctx, shardSessions[0])
return action(ctx, shardSessions[0], logging)
}
allErrors := new(concurrency.AllErrorRecorder)
@ -374,7 +377,7 @@ func (txc *TxConn) runSessions(ctx context.Context, shardSessions []*vtgatepb.Se
wg.Add(1)
go func(s *vtgatepb.Session_ShardSession) {
defer wg.Done()
if err := action(ctx, s); err != nil {
if err := action(ctx, s, logging); err != nil {
allErrors.RecordError(err)
}
}(s)

Просмотреть файл

@ -1189,7 +1189,7 @@ func TestTxConnMultiGoSessions(t *testing.T) {
Keyspace: "0",
},
}}
err := txc.runSessions(ctx, input, func(ctx context.Context, s *vtgatepb.Session_ShardSession) error {
err := txc.runSessions(ctx, input, nil, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logger *executeLogger) error {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "err %s", s.Target.Keyspace)
})
want := "err 0"
@ -1204,7 +1204,7 @@ func TestTxConnMultiGoSessions(t *testing.T) {
Keyspace: "1",
},
}}
err = txc.runSessions(ctx, input, func(ctx context.Context, s *vtgatepb.Session_ShardSession) error {
err = txc.runSessions(ctx, input, nil, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logger *executeLogger) error {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "err %s", s.Target.Keyspace)
})
want = "err 0\nerr 1"
@ -1212,7 +1212,7 @@ func TestTxConnMultiGoSessions(t *testing.T) {
wantCode := vtrpcpb.Code_INTERNAL
assert.Equal(t, wantCode, vterrors.Code(err), "error code")
err = txc.runSessions(ctx, input, func(ctx context.Context, s *vtgatepb.Session_ShardSession) error {
err = txc.runSessions(ctx, input, nil, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logger *executeLogger) error {
return nil
})
require.NoError(t, err)

Просмотреть файл

@ -427,6 +427,7 @@ func (vc *vcursorImpl) Execute(ctx context.Context, method string, query string,
if co == vtgatepb.CommitOrder_AUTOCOMMIT {
// For autocommit, we have to create an independent session.
session = NewAutocommitSession(vc.safeSession.Session)
session.logging = vc.safeSession.logging
rollbackOnError = false
} else {
session.SetCommitOrder(co)
@ -988,3 +989,11 @@ func (vc *vcursorImpl) cloneWithAutocommitSession() *vcursorImpl {
pv: vc.pv,
}
}
func (vc *vcursorImpl) VtExplainLogging() {
vc.safeSession.EnableLogging()
}
func (vc *vcursorImpl) GetVTExplainLogs() []engine.ExecuteEntry {
return vc.safeSession.logging.GetLogs()
}

Просмотреть файл

@ -51,7 +51,7 @@ func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, query []byte, db
}
// Handle special possible directives
var directives sqlparser.CommentDirectives
var directives *sqlparser.CommentDirectives
if stmt, err := sqlparser.Parse(string(query)); err == nil {
if cmnt, ok := stmt.(sqlparser.Commented); ok {
directives = cmnt.GetParsedComments().Directives()

Просмотреть файл

@ -167,7 +167,7 @@ func (rs *rowStreamer) buildPlan() error {
}
directives := sel.Comments.Directives()
if s := directives.GetString("ukColumns", ""); s != "" {
if s, found := directives.GetString("ukColumns", ""); found {
rs.ukColumnNames, err = textutil.SplitUnescape(s, ",")
if err != nil {
return err

Просмотреть файл

@ -621,6 +621,15 @@
"RetryMax": 2,
"Tags": []
},
"vtgate_queries_vtexplain": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vtgate/queries/vtexplain"],
"Command": [],
"Manual": false,
"Shard": "vtgate_queries",
"RetryMax": 2,
"Tags": []
},
"vtgate_concurrentdml": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vtgate/concurrentdml"],