зеркало из https://github.com/github/vitess-gh.git
codegen: use strings.Builder instead of hack
For the sake of efficiency, we used to 'hack' some places by coercing a []byte into a string. This hack is now officially encapsulated safely by strings.Builder. The last place where we're still using hack.String is in sqltypes.Value. That will require some thinking. I've also deleted code for other hacks that we don't use any more. Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Родитель
66c45d95c1
Коммит
b607d6439c
|
@ -23,53 +23,13 @@ import (
|
|||
"unsafe"
|
||||
)
|
||||
|
||||
// StringArena lets you consolidate allocations for a group of strings
|
||||
// that have similar life length
|
||||
type StringArena struct {
|
||||
buf []byte
|
||||
str string
|
||||
}
|
||||
|
||||
// NewStringArena creates an arena of the specified size.
|
||||
func NewStringArena(size int) *StringArena {
|
||||
sa := &StringArena{buf: make([]byte, 0, size)}
|
||||
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&sa.buf))
|
||||
pstring := (*reflect.StringHeader)(unsafe.Pointer(&sa.str))
|
||||
pstring.Data = pbytes.Data
|
||||
pstring.Len = pbytes.Cap
|
||||
return sa
|
||||
}
|
||||
|
||||
// NewString copies a byte slice into the arena and returns it as a string.
|
||||
// If the arena is full, it returns a traditional go string.
|
||||
func (sa *StringArena) NewString(b []byte) string {
|
||||
if len(b) == 0 {
|
||||
return ""
|
||||
}
|
||||
if len(sa.buf)+len(b) > cap(sa.buf) {
|
||||
return string(b)
|
||||
}
|
||||
start := len(sa.buf)
|
||||
sa.buf = append(sa.buf, b...)
|
||||
return sa.str[start : start+len(b)]
|
||||
}
|
||||
|
||||
// SpaceLeft returns the amount of space left in the arena.
|
||||
func (sa *StringArena) SpaceLeft() int {
|
||||
return cap(sa.buf) - len(sa.buf)
|
||||
}
|
||||
|
||||
// String force casts a []byte to a string.
|
||||
// USE AT YOUR OWN RISK
|
||||
func String(b []byte) (s string) {
|
||||
if len(b) == 0 {
|
||||
return ""
|
||||
}
|
||||
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
|
||||
pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
|
||||
pstring.Data = pbytes.Data
|
||||
pstring.Len = pbytes.Len
|
||||
return
|
||||
return *(*string)(unsafe.Pointer(&b))
|
||||
}
|
||||
|
||||
// StringPointer returns &s[0], which is not allowed in go
|
||||
|
|
|
@ -20,61 +20,6 @@ import (
|
|||
"testing"
|
||||
)
|
||||
|
||||
func TestStringArena(t *testing.T) {
|
||||
sarena := NewStringArena(10)
|
||||
|
||||
s0 := sarena.NewString(nil)
|
||||
checkint(t, len(sarena.buf), 0)
|
||||
checkint(t, sarena.SpaceLeft(), 10)
|
||||
checkstring(t, s0, "")
|
||||
|
||||
s1 := sarena.NewString([]byte("01234"))
|
||||
checkint(t, len(sarena.buf), 5)
|
||||
checkint(t, sarena.SpaceLeft(), 5)
|
||||
checkstring(t, s1, "01234")
|
||||
|
||||
s2 := sarena.NewString([]byte("5678"))
|
||||
checkint(t, len(sarena.buf), 9)
|
||||
checkint(t, sarena.SpaceLeft(), 1)
|
||||
checkstring(t, s2, "5678")
|
||||
|
||||
// s3 will be allocated outside of sarena
|
||||
s3 := sarena.NewString([]byte("ab"))
|
||||
checkint(t, len(sarena.buf), 9)
|
||||
checkint(t, sarena.SpaceLeft(), 1)
|
||||
checkstring(t, s3, "ab")
|
||||
|
||||
// s4 should still fit in sarena
|
||||
s4 := sarena.NewString([]byte("9"))
|
||||
checkint(t, len(sarena.buf), 10)
|
||||
checkint(t, sarena.SpaceLeft(), 0)
|
||||
checkstring(t, s4, "9")
|
||||
|
||||
sarena.buf[0] = 'A'
|
||||
checkstring(t, s1, "A1234")
|
||||
|
||||
sarena.buf[5] = 'B'
|
||||
checkstring(t, s2, "B678")
|
||||
|
||||
sarena.buf[9] = 'C'
|
||||
// s3 will not change
|
||||
checkstring(t, s3, "ab")
|
||||
checkstring(t, s4, "C")
|
||||
checkstring(t, sarena.str, "A1234B678C")
|
||||
}
|
||||
|
||||
func checkstring(t *testing.T, actual, expected string) {
|
||||
if actual != expected {
|
||||
t.Errorf("received %s, expecting %s", actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func checkint(t *testing.T, actual, expected int) {
|
||||
if actual != expected {
|
||||
t.Errorf("received %d, expecting %d", actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestByteToString(t *testing.T) {
|
||||
v1 := []byte("1234")
|
||||
if s := String(v1); s != "1234" {
|
||||
|
|
|
@ -625,7 +625,7 @@ func (bls *Streamer) appendInserts(statements []FullBinlogStatement, tce *tableC
|
|||
|
||||
statement := &binlogdatapb.BinlogTransaction_Statement{
|
||||
Category: binlogdatapb.BinlogTransaction_Statement_BL_INSERT,
|
||||
Sql: sql.Bytes(),
|
||||
Sql: []byte(sql.String()),
|
||||
}
|
||||
statements = append(statements, FullBinlogStatement{
|
||||
Statement: statement,
|
||||
|
@ -668,7 +668,7 @@ func (bls *Streamer) appendUpdates(statements []FullBinlogStatement, tce *tableC
|
|||
|
||||
update := &binlogdatapb.BinlogTransaction_Statement{
|
||||
Category: binlogdatapb.BinlogTransaction_Statement_BL_UPDATE,
|
||||
Sql: sql.Bytes(),
|
||||
Sql: []byte(sql.String()),
|
||||
}
|
||||
statements = append(statements, FullBinlogStatement{
|
||||
Statement: update,
|
||||
|
@ -704,7 +704,7 @@ func (bls *Streamer) appendDeletes(statements []FullBinlogStatement, tce *tableC
|
|||
|
||||
statement := &binlogdatapb.BinlogTransaction_Statement{
|
||||
Category: binlogdatapb.BinlogTransaction_Statement_BL_DELETE,
|
||||
Sql: sql.Bytes(),
|
||||
Sql: []byte(sql.String()),
|
||||
}
|
||||
statements = append(statements, FullBinlogStatement{
|
||||
Statement: statement,
|
||||
|
|
|
@ -17,7 +17,6 @@ limitations under the License.
|
|||
package sqlparser
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
@ -265,9 +264,9 @@ func String(node SQLNode) string {
|
|||
}
|
||||
|
||||
// Append appends the SQLNode to the buffer.
|
||||
func Append(buf *bytes.Buffer, node SQLNode) {
|
||||
func Append(buf *strings.Builder, node SQLNode) {
|
||||
tbuf := &TrackedBuffer{
|
||||
Buffer: buf,
|
||||
Builder: buf,
|
||||
}
|
||||
node.Format(tbuf)
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ func TestAppend(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
var b bytes.Buffer
|
||||
var b strings.Builder
|
||||
Append(&b, tree)
|
||||
got := b.String()
|
||||
want := query
|
||||
|
|
|
@ -17,7 +17,7 @@ limitations under the License.
|
|||
package sqlparser
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
)
|
||||
|
@ -27,7 +27,7 @@ import (
|
|||
// Encodable defines the interface for types that can
|
||||
// be custom-encoded into SQL.
|
||||
type Encodable interface {
|
||||
EncodeSQL(buf *bytes.Buffer)
|
||||
EncodeSQL(buf *strings.Builder)
|
||||
}
|
||||
|
||||
// InsertValues is a custom SQL encoder for the values of
|
||||
|
@ -35,7 +35,7 @@ type Encodable interface {
|
|||
type InsertValues [][]sqltypes.Value
|
||||
|
||||
// EncodeSQL performs the SQL encoding for InsertValues.
|
||||
func (iv InsertValues) EncodeSQL(buf *bytes.Buffer) {
|
||||
func (iv InsertValues) EncodeSQL(buf *strings.Builder) {
|
||||
for i, rows := range iv {
|
||||
if i != 0 {
|
||||
buf.WriteString(", ")
|
||||
|
@ -60,7 +60,7 @@ type TupleEqualityList struct {
|
|||
|
||||
// EncodeSQL generates the where clause constraints for the tuple
|
||||
// equality.
|
||||
func (tpl *TupleEqualityList) EncodeSQL(buf *bytes.Buffer) {
|
||||
func (tpl *TupleEqualityList) EncodeSQL(buf *strings.Builder) {
|
||||
if len(tpl.Columns) == 1 {
|
||||
tpl.encodeAsIn(buf)
|
||||
return
|
||||
|
@ -68,7 +68,7 @@ func (tpl *TupleEqualityList) EncodeSQL(buf *bytes.Buffer) {
|
|||
tpl.encodeAsEquality(buf)
|
||||
}
|
||||
|
||||
func (tpl *TupleEqualityList) encodeAsIn(buf *bytes.Buffer) {
|
||||
func (tpl *TupleEqualityList) encodeAsIn(buf *strings.Builder) {
|
||||
Append(buf, tpl.Columns[0])
|
||||
buf.WriteString(" in (")
|
||||
for i, r := range tpl.Rows {
|
||||
|
@ -80,7 +80,7 @@ func (tpl *TupleEqualityList) encodeAsIn(buf *bytes.Buffer) {
|
|||
buf.WriteByte(')')
|
||||
}
|
||||
|
||||
func (tpl *TupleEqualityList) encodeAsEquality(buf *bytes.Buffer) {
|
||||
func (tpl *TupleEqualityList) encodeAsEquality(buf *strings.Builder) {
|
||||
for i, r := range tpl.Rows {
|
||||
if i != 0 {
|
||||
buf.WriteString(" or ")
|
||||
|
|
|
@ -17,7 +17,7 @@ limitations under the License.
|
|||
package sqlparser
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
|
@ -64,7 +64,7 @@ func TestEncodable(t *testing.T) {
|
|||
out: "(pk1 = 1 and pk2 = 'aa') or (pk1 = 2 and pk2 = 'bb')",
|
||||
}}
|
||||
for _, tcase := range tcases {
|
||||
buf := new(bytes.Buffer)
|
||||
buf := new(strings.Builder)
|
||||
tcase.in.EncodeSQL(buf)
|
||||
if out := buf.String(); out != tcase.out {
|
||||
t.Errorf("EncodeSQL(%v): %s, want %s", tcase.in, out, tcase.out)
|
||||
|
|
|
@ -17,9 +17,9 @@ limitations under the License.
|
|||
package sqlparser
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
|
||||
|
@ -47,28 +47,29 @@ func NewParsedQuery(node SQLNode) *ParsedQuery {
|
|||
// GenerateQuery generates a query by substituting the specified
|
||||
// bindVariables. The extras parameter specifies special parameters
|
||||
// that can perform custom encoding.
|
||||
func (pq *ParsedQuery) GenerateQuery(bindVariables map[string]*querypb.BindVariable, extras map[string]Encodable) ([]byte, error) {
|
||||
func (pq *ParsedQuery) GenerateQuery(bindVariables map[string]*querypb.BindVariable, extras map[string]Encodable) (string, error) {
|
||||
if len(pq.bindLocations) == 0 {
|
||||
return []byte(pq.Query), nil
|
||||
return pq.Query, nil
|
||||
}
|
||||
buf := bytes.NewBuffer(make([]byte, 0, len(pq.Query)))
|
||||
var buf strings.Builder
|
||||
buf.Grow(len(pq.Query))
|
||||
current := 0
|
||||
for _, loc := range pq.bindLocations {
|
||||
buf.WriteString(pq.Query[current:loc.offset])
|
||||
name := pq.Query[loc.offset : loc.offset+loc.length]
|
||||
if encodable, ok := extras[name[1:]]; ok {
|
||||
encodable.EncodeSQL(buf)
|
||||
encodable.EncodeSQL(&buf)
|
||||
} else {
|
||||
supplied, _, err := FetchBindVar(name, bindVariables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return "", err
|
||||
}
|
||||
EncodeValue(buf, supplied)
|
||||
EncodeValue(&buf, supplied)
|
||||
}
|
||||
current = loc.offset + loc.length
|
||||
}
|
||||
buf.WriteString(pq.Query[current:])
|
||||
return buf.Bytes(), nil
|
||||
return buf.String(), nil
|
||||
}
|
||||
|
||||
// MarshalJSON is a custom JSON marshaler for ParsedQuery.
|
||||
|
@ -78,7 +79,7 @@ func (pq *ParsedQuery) MarshalJSON() ([]byte, error) {
|
|||
}
|
||||
|
||||
// EncodeValue encodes one bind variable value into the query.
|
||||
func EncodeValue(buf *bytes.Buffer, value *querypb.BindVariable) {
|
||||
func EncodeValue(buf *strings.Builder, value *querypb.BindVariable) {
|
||||
if value.Type != querypb.Type_TUPLE {
|
||||
// Since we already check for TUPLE, we don't expect an error.
|
||||
v, _ := sqltypes.BindVariableToValue(value)
|
||||
|
|
|
@ -17,8 +17,8 @@ limitations under the License.
|
|||
package sqlparser
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// NodeFormatter defines the signature of a custom node formatter
|
||||
|
@ -33,7 +33,7 @@ type NodeFormatter func(buf *TrackedBuffer, node SQLNode)
|
|||
// But you can supply a different formatting function if you
|
||||
// want to generate a query that's different from the default.
|
||||
type TrackedBuffer struct {
|
||||
*bytes.Buffer
|
||||
*strings.Builder
|
||||
bindLocations []bindLocation
|
||||
nodeFormatter NodeFormatter
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ type TrackedBuffer struct {
|
|||
// NewTrackedBuffer creates a new TrackedBuffer.
|
||||
func NewTrackedBuffer(nodeFormatter NodeFormatter) *TrackedBuffer {
|
||||
return &TrackedBuffer{
|
||||
Buffer: new(bytes.Buffer),
|
||||
Builder: new(strings.Builder),
|
||||
nodeFormatter: nodeFormatter,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"vitess.io/vitess/go/jsonutil"
|
||||
|
@ -102,7 +103,7 @@ func (tq *TabletQuery) MarshalJSON() ([]byte, error) {
|
|||
// Convert Bindvars to strings for nicer output
|
||||
bindVars := make(map[string]string)
|
||||
for k, v := range tq.BindVars {
|
||||
var b bytes.Buffer
|
||||
var b strings.Builder
|
||||
sqlparser.EncodeValue(&b, v)
|
||||
bindVars[k] = b.String()
|
||||
}
|
||||
|
|
|
@ -183,7 +183,7 @@ func generateInsertShardedQuery(node *sqlparser.Insert, eins *engine.Insert, val
|
|||
for rowNum, val := range valueTuples {
|
||||
midBuf.Myprintf("%v", val)
|
||||
eins.Mid[rowNum] = midBuf.String()
|
||||
midBuf.Truncate(0)
|
||||
midBuf.Reset()
|
||||
}
|
||||
suffixBuf.Myprintf("%v", node.OnDup)
|
||||
eins.Suffix = suffixBuf.String()
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"vitess.io/vitess/go/hack"
|
||||
"vitess.io/vitess/go/sqlescape"
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
"vitess.io/vitess/go/timer"
|
||||
|
@ -202,7 +201,7 @@ func (r *Reader) bindHeartbeatFetch() (string, error) {
|
|||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return hack.String(bound), nil
|
||||
return bound, nil
|
||||
}
|
||||
|
||||
// parseHeartbeatResult turns a raw result into the timestamp for processing.
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"vitess.io/vitess/go/hack"
|
||||
"vitess.io/vitess/go/mysql"
|
||||
"vitess.io/vitess/go/sqlescape"
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
|
@ -198,7 +197,7 @@ func (w *Writer) bindHeartbeatVars(query string) (string, error) {
|
|||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return hack.String(bound), nil
|
||||
return bound, nil
|
||||
}
|
||||
|
||||
// writeHeartbeat updates the heartbeat row for this tablet with the current time in nanoseconds.
|
||||
|
|
|
@ -121,7 +121,7 @@ func validateValue(col *schema.TableColumn, value sqltypes.Value) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func buildStreamComment(table *schema.Table, pkValueList [][]sqltypes.Value, secondaryList [][]sqltypes.Value) []byte {
|
||||
func buildStreamComment(table *schema.Table, pkValueList [][]sqltypes.Value, secondaryList [][]sqltypes.Value) string {
|
||||
buf := sqlparser.NewTrackedBuffer(nil)
|
||||
buf.Myprintf(" /* _stream %v (", table.Name)
|
||||
// We assume the first index exists, and is the pk
|
||||
|
@ -132,7 +132,7 @@ func buildStreamComment(table *schema.Table, pkValueList [][]sqltypes.Value, sec
|
|||
buildPKValueList(buf, table, pkValueList)
|
||||
buildPKValueList(buf, table, secondaryList)
|
||||
buf.WriteString("; */")
|
||||
return buf.Bytes()
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func buildPKValueList(buf *sqlparser.TrackedBuffer, table *schema.Table, pkValueList [][]sqltypes.Value) {
|
||||
|
|
|
@ -184,7 +184,7 @@ func TestCodexBuildStreamComment(t *testing.T) {
|
|||
pk2SecVal := sqltypes.NewVarChar("xyz")
|
||||
secondaryPKValues := []sqltypes.PlanValue{{}, {Value: pk2SecVal}}
|
||||
secondaryList, _ := buildSecondaryList(table, pkList, secondaryPKValues, bindVars)
|
||||
want := []byte(" /* _stream `Table` (pk1 pk2 ) (1 'YWJj' ) (1 'eHl6' ); */")
|
||||
want := " /* _stream `Table` (pk1 pk2 ) (1 'YWJj' ) (1 'eHl6' ); */"
|
||||
got := buildStreamComment(table, pkList, secondaryList)
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("case 1 failed, got\n%s, want\n%s", got, want)
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"vitess.io/vitess/go/hack"
|
||||
"vitess.io/vitess/go/mysql"
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
"vitess.io/vitess/go/trace"
|
||||
|
@ -131,7 +130,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
|
|||
if !qre.tsv.qe.allowUnsafeDMLs && (qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow) {
|
||||
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cannot identify primary key of statement")
|
||||
}
|
||||
return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, nil, true, true)
|
||||
return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, "", true, true)
|
||||
case planbuilder.PlanInsertPK:
|
||||
return qre.execInsertPK(conn)
|
||||
case planbuilder.PlanInsertMessage:
|
||||
|
@ -147,7 +146,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
|
|||
case planbuilder.PlanUpsertPK:
|
||||
return qre.execUpsertPK(conn)
|
||||
case planbuilder.PlanSet:
|
||||
return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, nil, true, true)
|
||||
return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, "", true, true)
|
||||
case planbuilder.PlanPassSelect, planbuilder.PlanSelectLock, planbuilder.PlanSelectImpossible:
|
||||
return qre.execDirect(conn)
|
||||
default:
|
||||
|
@ -242,7 +241,7 @@ func (qre *QueryExecutor) Stream(callback func(*sqltypes.Result) error) error {
|
|||
qre.tsv.qe.streamQList.Add(qd)
|
||||
defer qre.tsv.qe.streamQList.Remove(qd)
|
||||
|
||||
return qre.streamFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, callback)
|
||||
return qre.streamFetch(conn, qre.plan.FullQuery, qre.bindVars, "", callback)
|
||||
}
|
||||
|
||||
// MessageStream streams messages from a message table.
|
||||
|
@ -281,7 +280,7 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *sqltypes.Result, err error
|
|||
if !qre.tsv.qe.allowUnsafeDMLs && (qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow) {
|
||||
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cannot identify primary key of statement")
|
||||
}
|
||||
reply, err = qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, nil, true, true)
|
||||
reply, err = qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, "", true, true)
|
||||
case planbuilder.PlanInsertPK:
|
||||
reply, err = qre.execInsertPK(conn)
|
||||
case planbuilder.PlanInsertMessage:
|
||||
|
@ -413,7 +412,7 @@ func (qre *QueryExecutor) execDDL() (*sqltypes.Result, error) {
|
|||
sql := qre.query
|
||||
var err error
|
||||
if qre.plan.FullQuery != nil {
|
||||
sql, _, err = qre.generateFinalSQL(qre.plan.FullQuery, qre.bindVars, nil, nil)
|
||||
sql, _, err = qre.generateFinalSQL(qre.plan.FullQuery, qre.bindVars, nil, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -527,14 +526,14 @@ func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) {
|
|||
// execDirect is for reads inside transactions. Always send to MySQL.
|
||||
func (qre *QueryExecutor) execDirect(conn *TxConnection) (*sqltypes.Result, error) {
|
||||
if qre.plan.Fields != nil {
|
||||
result, err := qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, nil, true, false)
|
||||
result, err := qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, "", true, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.Fields = qre.plan.Fields
|
||||
return result, nil
|
||||
}
|
||||
return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, nil, true, false)
|
||||
return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, "", true, false)
|
||||
}
|
||||
|
||||
// execSelect sends a query to mysql only if another identical query is not running. Otherwise, it waits and
|
||||
|
@ -555,7 +554,7 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
|
|||
return nil, err
|
||||
}
|
||||
defer conn.Recycle()
|
||||
return qre.dbConnFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, true)
|
||||
return qre.dbConnFetch(conn, qre.plan.FullQuery, qre.bindVars, "", true)
|
||||
}
|
||||
|
||||
func (qre *QueryExecutor) execInsertPK(conn *TxConnection) (*sqltypes.Result, error) {
|
||||
|
@ -609,7 +608,7 @@ func (qre *QueryExecutor) execInsertMessage(conn *TxConnection) (*sqltypes.Resul
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
readback, err := qre.txFetch(conn, loadMessages, qre.bindVars, extras, nil, true, false)
|
||||
readback, err := qre.txFetch(conn, loadMessages, qre.bindVars, extras, "", true, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -629,7 +628,7 @@ func (qre *QueryExecutor) execInsertMessage(conn *TxConnection) (*sqltypes.Resul
|
|||
}
|
||||
|
||||
func (qre *QueryExecutor) execInsertSubquery(conn *TxConnection) (*sqltypes.Result, error) {
|
||||
innerResult, err := qre.txFetch(conn, qre.plan.Subquery, qre.bindVars, nil, nil, true, false)
|
||||
innerResult, err := qre.txFetch(conn, qre.plan.Subquery, qre.bindVars, nil, "", true, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -656,7 +655,7 @@ func (qre *QueryExecutor) execInsertSubquery(conn *TxConnection) (*sqltypes.Resu
|
|||
}
|
||||
|
||||
func (qre *QueryExecutor) execInsertPKRows(conn *TxConnection, extras map[string]sqlparser.Encodable, pkRows [][]sqltypes.Value) (*sqltypes.Result, error) {
|
||||
var bsc []byte
|
||||
var bsc string
|
||||
// Build comments only if we're not in RBR mode.
|
||||
if qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow {
|
||||
secondaryList, err := buildSecondaryList(qre.plan.Table, pkRows, qre.plan.SecondaryPKValues, qre.bindVars)
|
||||
|
@ -671,7 +670,7 @@ func (qre *QueryExecutor) execInsertPKRows(conn *TxConnection, extras map[string
|
|||
func (qre *QueryExecutor) execUpsertPK(conn *TxConnection) (*sqltypes.Result, error) {
|
||||
// For RBR, upserts are passed through.
|
||||
if qre.tsv.qe.binlogFormat == connpool.BinlogFormatRow {
|
||||
return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, nil, true, true)
|
||||
return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, "", true, true)
|
||||
}
|
||||
|
||||
// For statement or mixed mode, we have to split into two ops.
|
||||
|
@ -720,7 +719,7 @@ func (qre *QueryExecutor) execDMLPK(conn *TxConnection) (*sqltypes.Result, error
|
|||
}
|
||||
|
||||
func (qre *QueryExecutor) execDMLSubquery(conn *TxConnection) (*sqltypes.Result, error) {
|
||||
innerResult, err := qre.txFetch(conn, qre.plan.Subquery, qre.bindVars, nil, nil, true, false)
|
||||
innerResult, err := qre.txFetch(conn, qre.plan.Subquery, qre.bindVars, nil, "", true, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -748,7 +747,7 @@ func (qre *QueryExecutor) execDMLPKRows(conn *TxConnection, query *sqlparser.Par
|
|||
if secondaryList != nil {
|
||||
secondaryList = secondaryList[i:end]
|
||||
}
|
||||
var bsc []byte
|
||||
var bsc string
|
||||
// Build comments only if we're not in RBR mode.
|
||||
if qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow {
|
||||
bsc = buildStreamComment(qre.plan.Table, pkRows, secondaryList)
|
||||
|
@ -787,7 +786,7 @@ func (qre *QueryExecutor) execSet() (*sqltypes.Result, error) {
|
|||
return nil, err
|
||||
}
|
||||
defer conn.Recycle()
|
||||
return qre.dbConnFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, false)
|
||||
return qre.dbConnFetch(conn, qre.plan.FullQuery, qre.bindVars, "", false)
|
||||
}
|
||||
|
||||
func (qre *QueryExecutor) getConn() (*connpool.DBConn, error) {
|
||||
|
@ -825,7 +824,7 @@ func (qre *QueryExecutor) getStreamConn() (*connpool.DBConn, error) {
|
|||
}
|
||||
|
||||
func (qre *QueryExecutor) qFetch(logStats *tabletenv.LogStats, parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
|
||||
sql, sqlWithoutComments, err := qre.generateFinalSQL(parsedQuery, bindVars, nil, nil)
|
||||
sql, sqlWithoutComments, err := qre.generateFinalSQL(parsedQuery, bindVars, nil, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -865,7 +864,7 @@ func (qre *QueryExecutor) qFetch(logStats *tabletenv.LogStats, parsedQuery *sqlp
|
|||
}
|
||||
|
||||
// txFetch fetches from a TxConnection.
|
||||
func (qre *QueryExecutor) txFetch(conn *TxConnection, parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable, extras map[string]sqlparser.Encodable, buildStreamComment []byte, wantfields, record bool) (*sqltypes.Result, error) {
|
||||
func (qre *QueryExecutor) txFetch(conn *TxConnection, parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable, extras map[string]sqlparser.Encodable, buildStreamComment string, wantfields, record bool) (*sqltypes.Result, error) {
|
||||
sql, _, err := qre.generateFinalSQL(parsedQuery, bindVars, extras, buildStreamComment)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -882,7 +881,7 @@ func (qre *QueryExecutor) txFetch(conn *TxConnection, parsedQuery *sqlparser.Par
|
|||
}
|
||||
|
||||
// dbConnFetch fetches from a connpool.DBConn.
|
||||
func (qre *QueryExecutor) dbConnFetch(conn *connpool.DBConn, parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable, buildStreamComment []byte, wantfields bool) (*sqltypes.Result, error) {
|
||||
func (qre *QueryExecutor) dbConnFetch(conn *connpool.DBConn, parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable, buildStreamComment string, wantfields bool) (*sqltypes.Result, error) {
|
||||
sql, _, err := qre.generateFinalSQL(parsedQuery, bindVars, nil, buildStreamComment)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -891,7 +890,7 @@ func (qre *QueryExecutor) dbConnFetch(conn *connpool.DBConn, parsedQuery *sqlpar
|
|||
}
|
||||
|
||||
// streamFetch performs a streaming fetch.
|
||||
func (qre *QueryExecutor) streamFetch(conn *connpool.DBConn, parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable, buildStreamComment []byte, callback func(*sqltypes.Result) error) error {
|
||||
func (qre *QueryExecutor) streamFetch(conn *connpool.DBConn, parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable, buildStreamComment string, callback func(*sqltypes.Result) error) error {
|
||||
sql, _, err := qre.generateFinalSQL(parsedQuery, bindVars, nil, buildStreamComment)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -899,22 +898,24 @@ func (qre *QueryExecutor) streamFetch(conn *connpool.DBConn, parsedQuery *sqlpar
|
|||
return qre.execStreamSQL(conn, sql, callback)
|
||||
}
|
||||
|
||||
func (qre *QueryExecutor) generateFinalSQL(parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable, extras map[string]sqlparser.Encodable, buildStreamComment []byte) (string, string, error) {
|
||||
func (qre *QueryExecutor) generateFinalSQL(parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable, extras map[string]sqlparser.Encodable, buildStreamComment string) (string, string, error) {
|
||||
bindVars["#maxLimit"] = sqltypes.Int64BindVariable(qre.getLimit(parsedQuery))
|
||||
|
||||
var sql []byte
|
||||
sql = append(sql, qre.marginComments.Leading...)
|
||||
var buf strings.Builder
|
||||
buf.WriteString(qre.marginComments.Leading)
|
||||
|
||||
query, err := parsedQuery.GenerateQuery(bindVars, extras)
|
||||
if err != nil {
|
||||
return "", "", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%s", err)
|
||||
}
|
||||
sql = append(sql, query...)
|
||||
if buildStreamComment != nil {
|
||||
sql = append(sql, buildStreamComment...)
|
||||
buf.WriteString(query)
|
||||
if buildStreamComment != "" {
|
||||
buf.WriteString(buildStreamComment)
|
||||
}
|
||||
fullSQL := append(sql, qre.marginComments.Trailing...)
|
||||
return hack.String(fullSQL), hack.String(sql), nil
|
||||
withoutComments := buf.String()
|
||||
buf.WriteString(qre.marginComments.Trailing)
|
||||
fullSQL := buf.String()
|
||||
return fullSQL, withoutComments, nil
|
||||
}
|
||||
|
||||
func (qre *QueryExecutor) getLimit(query *sqlparser.ParsedQuery) int64 {
|
||||
|
|
|
@ -32,7 +32,6 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
|
||||
"vitess.io/vitess/go/acl"
|
||||
"vitess.io/vitess/go/hack"
|
||||
"vitess.io/vitess/go/history"
|
||||
"vitess.io/vitess/go/mysql"
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
|
@ -152,12 +151,12 @@ type TabletServer struct {
|
|||
// for health checks. This does not affect how queries are served.
|
||||
// target specifies the primary target type, and also allow specifies
|
||||
// secondary types that should be additionally allowed.
|
||||
mu sync.Mutex
|
||||
state int64
|
||||
lameduck sync2.AtomicInt32
|
||||
target querypb.Target
|
||||
alsoAllow []topodatapb.TabletType
|
||||
requests sync.WaitGroup
|
||||
mu sync.Mutex
|
||||
state int64
|
||||
lameduck sync2.AtomicInt32
|
||||
target querypb.Target
|
||||
alsoAllow []topodatapb.TabletType
|
||||
requests sync.WaitGroup
|
||||
|
||||
// The following variables should be initialized only once
|
||||
// before starting the tabletserver.
|
||||
|
@ -229,7 +228,7 @@ type TxPoolController interface {
|
|||
AcceptReadOnly() error
|
||||
|
||||
// InitDBConfig must be called before Init.
|
||||
InitDBConfig(dbcfgs *dbconfigs.DBConfigs)
|
||||
InitDBConfig(dbcfgs *dbconfigs.DBConfigs)
|
||||
|
||||
// Init must be called once when vttablet starts for setting
|
||||
// up the metadata tables.
|
||||
|
@ -1200,7 +1199,7 @@ func (tsv *TabletServer) computeTxSerializerKey(ctx context.Context, logStats *t
|
|||
}
|
||||
|
||||
// Example: table1 where id = 1 and sub_id = 2
|
||||
key := fmt.Sprintf("%s%s", tableName, hack.String(where))
|
||||
key := fmt.Sprintf("%s%s", tableName, where)
|
||||
return key, tableName.String()
|
||||
}
|
||||
|
||||
|
@ -1766,7 +1765,7 @@ func (se *splitQuerySQLExecuter) SQLExecute(
|
|||
se.conn,
|
||||
parsedQuery,
|
||||
sqltypes.CopyBindVariables(bindVariables),
|
||||
nil, /* buildStreamComment */
|
||||
"", /* buildStreamComment */
|
||||
true, /* wantfields */
|
||||
)
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ import (
|
|||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"vitess.io/vitess/go/hack"
|
||||
"vitess.io/vitess/go/mysql"
|
||||
"vitess.io/vitess/go/sqlescape"
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
|
@ -233,11 +232,11 @@ func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *TxConnection, dtid string,
|
|||
extras := map[string]sqlparser.Encodable{
|
||||
"vals": sqlparser.InsertValues(rows),
|
||||
}
|
||||
b, err := tpc.insertRedoStmt.GenerateQuery(nil, extras)
|
||||
q, err := tpc.insertRedoStmt.GenerateQuery(nil, extras)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = conn.Exec(ctx, hack.String(b), 1, false)
|
||||
_, err = conn.Exec(ctx, q, 1, false)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -361,11 +360,11 @@ func (tpc *TwoPC) CreateTransaction(ctx context.Context, conn *TxConnection, dti
|
|||
extras := map[string]sqlparser.Encodable{
|
||||
"vals": sqlparser.InsertValues(rows),
|
||||
}
|
||||
b, err := tpc.insertParticipants.GenerateQuery(nil, extras)
|
||||
q, err := tpc.insertParticipants.GenerateQuery(nil, extras)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = conn.Exec(ctx, hack.String(b), 1, false)
|
||||
_, err = conn.Exec(ctx, q, 1, false)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -533,17 +532,17 @@ func (tpc *TwoPC) ReadAllTransactions(ctx context.Context) ([]*DistributedTx, er
|
|||
}
|
||||
|
||||
func (tpc *TwoPC) exec(ctx context.Context, conn *TxConnection, pq *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
|
||||
b, err := pq.GenerateQuery(bindVars, nil)
|
||||
q, err := pq.GenerateQuery(bindVars, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conn.Exec(ctx, hack.String(b), 1, false)
|
||||
return conn.Exec(ctx, q, 1, false)
|
||||
}
|
||||
|
||||
func (tpc *TwoPC) read(ctx context.Context, conn *connpool.DBConn, pq *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
|
||||
b, err := pq.GenerateQuery(bindVars, nil)
|
||||
q, err := pq.GenerateQuery(bindVars, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conn.Exec(ctx, hack.String(b), 10000, false)
|
||||
return conn.Exec(ctx, q, 10000, false)
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче