зеркало из https://github.com/github/vitess-gh.git
Moving go client rows and streaming_rows out of vtgateconn
and into the go client driver.
This commit is contained in:
Родитель
a02c96c73b
Коммит
e83e214573
|
@ -150,7 +150,7 @@ func (s *stmt) Query(args []driver.Value) (driver.Rows, error) {
|
|||
defer cancel()
|
||||
if s.c.Streaming {
|
||||
qrc, errFunc := s.c.vtgateConn.StreamExecute(ctx, s.query, makeBindVars(args), s.c.TabletType)
|
||||
return vtgateconn.NewStreamingRows(qrc, errFunc), nil
|
||||
return newStreamingRows(qrc, errFunc), nil
|
||||
}
|
||||
var qr *mproto.QueryResult
|
||||
var err error
|
||||
|
@ -162,7 +162,7 @@ func (s *stmt) Query(args []driver.Value) (driver.Rows, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return vtgateconn.NewRows(qr), nil
|
||||
return newRows(qr), nil
|
||||
}
|
||||
|
||||
func makeBindVars(args []driver.Value) map[string]interface{} {
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package vtgateconn
|
||||
package client
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
|
@ -20,8 +20,8 @@ type rows struct {
|
|||
index int
|
||||
}
|
||||
|
||||
// NewRows creates a new rows from qr.
|
||||
func NewRows(qr *mproto.QueryResult) driver.Rows {
|
||||
// newRows creates a new rows from qr.
|
||||
func newRows(qr *mproto.QueryResult) driver.Rows {
|
||||
return &rows{qr: qr}
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package vtgateconn
|
||||
package client
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
|
@ -10,7 +10,7 @@ import (
|
|||
"github.com/youtube/vitess/go/sqltypes"
|
||||
)
|
||||
|
||||
var result1 = mproto.QueryResult{
|
||||
var rowsResult1 = mproto.QueryResult{
|
||||
Fields: []mproto.Field{
|
||||
mproto.Field{
|
||||
Name: "field1",
|
||||
|
@ -70,7 +70,7 @@ func logMismatchedTypes(t *testing.T, gotRow, wantRow []driver.Value) {
|
|||
}
|
||||
|
||||
func TestRows(t *testing.T) {
|
||||
ri := NewRows(&result1)
|
||||
ri := newRows(&rowsResult1)
|
||||
wantCols := []string{
|
||||
"field1",
|
||||
"field2",
|
||||
|
@ -148,7 +148,7 @@ var badResult2 = mproto.QueryResult{
|
|||
}
|
||||
|
||||
func TestRowsFail(t *testing.T) {
|
||||
ri := NewRows(&badResult1)
|
||||
ri := newRows(&badResult1)
|
||||
var dest []driver.Value
|
||||
err := ri.Next(dest)
|
||||
want := "length mismatch: dest is 0, fields are 1"
|
||||
|
@ -156,7 +156,7 @@ func TestRowsFail(t *testing.T) {
|
|||
t.Errorf("Next: %v, want %s", err, want)
|
||||
}
|
||||
|
||||
ri = NewRows(&badResult1)
|
||||
ri = newRows(&badResult1)
|
||||
dest = make([]driver.Value, 1)
|
||||
err = ri.Next(dest)
|
||||
want = "internal error: length mismatch: dest is 1, fields are 0"
|
||||
|
@ -164,7 +164,7 @@ func TestRowsFail(t *testing.T) {
|
|||
t.Errorf("Next: %v, want %s", err, want)
|
||||
}
|
||||
|
||||
ri = NewRows(&badResult2)
|
||||
ri = newRows(&badResult2)
|
||||
dest = make([]driver.Value, 1)
|
||||
err = ri.Next(dest)
|
||||
want = `conversion error: field: {field1 3 0}, val: value: strconv.ParseInt: parsing "value": invalid syntax`
|
|
@ -2,7 +2,7 @@
|
|||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package vtgateconn
|
||||
package client
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
|
@ -10,21 +10,22 @@ import (
|
|||
"io"
|
||||
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
|
||||
)
|
||||
|
||||
// streamingRows creates a database/sql/driver compliant Row iterator
|
||||
// for a streaming query.
|
||||
type streamingRows struct {
|
||||
qrc <-chan *mproto.QueryResult
|
||||
errFunc ErrFunc
|
||||
errFunc vtgateconn.ErrFunc
|
||||
failed error
|
||||
fields []mproto.Field
|
||||
qr *mproto.QueryResult
|
||||
index int
|
||||
}
|
||||
|
||||
// NewStreamingRows creates a new streamingRows from qrc and errFunc.
|
||||
func NewStreamingRows(qrc <-chan *mproto.QueryResult, errFunc ErrFunc) driver.Rows {
|
||||
// newStreamingRows creates a new streamingRows from qrc and errFunc.
|
||||
func newStreamingRows(qrc <-chan *mproto.QueryResult, errFunc vtgateconn.ErrFunc) driver.Rows {
|
||||
return &streamingRows{qrc: qrc, errFunc: errFunc}
|
||||
}
|
||||
|
|
@ -2,7 +2,7 @@
|
|||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package vtgateconn
|
||||
package client
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
|
@ -14,6 +14,7 @@ import (
|
|||
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/sqltypes"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
|
||||
)
|
||||
|
||||
var packet1 = mproto.QueryResult{
|
||||
|
@ -54,7 +55,7 @@ var packet3 = mproto.QueryResult{
|
|||
}
|
||||
|
||||
func TestStreamingRows(t *testing.T) {
|
||||
qrc, errFunc := func() (<-chan *mproto.QueryResult, ErrFunc) {
|
||||
qrc, errFunc := func() (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
|
||||
ch := make(chan *mproto.QueryResult)
|
||||
go func() {
|
||||
ch <- &packet1
|
||||
|
@ -64,7 +65,7 @@ func TestStreamingRows(t *testing.T) {
|
|||
}()
|
||||
return ch, func() error { return nil }
|
||||
}()
|
||||
ri := NewStreamingRows(qrc, errFunc)
|
||||
ri := newStreamingRows(qrc, errFunc)
|
||||
wantCols := []string{
|
||||
"field1",
|
||||
"field2",
|
||||
|
@ -111,7 +112,7 @@ func TestStreamingRows(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestStreamingRowsReversed(t *testing.T) {
|
||||
qrc, errFunc := func() (<-chan *mproto.QueryResult, ErrFunc) {
|
||||
qrc, errFunc := func() (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
|
||||
ch := make(chan *mproto.QueryResult)
|
||||
go func() {
|
||||
ch <- &packet1
|
||||
|
@ -121,7 +122,7 @@ func TestStreamingRowsReversed(t *testing.T) {
|
|||
}()
|
||||
return ch, func() error { return nil }
|
||||
}()
|
||||
ri := NewStreamingRows(qrc, errFunc)
|
||||
ri := newStreamingRows(qrc, errFunc)
|
||||
|
||||
wantRow := []driver.Value{
|
||||
int64(1),
|
||||
|
@ -151,14 +152,14 @@ func TestStreamingRowsReversed(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestStreamingRowsError(t *testing.T) {
|
||||
qrc, errFunc := func() (<-chan *mproto.QueryResult, ErrFunc) {
|
||||
qrc, errFunc := func() (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
|
||||
ch := make(chan *mproto.QueryResult)
|
||||
go func() {
|
||||
close(ch)
|
||||
}()
|
||||
return ch, func() error { return errors.New("error before fields") }
|
||||
}()
|
||||
ri := NewStreamingRows(qrc, errFunc)
|
||||
ri := newStreamingRows(qrc, errFunc)
|
||||
gotCols := ri.Columns()
|
||||
if gotCols != nil {
|
||||
t.Errorf("cols: %v, want nil", gotCols)
|
||||
|
@ -171,7 +172,7 @@ func TestStreamingRowsError(t *testing.T) {
|
|||
}
|
||||
_ = ri.Close()
|
||||
|
||||
qrc, errFunc = func() (<-chan *mproto.QueryResult, ErrFunc) {
|
||||
qrc, errFunc = func() (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
|
||||
ch := make(chan *mproto.QueryResult)
|
||||
go func() {
|
||||
ch <- &packet1
|
||||
|
@ -179,7 +180,7 @@ func TestStreamingRowsError(t *testing.T) {
|
|||
}()
|
||||
return ch, func() error { return errors.New("error after fields") }
|
||||
}()
|
||||
ri = NewStreamingRows(qrc, errFunc)
|
||||
ri = newStreamingRows(qrc, errFunc)
|
||||
wantCols := []string{
|
||||
"field1",
|
||||
"field2",
|
||||
|
@ -202,7 +203,7 @@ func TestStreamingRowsError(t *testing.T) {
|
|||
}
|
||||
_ = ri.Close()
|
||||
|
||||
qrc, errFunc = func() (<-chan *mproto.QueryResult, ErrFunc) {
|
||||
qrc, errFunc = func() (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
|
||||
ch := make(chan *mproto.QueryResult)
|
||||
go func() {
|
||||
ch <- &packet1
|
||||
|
@ -211,7 +212,7 @@ func TestStreamingRowsError(t *testing.T) {
|
|||
}()
|
||||
return ch, func() error { return errors.New("error after rows") }
|
||||
}()
|
||||
ri = NewStreamingRows(qrc, errFunc)
|
||||
ri = newStreamingRows(qrc, errFunc)
|
||||
gotRow = make([]driver.Value, 3)
|
||||
err = ri.Next(gotRow)
|
||||
if err != nil {
|
||||
|
@ -224,7 +225,7 @@ func TestStreamingRowsError(t *testing.T) {
|
|||
}
|
||||
_ = ri.Close()
|
||||
|
||||
qrc, errFunc = func() (<-chan *mproto.QueryResult, ErrFunc) {
|
||||
qrc, errFunc = func() (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
|
||||
ch := make(chan *mproto.QueryResult)
|
||||
go func() {
|
||||
ch <- &packet2
|
||||
|
@ -232,7 +233,7 @@ func TestStreamingRowsError(t *testing.T) {
|
|||
}()
|
||||
return ch, func() error { return nil }
|
||||
}()
|
||||
ri = NewStreamingRows(qrc, errFunc)
|
||||
ri = newStreamingRows(qrc, errFunc)
|
||||
gotRow = make([]driver.Value, 3)
|
||||
err = ri.Next(gotRow)
|
||||
wantErr = "first packet did not return fields"
|
Загрузка…
Ссылка в новой задаче