Merge pull request #2517 from alainjobart/mysqlconn2

Make mysqlconn the default implementation.
This commit is contained in:
Alain Jobart 2017-05-16 10:52:56 -07:00 коммит произвёл GitHub
Родитель 9270473f7c a6e5e27431
Коммит 32e07c2e4d
11 изменённых файлов: 51 добавлений и 35 удалений

Просмотреть файл

@ -45,10 +45,6 @@ func init() {
// This needs to be called before threads begin to spawn.
C.vt_library_init()
sqldb.Register("libmysqlclient", Connect)
// Comment this out and uncomment call to sqldb.RegisterDefault in
// go/mysqlconn/sqldb_conn.go to make it the default.
sqldb.RegisterDefault(Connect)
}
func handleError(err *error) {
@ -65,6 +61,11 @@ type Connection struct {
// Connect uses the connection parameters to connect and returns the connection
func Connect(params sqldb.ConnParams) (sqldb.Conn, error) {
// FIXME(alainjobart) adding a panic to make sure this library is
// unused. Before the defer on purpose, so it actually panics.
if true {
panic("Added to make sure this library is unused.")
}
var err error
defer handleError(&err)

Просмотреть файл

@ -35,7 +35,7 @@ import (
)
// assertSQLError makes sure we get the right error.
func assertSQLError(t *testing.T, err error, code int, sqlState string, subtext string) {
func assertSQLError(t *testing.T, err error, code int, sqlState string, subtext string, query string) {
if err == nil {
t.Fatalf("was expecting SQLError %v / %v / %v but got no error.", code, sqlState, subtext)
}
@ -51,7 +51,9 @@ func assertSQLError(t *testing.T, err error, code int, sqlState string, subtext
}
if subtext != "" && !strings.Contains(serr.Message, subtext) {
t.Fatalf("was expecting SQLError %v / %v / %v but got message %v", code, sqlState, subtext, serr.Message)
}
if serr.Query != query {
t.Fatalf("was expecting SQLError %v / %v / %v with Query '%v' but got query '%v'", code, sqlState, subtext, query, serr.Query)
}
}
@ -111,14 +113,14 @@ func TestConnectTimeout(t *testing.T) {
}()
ctx = context.Background()
_, err = Connect(ctx, params)
assertSQLError(t, err, CRServerLost, SSUnknownSQLState, "initial packet read failed")
assertSQLError(t, err, CRServerLost, SSUnknownSQLState, "initial packet read failed", "")
// Now close the listener. Connect should fail right away,
// check the error.
listener.Close()
wg.Wait()
_, err = Connect(ctx, params)
assertSQLError(t, err, CRConnHostError, SSUnknownSQLState, "connection refused")
assertSQLError(t, err, CRConnHostError, SSUnknownSQLState, "connection refused", "")
// Tests a connection where Dial to a unix socket fails
// properly returns the right error. To simulate exactly the
@ -133,7 +135,7 @@ func TestConnectTimeout(t *testing.T) {
ctx = context.Background()
_, err = Connect(ctx, params)
os.Remove(name)
assertSQLError(t, err, CRConnectionError, SSUnknownSQLState, "connection refused")
assertSQLError(t, err, CRConnectionError, SSUnknownSQLState, "connection refused", "")
}
// testKillWithRealDatabase opens a connection, issues a command that
@ -165,7 +167,7 @@ func testKillWithRealDatabase(t *testing.T, params *sqldb.ConnParams) {
}
err = <-errChan
assertSQLError(t, err, CRServerLost, SSUnknownSQLState, "EOF")
assertSQLError(t, err, CRServerLost, SSUnknownSQLState, "EOF", "select sleep(10) from dual")
}
// testKill2006WithRealDatabase opens a connection, kills the
@ -193,7 +195,7 @@ func testKill2006WithRealDatabase(t *testing.T, params *sqldb.ConnParams) {
// unix socket, we will get a broken pipe when the server
// closes the connection and we are trying to write the command.
_, err = conn.ExecuteFetch("select sleep(10) from dual", 1000, false)
assertSQLError(t, err, CRServerGone, SSUnknownSQLState, "broken pipe")
assertSQLError(t, err, CRServerGone, SSUnknownSQLState, "broken pipe", "select sleep(10) from dual")
}
// testDupEntryWithRealDatabase tests a duplicate key is properly raised.
@ -212,7 +214,7 @@ func testDupEntryWithRealDatabase(t *testing.T, params *sqldb.ConnParams) {
t.Fatalf("first insert failed: %v", err)
}
_, err = conn.ExecuteFetch("insert into dup_entry(id, name) values(2, 10)", 0, false)
assertSQLError(t, err, ERDupEntry, SSDupKey, "Duplicate entry")
assertSQLError(t, err, ERDupEntry, SSDupKey, "Duplicate entry", "insert into dup_entry(id, name) values(2, 10)")
}
// testClientFoundRows tests if the CLIENT_FOUND_ROWS flag works.

Просмотреть файл

@ -78,6 +78,9 @@ type Conn struct {
// - at accept time for the server.
ConnectionID uint32
// Closed is set to true when Close() is called on the connection.
Closed bool
// Capabilities is the current set of features this connection
// is using. It is the features that are both supported by
// the client and the server, and currently in use.
@ -535,6 +538,7 @@ func (c *Conn) RemoteAddr() net.Addr {
// Close closes the connection. It can be called from a different go
// routine to interrupt the current connection.
func (c *Conn) Close() {
c.Closed = true
c.conn.Close()
}

Просмотреть файл

@ -273,7 +273,15 @@ func (c *Conn) parseRow(data []byte, fields []*querypb.Field) ([]sqltypes.Value,
//
// 2. if the server closes the connection when a command is in flight,
// readComQueryResponse will fail, and we'll return CRServerLost(2013).
func (c *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
func (c *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (result *sqltypes.Result, err error) {
defer func() {
if err != nil {
if sqlerr, ok := err.(*sqldb.SQLError); ok {
sqlerr.Query = query
}
}
}()
// This is a new command, need to reset the sequence.
c.sequence = 0
@ -296,7 +304,7 @@ func (c *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (*sqltyp
}
fields := make([]querypb.Field, colNumber)
result := &sqltypes.Result{
result = &sqltypes.Result{
Fields: make([]*querypb.Field, colNumber),
}
@ -369,7 +377,6 @@ func (c *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (*sqltyp
return nil, &sqldb.SQLError{
Num: 0,
Message: fmt.Sprintf("Row count exceeded %d", maxrows),
Query: query,
}
}

Просмотреть файл

@ -189,6 +189,9 @@ func TestClientFoundRows(t *testing.T) {
t.Errorf("FoundRows flag: %x, second bit must be 0", th.lastConn.Capabilities)
}
c.Close()
if !c.IsClosed() {
t.Errorf("IsClosed returned true on Close-d connection.")
}
// Test with flag.
params.Flags |= CapabilityClientFoundRows

Просмотреть файл

@ -35,7 +35,15 @@ import (
// ExecuteStreamFetch is part of the sqldb.Conn interface.
// Returns a sqldb.SQLError.
func (c *Conn) ExecuteStreamFetch(query string) error {
func (c *Conn) ExecuteStreamFetch(query string) (err error) {
defer func() {
if err != nil {
if sqlerr, ok := err.(*sqldb.SQLError); ok {
sqlerr.Query = query
}
}
}()
// Sanity check.
if c.fields != nil {
return sqldb.NewSQLError(CRCommandsOutOfSync, SSUnknownSQLState, "streaming query already in progress")
@ -160,13 +168,7 @@ func (c *Conn) CloseResult() {
// IsClosed is part of the sqldb.Conn interface.
func (c *Conn) IsClosed() bool {
return c.ConnectionID == 0
}
// Shutdown is part of the sqldb.Conn interface.
func (c *Conn) Shutdown() {
c.ConnectionID = 0
c.conn.Close()
return c.Closed
}
// ID is part of the sqldb.Conn interface.
@ -179,12 +181,8 @@ func init() {
ctx := context.Background()
return Connect(ctx, &params)
})
// Uncomment this and comment out the call to sqldb.RegisterDefault in
// go/mysql/mysql.go to make this the default.
// sqldb.RegisterDefault(func(params sqldb.ConnParams) (sqldb.Conn, error) {
// ctx := context.Background()
// return Connect(ctx, &params)
// })
sqldb.RegisterDefault(func(params sqldb.ConnParams) (sqldb.Conn, error) {
ctx := context.Background()
return Connect(ctx, &params)
})
}

Просмотреть файл

@ -58,7 +58,7 @@ const redactedPassword = "****"
// The flags will change the global singleton
func registerConnFlags(connParams *sqldb.ConnParams, name string) {
flag.StringVar(&connParams.Engine, "db-config-"+name+"-engine", "libmysqlclient", "db "+name+" engine to use (empty for default, libmysqlclient or mysqlconn)")
flag.StringVar(&connParams.Engine, "db-config-"+name+"-engine", "mysqlconn", "db "+name+" engine to use (empty for default, libmysqlclient or mysqlconn)")
flag.StringVar(&connParams.Host, "db-config-"+name+"-host", "", "db "+name+" connection host")
flag.IntVar(&connParams.Port, "db-config-"+name+"-port", 0, "db "+name+" connection port")
flag.StringVar(&connParams.Uname, "db-config-"+name+"-uname", "", "db "+name+" connection uname")

Просмотреть файл

@ -64,6 +64,7 @@ func TestMessage(t *testing.T) {
}); err != nil {
t.Fatal(err)
}
close(ch)
}()
// Once the test is done, consume any left-over pending
// messages. Some could make it into the pipeline and get

Просмотреть файл

@ -92,7 +92,7 @@ func TestDBConnKill(t *testing.T) {
// Kill failed because we are not able to connect to the database
db.EnableConnFail()
err = dbConn.Kill("test kill")
want := "Lost connection"
want := "errno 2013"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("Exec: %v, want %s", err, want)
}

Просмотреть файл

@ -240,7 +240,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Permanent(t *testing.T) {
// DBConn.Exec() will return the reconnect error as final error and not the
// initial connection error.
_, err = txPool.LocalBegin(context.Background(), false)
if err == nil || !strings.Contains(err.Error(), "Lost connection to MySQL server") || !strings.Contains(err.Error(), "(errno 2013)") {
if err == nil || !strings.Contains(err.Error(), "(errno 2013)") {
t.Fatalf("Begin did not return the reconnect error: %v", err)
}
sqlErr, ok := err.(*sqldb.SQLError)

Просмотреть файл

@ -30,7 +30,7 @@ import (
// FIXME(alainjobart) remove this when it's the only option.
// Registers our implementation.
_ "github.com/youtube/vitess/go/mysql"
_ "github.com/youtube/vitess/go/mysqlconn"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"