зеркало из https://github.com/github/vitess-gh.git
go driver: deprecate Atomicity
Deleting the context based atomicity before anyone starts using it. This will be replaced by future SET TTRANSACTION MODE statements.
This commit is contained in:
Родитель
7d37d6e526
Коммит
53d4122329
|
@ -5,8 +5,6 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateservice"
|
||||
|
@ -29,9 +27,6 @@ func newSuccessClient(fallback vtgateservice.VTGateService) *successClient {
|
|||
}
|
||||
|
||||
func (c *successClient) Begin(ctx context.Context, singledb bool) (*vtgatepb.Session, error) {
|
||||
if singledb {
|
||||
return nil, errors.New("single db")
|
||||
}
|
||||
return &vtgatepb.Session{
|
||||
InTransaction: true,
|
||||
}, nil
|
||||
|
@ -41,9 +36,6 @@ func (c *successClient) Commit(ctx context.Context, twopc bool, session *vtgatep
|
|||
if session != nil && session.InTransaction {
|
||||
return nil
|
||||
}
|
||||
if twopc {
|
||||
return errors.New("twopc")
|
||||
}
|
||||
return c.fallback.Commit(ctx, twopc, session)
|
||||
}
|
||||
|
||||
|
|
|
@ -14,8 +14,6 @@ import (
|
|||
"database/sql/driver"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -31,28 +29,6 @@ var (
|
|||
_ driver.StmtExecContext = &stmt{}
|
||||
)
|
||||
|
||||
// These are synonyms of the constants defined in vtgateconn.
|
||||
const (
|
||||
// AtomicityMulti is the default level. It allows distributed transactions
|
||||
// with best effort commits. Partial commits are possible.
|
||||
AtomicityMulti = vtgateconn.AtomicityMulti
|
||||
// AtomicitySingle prevents a transaction from crossing the boundary of
|
||||
// a single database.
|
||||
AtomicitySingle = vtgateconn.AtomicitySingle
|
||||
// Atomicity2PC allows distributed transactions, and performs 2PC commits.
|
||||
Atomicity2PC = vtgateconn.Atomicity2PC
|
||||
)
|
||||
|
||||
// WithAtomicity returns a context with the atomicity level set.
|
||||
func WithAtomicity(ctx context.Context, level vtgateconn.Atomicity) context.Context {
|
||||
return vtgateconn.WithAtomicity(ctx, level)
|
||||
}
|
||||
|
||||
// AtomicityFromContext returns the atomicity of the context.
|
||||
func AtomicityFromContext(ctx context.Context) vtgateconn.Atomicity {
|
||||
return vtgateconn.AtomicityFromContext(ctx)
|
||||
}
|
||||
|
||||
func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
|
||||
if opts.Isolation != driver.IsolationLevel(0) || opts.ReadOnly {
|
||||
return nil, errIsolationUnsupported
|
||||
|
|
|
@ -23,31 +23,6 @@ var (
|
|||
VtgateProtocol = flag.String("vtgate_protocol", "grpc", "how to talk to vtgate")
|
||||
)
|
||||
|
||||
// Atomicity specifies atomicity level of a transaction.
|
||||
type Atomicity int
|
||||
|
||||
const (
|
||||
// AtomicityMulti is the default level. It allows distributed transactions
|
||||
// with best effort commits. Partial commits are possible.
|
||||
AtomicityMulti = Atomicity(iota)
|
||||
// AtomicitySingle prevents a transaction from crossing the boundary of
|
||||
// a single database.
|
||||
AtomicitySingle
|
||||
// Atomicity2PC allows distributed transactions, and performs 2PC commits.
|
||||
Atomicity2PC
|
||||
)
|
||||
|
||||
// WithAtomicity returns a context with the atomicity level set.
|
||||
func WithAtomicity(ctx context.Context, level Atomicity) context.Context {
|
||||
return context.WithValue(ctx, Atomicity(0), level)
|
||||
}
|
||||
|
||||
// AtomicityFromContext returns the atomicity of the context.
|
||||
func AtomicityFromContext(ctx context.Context) Atomicity {
|
||||
v, _ := ctx.Value(Atomicity(0)).(Atomicity)
|
||||
return v
|
||||
}
|
||||
|
||||
// VTGateConn is the client API object to talk to vtgate.
|
||||
// It is constructed using the Dial method. It supports
|
||||
// legacy V2 APIs. It can be used concurrently. To access
|
||||
|
@ -149,16 +124,14 @@ func (conn *VTGateConn) MessageAck(ctx context.Context, keyspace string, name st
|
|||
|
||||
// Begin starts a transaction and returns a VTGateTX.
|
||||
func (conn *VTGateConn) Begin(ctx context.Context) (*VTGateTx, error) {
|
||||
atomicity := AtomicityFromContext(ctx)
|
||||
session, err := conn.impl.Begin(ctx, atomicity == AtomicitySingle)
|
||||
session, err := conn.impl.Begin(ctx, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &VTGateTx{
|
||||
conn: conn,
|
||||
session: session,
|
||||
atomicity: atomicity,
|
||||
conn: conn,
|
||||
session: session,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -227,9 +200,8 @@ func (vsn *VTGateSession) StreamExecute(ctx context.Context, query string, bindV
|
|||
// VTGateTx defines an ongoing transaction.
|
||||
// It should not be concurrently used across goroutines.
|
||||
type VTGateTx struct {
|
||||
conn *VTGateConn
|
||||
session *vtgatepb.Session
|
||||
atomicity Atomicity
|
||||
conn *VTGateConn
|
||||
session *vtgatepb.Session
|
||||
}
|
||||
|
||||
// ExecuteShards executes a query for multiple shards on vtgate within the current transaction.
|
||||
|
@ -297,7 +269,7 @@ func (tx *VTGateTx) Commit(ctx context.Context) error {
|
|||
if tx.session == nil {
|
||||
return fmt.Errorf("commit: not in transaction")
|
||||
}
|
||||
err := tx.conn.impl.Commit(ctx, tx.session, tx.atomicity == Atomicity2PC)
|
||||
err := tx.conn.impl.Commit(ctx, tx.session, false)
|
||||
tx.session = nil
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -33,14 +33,3 @@ func TestGetDialerWithProtocol(t *testing.T) {
|
|||
t.Fatalf("dialerFunc has been registered, should not get nil: %v %v", err, c)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAtomicity(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if v := AtomicityFromContext(ctx); v != AtomicityMulti {
|
||||
t.Errorf("Atomicity: %v, want %d", v, AtomicityMulti)
|
||||
}
|
||||
ctx = WithAtomicity(ctx, Atomicity2PC)
|
||||
if v := AtomicityFromContext(ctx); v != Atomicity2PC {
|
||||
t.Errorf("Atomicity: %v, want %d", v, Atomicity2PC)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -707,10 +707,6 @@ func (f *fakeVTGateService) Begin(ctx context.Context, singledb bool) (*vtgatepb
|
|||
panic(fmt.Errorf("test forced panic"))
|
||||
default:
|
||||
}
|
||||
if singledb {
|
||||
// Communicate this as an error.
|
||||
return nil, errors.New("single db")
|
||||
}
|
||||
return session1, nil
|
||||
}
|
||||
|
||||
|
@ -723,10 +719,6 @@ func (f *fakeVTGateService) Commit(ctx context.Context, twopc bool, inSession *v
|
|||
if f.panics {
|
||||
panic(fmt.Errorf("test forced panic"))
|
||||
}
|
||||
if twopc {
|
||||
// Communicate this as an error.
|
||||
return errors.New("twopc")
|
||||
}
|
||||
if !proto.Equal(inSession, session2) {
|
||||
return errors.New("commit: session mismatch")
|
||||
}
|
||||
|
@ -975,7 +967,6 @@ func TestSuite(t *testing.T, impl vtgateconn.Impl, fakeServer vtgateservice.VTGa
|
|||
fs := fakeServer.(*fakeVTGateService)
|
||||
|
||||
testBegin(t, conn)
|
||||
testCommit(t, conn)
|
||||
testExecute(t, vsn)
|
||||
testExecuteBatch(t, vsn)
|
||||
testExecuteShards(t, conn)
|
||||
|
@ -1093,25 +1084,10 @@ func verifyErrorString(t *testing.T, err error, method string) {
|
|||
}
|
||||
|
||||
func testBegin(t *testing.T, conn *vtgateconn.VTGateConn) {
|
||||
ctx := vtgateconn.WithAtomicity(newContext(), vtgateconn.AtomicitySingle)
|
||||
_, err := conn.Begin(ctx)
|
||||
want := "single db"
|
||||
if err == nil || !strings.Contains(err.Error(), want) {
|
||||
t.Errorf("Begin(singldb): %v, want %v", err, want)
|
||||
}
|
||||
}
|
||||
|
||||
func testCommit(t *testing.T, conn *vtgateconn.VTGateConn) {
|
||||
ctx := vtgateconn.WithAtomicity(newContext(), vtgateconn.Atomicity2PC)
|
||||
tx, err := conn.Begin(ctx)
|
||||
_, err := conn.Begin(newContext())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
err = tx.Commit(ctx)
|
||||
want := "twopc"
|
||||
if err == nil || !strings.Contains(err.Error(), want) {
|
||||
t.Errorf("Commit(twopc): %v, want %v", err, want)
|
||||
}
|
||||
}
|
||||
|
||||
func testExecute(t *testing.T, vsn *vtgateconn.VTGateSession) {
|
||||
|
|
|
@ -247,20 +247,6 @@ class TestErrors(TestPythonClientBase):
|
|||
self.conn.get_srv_keyspace(error_request)
|
||||
|
||||
|
||||
class TestTransactionFlags(TestPythonClientBase):
|
||||
"""Test transaction flags."""
|
||||
|
||||
def test_begin(self):
|
||||
"""Test begin transaction flags."""
|
||||
self.conn.begin()
|
||||
with self.assertRaisesRegexp(dbexceptions.DatabaseError, 'single db'):
|
||||
self.conn.begin(single_db=True)
|
||||
|
||||
self.conn.commit()
|
||||
with self.assertRaisesRegexp(dbexceptions.DatabaseError, 'twopc'):
|
||||
self.conn.commit(twopc=True)
|
||||
|
||||
|
||||
class TestSuccess(TestPythonClientBase):
|
||||
"""Success test cases for the Python client."""
|
||||
|
||||
|
|
|
@ -79,11 +79,6 @@ class TestErrors(TestPythonClientBase,
|
|||
"""Test cases to verify that the Python client can handle errors correctly."""
|
||||
|
||||
|
||||
class TestTransactionFlags(TestPythonClientBase,
|
||||
vtgate_client_testsuite.TestTransactionFlags):
|
||||
"""Success test cases for the Python client."""
|
||||
|
||||
|
||||
class TestSuccess(TestPythonClientBase,
|
||||
vtgate_client_testsuite.TestSuccess):
|
||||
"""Success test cases for the Python client."""
|
||||
|
|
Загрузка…
Ссылка в новой задаче