Removing the sqldb.Conn interface.

Now using *mysql.Conn everywhere.
This commit is contained in:
Alain Jobart 2017-05-17 16:54:23 -07:00
Родитель 5bf6248b73
Коммит 828939550b
9 изменённых файлов: 47 добавлений и 150 удалений

Просмотреть файл

@ -131,11 +131,11 @@ type Conn struct {
writer *bufio.Writer
sequence uint8
// Internal variables for sqldb.Conn API for stream queries.
// This is set only if a streaming query is in progress, it is
// nil if no streaming query is in progress. If the streaming
// query returned no fields, this is set to an empty array
// (but not nil).
// fields contains the fields definitions for an on-going
// streaming query. It is set by ExecuteStreamFetch, and
// cleared by the last FetchNext(). It is nil if no streaming
// query is in progress. If the streaming query returned no
// fields, this is set to an empty array (but not nil).
fields []*querypb.Field
// Internal buffer for zero-allocation reads and writes. This
@ -535,6 +535,11 @@ func (c *Conn) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
// ID returns the MySQL connection ID for this connection.
func (c *Conn) ID() int64 {
return int64(c.ConnectionID)
}
// Close closes the connection. It can be called from a different go
// routine to interrupt the current connection.
func (c *Conn) Close() {
@ -542,6 +547,13 @@ func (c *Conn) Close() {
c.conn.Close()
}
// IsClosed returns true if this connection was ever closed by the
// Close() method. Note if the other side closes the connection, but
// Close() wasn't called, this will return false.
func (c *Conn) IsClosed() bool {
return c.Closed
}
//
// Packet writing methods, for generic packets.
//

Просмотреть файл

@ -250,7 +250,7 @@ func (c *Conn) parseRow(data []byte, fields []*querypb.Field) ([]sqltypes.Value,
return result, nil
}
// ExecuteFetch is the same as sqldb.Conn.ExecuteFetch.
// ExecuteFetch executes a query and returns the result.
// Returns a sqldb.SQLError. Depending on the transport used, the error
// returned might be different for the same condition:
//

Просмотреть файл

@ -17,23 +17,16 @@ limitations under the License.
package mysql
import (
"golang.org/x/net/context"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
querypb "github.com/youtube/vitess/go/vt/proto/query"
)
// This file contains the methods needed to implement the sqldb.Conn
// interface. These methods don't necessarely make sense, but it's
// easier to implement them as is, and then later on refactor
// everything, once the C version of mysql connection is gone.
//
// ExecuteFetch is in query.go.
// Close() is in conn.go.
// This file contains the methods needed to execute streaming queries.
// ExecuteStreamFetch is part of the sqldb.Conn interface.
// ExecuteStreamFetch starts a streaming query. Fields(), FetchNext() and
// CloseResult() can be called once this is successful.
// Returns a sqldb.SQLError.
func (c *Conn) ExecuteStreamFetch(query string) (err error) {
defer func() {
@ -105,7 +98,7 @@ func (c *Conn) ExecuteStreamFetch(query string) (err error) {
return nil
}
// Fields is part of the sqldb.Conn interface.
// Fields returns the fields for an ongoing streaming query.
func (c *Conn) Fields() ([]*querypb.Field, error) {
if c.fields == nil {
return nil, sqldb.NewSQLError(CRCommandsOutOfSync, SSUnknownSQLState, "no streaming query in progress")
@ -117,7 +110,8 @@ func (c *Conn) Fields() ([]*querypb.Field, error) {
return c.fields, nil
}
// FetchNext is part of the sqldb.Conn interface.
// FetchNext returns the next result for an ongoing streaming query.
// It returns (nil, nil) if there is nothing more to read.
func (c *Conn) FetchNext() ([]sqltypes.Value, error) {
if c.fields == nil {
// We are already done, and the result was closed.
@ -154,8 +148,8 @@ func (c *Conn) FetchNext() ([]sqltypes.Value, error) {
return c.parseRow(data, c.fields)
}
// CloseResult is part of the sqldb.Conn interface.
// Just drain the remaining values.
// CloseResult can be used to terminate a streaming query
// early. It just drains the remaining values.
func (c *Conn) CloseResult() {
for c.fields != nil {
rows, err := c.FetchNext()
@ -165,24 +159,3 @@ func (c *Conn) CloseResult() {
}
}
}
// IsClosed is part of the sqldb.Conn interface.
func (c *Conn) IsClosed() bool {
return c.Closed
}
// ID is part of the sqldb.Conn interface.
func (c *Conn) ID() int64 {
return int64(c.ConnectionID)
}
func init() {
sqldb.Register("mysqlconn", func(params sqldb.ConnParams) (sqldb.Conn, error) {
ctx := context.Background()
return Connect(ctx, &params)
})
sqldb.RegisterDefault(func(params sqldb.ConnParams) (sqldb.Conn, error) {
ctx := context.Background()
return Connect(ctx, &params)
})
}

Просмотреть файл

@ -1,94 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package sqldb defines an interface for low level db connection.
package sqldb
import (
"fmt"
"sync"
"github.com/youtube/vitess/go/sqltypes"
querypb "github.com/youtube/vitess/go/vt/proto/query"
)
// NewConnFunc is a factory method that creates a Conn instance
// using given ConnParams.
type NewConnFunc func(params ConnParams) (Conn, error)
var (
defaultConn NewConnFunc
// mu protects conns.
mu sync.Mutex
conns = make(map[string]NewConnFunc)
)
// Conn defines the behavior for the low level db connection
type Conn interface {
// ExecuteFetch executes the query on the connection
ExecuteFetch(query string, maxrows int, wantfields bool) (*sqltypes.Result, error)
// ExecuteStreamFetch starts a streaming query to db server. Use FetchNext
// on the Connection until it returns nil or error
ExecuteStreamFetch(query string) error
// Close closes the db connection
Close()
// IsClosed returns if the connection was ever closed
IsClosed() bool
// CloseResult finishes the result set
CloseResult()
// Fields returns the current fields description for the query
Fields() ([]*querypb.Field, error)
// ID returns the connection id.
ID() int64
// FetchNext returns the next row for a query
FetchNext() ([]sqltypes.Value, error)
}
// RegisterDefault registers the default connection function.
// Only one default can be registered.
func RegisterDefault(fn NewConnFunc) {
if defaultConn != nil {
panic("default connection initialized more than once")
}
defaultConn = fn
}
// Register registers a db connection.
func Register(name string, fn NewConnFunc) {
mu.Lock()
defer mu.Unlock()
if _, ok := conns[name]; ok {
panic(fmt.Sprintf("register a registered key: %s", name))
}
conns[name] = fn
}
// Connect returns a sqldb.Conn using the default connection creation function.
func Connect(params ConnParams) (Conn, error) {
// Use a lock-free fast path for default.
if params.Engine == "" {
return defaultConn(params)
}
mu.Lock()
defer mu.Unlock()
fn, ok := conns[params.Engine]
if !ok {
panic(fmt.Sprintf("connection function not found for engine: %s", params.Engine))
}
return fn(params)
}

Просмотреть файл

@ -17,20 +17,22 @@ limitations under the License.
package dbconnpool
import (
"context"
"fmt"
"time"
"github.com/youtube/vitess/go/mysql"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/dbconfigs"
)
// DBConnection re-exposes sqldb.Conn with some wrapping to implement
// DBConnection re-exposes mysql.Conn with some wrapping to implement
// most of PoolConnection interface, except Recycle. That way it can be used
// by itself. (Recycle needs to know about the Pool).
type DBConnection struct {
sqldb.Conn
*mysql.Conn
mysqlStats *stats.Timings
}
@ -123,6 +125,7 @@ func NewDBConnection(info *sqldb.ConnParams, mysqlStats *stats.Timings) (*DBConn
if err != nil {
return nil, err
}
c, err := sqldb.Connect(params)
ctx := context.Background()
c, err := mysql.Connect(ctx, &params)
return &DBConnection{c, mysqlStats}, err
}

Просмотреть файл

@ -41,6 +41,7 @@ import (
"bytes"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/mysql"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/dbconfigs"
@ -313,7 +314,7 @@ func (mysqld *Mysqld) wait(ctx context.Context, params sqldb.ConnParams) error {
_, statErr := os.Stat(mysqld.config.SocketFile)
if statErr == nil {
// Make sure the socket file isn't stale.
conn, connErr := sqldb.Connect(params)
conn, connErr := mysql.Connect(ctx, &params)
if connErr == nil {
conn.Close()
return nil

Просмотреть файл

@ -30,7 +30,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/mysql"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/sqlparser"
"github.com/youtube/vitess/go/vt/vttablet/endtoend/framework"
@ -241,7 +241,8 @@ func TestUpsertNonPKHit(t *testing.T) {
}
func TestSchemaReload(t *testing.T) {
conn, err := sqldb.Connect(connParams)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &connParams)
if err != nil {
t.Error(err)
return
@ -275,7 +276,8 @@ func TestSchemaReload(t *testing.T) {
}
func TestSidecarTables(t *testing.T) {
conn, err := sqldb.Connect(connParams)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &connParams)
if err != nil {
t.Error(err)
return

Просмотреть файл

@ -17,6 +17,7 @@ limitations under the License.
package endtoend
import (
"context"
"fmt"
"reflect"
"strings"
@ -25,7 +26,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/mysql"
"github.com/youtube/vitess/go/vt/vterrors"
"github.com/youtube/vitess/go/vt/vttablet/endtoend/framework"
"github.com/youtube/vitess/go/vt/vttablet/tabletserver"
@ -780,7 +781,9 @@ func TestManualTwopcz(t *testing.T) {
t.Skip()
client := framework.NewClient()
defer client.Execute("delete from vitess_test where intval=4", nil)
conn, err := sqldb.Connect(connParams)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &connParams)
if err != nil {
t.Error(err)
return

Просмотреть файл

@ -25,13 +25,9 @@ import (
"golang.org/x/net/context"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/mysql"
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
// FIXME(alainjobart) remove this when it's the only option.
// Registers our implementation.
_ "github.com/youtube/vitess/go/mysql"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"
vttestpb "github.com/youtube/vitess/go/vt/proto/vttest"
@ -146,7 +142,8 @@ func TestMySQL(t *testing.T) {
if err != nil {
t.Error(err)
}
conn, err := sqldb.Connect(params)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &params)
if err != nil {
t.Fatal(err)
}