зеркало из https://github.com/github/vitess-gh.git
Merge branch 'suguwork' into sequence
This commit is contained in:
Коммит
07dfac4313
|
@ -1,5 +1,5 @@
|
|||
approve_by_comment: true
|
||||
approve_regex: '^(LGTM|lgtm)'
|
||||
approve_regex: ':lgtm:|^(LGTM|lgtm)'
|
||||
reject_regex: '^Rejected'
|
||||
reset_on_push: false
|
||||
reviewers:
|
||||
|
|
|
@ -218,6 +218,14 @@ func (mysqld *Mysqld) PreflightSchemaChange(dbName string, change string) (*tmut
|
|||
sql += td.Schema + ";\n"
|
||||
}
|
||||
}
|
||||
for _, td := range beforeSchema.TableDefinitions {
|
||||
if td.Type == tmutils.TableView {
|
||||
// Views will have {{.DatabaseName}} in there, replace
|
||||
// it with _vt_preflight
|
||||
s := strings.Replace(td.Schema, "`{{.DatabaseName}}`", "`_vt_preflight`", -1)
|
||||
sql += s + ";\n"
|
||||
}
|
||||
}
|
||||
if err = mysqld.executeMysqlCommands(mysqld.dba.Uname, sql); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -249,7 +257,7 @@ func (mysqld *Mysqld) PreflightSchemaChange(dbName string, change string) (*tmut
|
|||
// ApplySchemaChange will apply the schema change to the given database.
|
||||
func (mysqld *Mysqld) ApplySchemaChange(dbName string, change *tmutils.SchemaChange) (*tmutils.SchemaChangeResult, error) {
|
||||
// check current schema matches
|
||||
beforeSchema, err := mysqld.GetSchema(dbName, nil, nil, false)
|
||||
beforeSchema, err := mysqld.GetSchema(dbName, nil, nil, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -296,7 +304,7 @@ func (mysqld *Mysqld) ApplySchemaChange(dbName string, change *tmutils.SchemaCha
|
|||
}
|
||||
|
||||
// get AfterSchema
|
||||
afterSchema, err := mysqld.GetSchema(dbName, nil, nil, false)
|
||||
afterSchema, err := mysqld.GetSchema(dbName, nil, nil, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -51,12 +51,17 @@ func (agent *ActionAgent) loadBlacklistRules(tablet *topodatapb.Tablet, blacklis
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Blacklisting tables %v", strings.Join(tables, ", "))
|
||||
qr := tabletserver.NewQueryRule("enforce blacklisted tables", "blacklisted_table", tabletserver.QRFailRetry)
|
||||
for _, t := range tables {
|
||||
qr.AddTableCond(t)
|
||||
|
||||
// Verify that at least one table matches the wildcards, so
|
||||
// that we don't add a rule to blacklist all tables
|
||||
if len(tables) > 0 {
|
||||
log.Infof("Blacklisting tables %v", strings.Join(tables, ", "))
|
||||
qr := tabletserver.NewQueryRule("enforce blacklisted tables", "blacklisted_table", tabletserver.QRFailRetry)
|
||||
for _, t := range tables {
|
||||
qr.AddTableCond(t)
|
||||
}
|
||||
blacklistRules.Add(qr)
|
||||
}
|
||||
blacklistRules.Add(qr)
|
||||
}
|
||||
|
||||
loadRuleErr := agent.QueryServiceControl.SetQueryRules(blacklistQueryRules, blacklistRules)
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vterrors"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/txbuffer"
|
||||
|
||||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
|
||||
|
@ -141,9 +142,13 @@ func (dg *discoveryGateway) StreamExecute(ctx context.Context, keyspace, shard s
|
|||
// Begin starts a transaction for the specified keyspace, shard, and tablet type.
|
||||
// It returns the transaction ID.
|
||||
func (dg *discoveryGateway) Begin(ctx context.Context, keyspace string, shard string, tabletType topodatapb.TabletType) (transactionID int64, err error) {
|
||||
attemptNumber := 0
|
||||
err = dg.withRetry(ctx, keyspace, shard, tabletType, func(conn tabletconn.TabletConn) error {
|
||||
var innerErr error
|
||||
// Potentially buffer this transaction.
|
||||
txbuffer.FakeBuffer(keyspace, shard, attemptNumber)
|
||||
transactionID, innerErr = conn.Begin(ctx)
|
||||
attemptNumber++
|
||||
return innerErr
|
||||
}, 0, false)
|
||||
return transactionID, err
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vterrors"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/txbuffer"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
|
@ -166,9 +167,13 @@ func (sdc *ShardConn) StreamExecute(ctx context.Context, query string, bindVars
|
|||
|
||||
// Begin begins a transaction. The retry rules are the same as Execute.
|
||||
func (sdc *ShardConn) Begin(ctx context.Context) (transactionID int64, err error) {
|
||||
attemptNumber := 0
|
||||
err = sdc.withRetry(ctx, func(conn tabletconn.TabletConn) error {
|
||||
var innerErr error
|
||||
// Potentially buffer this transaction.
|
||||
txbuffer.FakeBuffer(sdc.keyspace, sdc.shard, attemptNumber)
|
||||
transactionID, innerErr = conn.Begin(ctx)
|
||||
attemptNumber++
|
||||
return innerErr
|
||||
}, 0, false)
|
||||
return transactionID, err
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
// Copyright 2015, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD_style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
/*
|
||||
Package txbuffer contains experimental logic to buffer transactions in VTGate.
|
||||
Only the Begin statement of transactions will be buffered.
|
||||
|
||||
The reason why it might be useful to buffer transactions is during failovers:
|
||||
the master vttablet can become unavailable for a few seconds. Upstream clients
|
||||
(e.g., web workers) might not retry on failures, and instead may prefer for VTGate to wait for
|
||||
a few seconds for the failover to complete. Thiis will block upstream callers for that time,
|
||||
but will not return transient errors during the buffering time.
|
||||
*/
|
||||
package txbuffer
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/youtube/vitess/go/stats"
|
||||
)
|
||||
|
||||
var (
|
||||
enableFakeTxBuffer = flag.Bool("enable_fake_tx_buffer", false, "Enable fake transaction buffering.")
|
||||
bufferKeyspace = flag.String("buffer_keyspace", "", "The name of the keyspace to buffer transactions on.")
|
||||
bufferShard = flag.String("buffer_shard", "", "The name of the shard to buffer transactions on.")
|
||||
maxBufferSize = flag.Int("max_buffer_size", 10, "The maximum number of transactions to buffer at a time.")
|
||||
fakeBufferDelay = flag.Duration("fake_buffer_delay", 1*time.Second, "The amount of time that we should delay all transactions for, to fake a transaction buffer.")
|
||||
|
||||
bufferedTransactionsAttempted = stats.NewInt("BufferedTransactionsAttempted")
|
||||
bufferedTransactionsSuccessful = stats.NewInt("BufferedTransactionsSuccessful")
|
||||
// Use this lock when adding to the number of currently buffered transactions.
|
||||
bufferMu sync.Mutex
|
||||
bufferedTransactions = stats.NewInt("BufferedTransactions")
|
||||
)
|
||||
|
||||
// timeSleep can be mocked out in unit tests
|
||||
var timeSleep = time.Sleep
|
||||
|
||||
// FakeBuffer will pretend to buffer new transactions in VTGate.
|
||||
// Transactions *will NOT actually be buffered*, they will just have a delayed start time.
|
||||
// This can be useful to understand what the impact of trasaction buffering will be
|
||||
// on upstream callers. Once the impact is measured, it can be used to tweak parameter values
|
||||
// for the best behavior.
|
||||
// FakeBuffer should be called before the VtTablet Begin, otherwise it will increase transaction times.
|
||||
func FakeBuffer(keyspace, shard string, attemptNumber int) {
|
||||
// Only buffer on the first Begin attempt, not on possible retries.
|
||||
if !*enableFakeTxBuffer || attemptNumber != 0 {
|
||||
return
|
||||
}
|
||||
if keyspace != *bufferKeyspace || shard != *bufferShard {
|
||||
return
|
||||
}
|
||||
bufferedTransactionsAttempted.Add(1)
|
||||
|
||||
bufferMu.Lock()
|
||||
if int(bufferedTransactions.Get()) >= *maxBufferSize {
|
||||
bufferMu.Unlock()
|
||||
return
|
||||
}
|
||||
bufferedTransactions.Add(1)
|
||||
bufferMu.Unlock()
|
||||
|
||||
defer bufferedTransactionsSuccessful.Add(1)
|
||||
timeSleep(*fakeBufferDelay)
|
||||
// Don't need to lock for this, as there's no race when decrementing the count
|
||||
bufferedTransactions.Add(-1)
|
||||
}
|
|
@ -0,0 +1,209 @@
|
|||
// Copyright 2015, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package txbuffer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// fakeSleepController is used to control a fake sleepFunc
|
||||
type fakeSleepController struct {
|
||||
called bool
|
||||
block bool
|
||||
// block until the done channel if closed, if configured to do so.
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
type sleepFunc func(d time.Duration)
|
||||
|
||||
// createFakeSleep creates a function that can be called to fake sleeping.
|
||||
// The created fake is managed by the passed in fakeSleepController.
|
||||
func createFakeSleep(c *fakeSleepController) sleepFunc {
|
||||
return func(d time.Duration) {
|
||||
c.called = true
|
||||
if !c.block {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-c.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFakeBuffer(t *testing.T) {
|
||||
unbufferedKeyspace := "ukeyspace"
|
||||
unbufferedShard := "80-"
|
||||
bufferedKeyspace := "bkeyspace"
|
||||
bufferedShard := "-80"
|
||||
|
||||
*bufferKeyspace = bufferedKeyspace
|
||||
*bufferShard = bufferedShard
|
||||
|
||||
for _, test := range []struct {
|
||||
desc string
|
||||
enableFakeBuffer bool
|
||||
keyspace string
|
||||
shard string
|
||||
attemptNumber int
|
||||
bufferedTransactions int
|
||||
// was this transaction buffered?
|
||||
wantCalled bool
|
||||
// expected value of BufferedTransactionAttempts
|
||||
wantAttempted int
|
||||
}{
|
||||
{
|
||||
desc: "enableFakeBuffer=False",
|
||||
enableFakeBuffer: false,
|
||||
},
|
||||
{
|
||||
desc: "attemptNumber != 0",
|
||||
enableFakeBuffer: true,
|
||||
attemptNumber: 1,
|
||||
},
|
||||
{
|
||||
desc: "unbuffered keyspace",
|
||||
enableFakeBuffer: true,
|
||||
keyspace: unbufferedKeyspace,
|
||||
shard: bufferedShard,
|
||||
},
|
||||
{
|
||||
desc: "unbuffered shard",
|
||||
enableFakeBuffer: true,
|
||||
keyspace: bufferedKeyspace,
|
||||
shard: unbufferedShard,
|
||||
},
|
||||
{
|
||||
desc: "buffer full",
|
||||
enableFakeBuffer: true,
|
||||
keyspace: bufferedKeyspace,
|
||||
shard: bufferedShard,
|
||||
bufferedTransactions: *maxBufferSize,
|
||||
// When the buffer is full, bufferedTransactionsAttempted should still be incremented
|
||||
wantAttempted: 1,
|
||||
},
|
||||
{
|
||||
desc: "buffered successful",
|
||||
enableFakeBuffer: true,
|
||||
keyspace: bufferedKeyspace,
|
||||
shard: bufferedShard,
|
||||
wantCalled: true,
|
||||
wantAttempted: 1,
|
||||
},
|
||||
} {
|
||||
controller := &fakeSleepController{}
|
||||
timeSleep = createFakeSleep(controller)
|
||||
// reset counters
|
||||
bufferedTransactionsAttempted.Set(0)
|
||||
bufferedTransactionsSuccessful.Set(0)
|
||||
bufferedTransactions.Set(int64(test.bufferedTransactions))
|
||||
|
||||
*enableFakeTxBuffer = test.enableFakeBuffer
|
||||
|
||||
FakeBuffer(test.keyspace, test.shard, test.attemptNumber)
|
||||
|
||||
if controller.called != test.wantCalled {
|
||||
t.Errorf("With %v, FakeBuffer() => timeSleep.called: %v; want: %v",
|
||||
test.desc, controller.called, test.wantCalled)
|
||||
}
|
||||
|
||||
if bufferedTransactionsAttempted.Get() != int64(test.wantAttempted) {
|
||||
t.Errorf("With %v, FakeBuffer() => BufferedTransactionsAttempted got: %v; want: %v",
|
||||
test.desc, bufferedTransactionsAttempted.Get(), test.wantAttempted)
|
||||
}
|
||||
|
||||
if (!test.wantCalled && (bufferedTransactionsSuccessful.Get() == 1)) ||
|
||||
(test.wantCalled && (bufferedTransactionsSuccessful.Get() != 1)) {
|
||||
t.Errorf("With %v, FakeBuffer() => BufferedTransactionsSuccessful got: %v; want: 1",
|
||||
test.desc, bufferedTransactionsSuccessful.Get())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// min for ints
|
||||
func min(x, y int) int {
|
||||
if x < y {
|
||||
return x
|
||||
}
|
||||
return y
|
||||
}
|
||||
|
||||
func TestParallelFakeBuffer(t *testing.T) {
|
||||
bufferedKeyspace := "bkeyspace"
|
||||
bufferedShard := "-80"
|
||||
|
||||
*bufferKeyspace = bufferedKeyspace
|
||||
*bufferShard = bufferedShard
|
||||
*enableFakeTxBuffer = true
|
||||
|
||||
// reset counters
|
||||
bufferedTransactionsAttempted.Set(0)
|
||||
bufferedTransactionsSuccessful.Set(0)
|
||||
|
||||
var controllers []*fakeSleepController
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 1; i <= *maxBufferSize+2; i++ {
|
||||
controller := &fakeSleepController{
|
||||
block: true,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
timeSleep = createFakeSleep(controller)
|
||||
// Only the first maxBufferSize calls to FakeBuffer should actually call fakeSleep
|
||||
wantFakeSleepCalled := (i <= *maxBufferSize)
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
FakeBuffer(*bufferKeyspace, *bufferShard, 0)
|
||||
}()
|
||||
// Give the goroutine some time to run.
|
||||
// Ideally, we'd use a channel here to indicate when the fake sleep starts blocking.
|
||||
// However, we can't do that because for some of the goroutines, the fake sleep is never
|
||||
// even called.
|
||||
time.Sleep(10 * time.Microsecond)
|
||||
|
||||
if controller.called {
|
||||
controllers = append(controllers, controller)
|
||||
}
|
||||
|
||||
if controller.called != wantFakeSleepCalled {
|
||||
t.Errorf("On iteration %v, FakeBuffer() => timeSleep.called: %v; want: %v",
|
||||
i, controller.called, wantFakeSleepCalled)
|
||||
}
|
||||
|
||||
if int(bufferedTransactionsAttempted.Get()) != i {
|
||||
t.Errorf("On iteration %v, FakeBuffer() => BufferedTransactionsAttempted got: %v; want: %v",
|
||||
i, bufferedTransactionsAttempted.Get(), i)
|
||||
}
|
||||
|
||||
if int(bufferedTransactions.Get()) != min(i, *maxBufferSize) {
|
||||
t.Errorf("On iteration %v, FakeBuffer() => BufferedTransactions got: %v; want: %v",
|
||||
i, bufferedTransactions.Get(), min(i, *maxBufferSize))
|
||||
}
|
||||
|
||||
if int(bufferedTransactionsSuccessful.Get()) != 0 {
|
||||
t.Errorf("On iteration %v, FakeBuffer() => BufferedTransactionsSuccessful got: %v; want: 0",
|
||||
i, bufferedTransactionsSuccessful.Get())
|
||||
}
|
||||
}
|
||||
|
||||
// signal to all the buffered calls that they can stop buffering, and wait for them.
|
||||
for _, c := range controllers {
|
||||
close(c.done)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if int(bufferedTransactionsSuccessful.Get()) != *maxBufferSize {
|
||||
t.Errorf("After all FakeBuffer() calls are done, BufferedTransactionsSuccessful got: %v; want: %v",
|
||||
bufferedTransactionsSuccessful.Get(), *maxBufferSize)
|
||||
}
|
||||
if int(bufferedTransactions.Get()) != 0 {
|
||||
t.Errorf("After all FakeBuffer() calls are done, BufferedTransactions got: %v; want: %v",
|
||||
bufferedTransactions.Get(), 0)
|
||||
}
|
||||
}
|
|
@ -536,6 +536,24 @@ index by_msg (msg)
|
|||
# check the stats are correct
|
||||
self._check_stats()
|
||||
|
||||
# now remove the tables on the source shard. The blacklisted tables
|
||||
# in the source shard won't match any table, make sure that works.
|
||||
utils.run_vtctl(['ApplySchema',
|
||||
'-sql=drop view view1',
|
||||
'source_keyspace'],
|
||||
auto_log=True)
|
||||
for t in ['moving1', 'moving2']:
|
||||
utils.run_vtctl(['ApplySchema',
|
||||
'-sql=drop table %s' % (t),
|
||||
'source_keyspace'],
|
||||
auto_log=True)
|
||||
for t in [source_master, source_replica, source_rdonly1, source_rdonly2]:
|
||||
utils.run_vtctl(['ReloadSchema', t.tablet_alias])
|
||||
qr = source_master.execute('select count(1) from staying1')
|
||||
self.assertEqual(len(qr['rows']), 1,
|
||||
'cannot read staying1: got %s' % str(qr))
|
||||
|
||||
# test SetShardTabletControl
|
||||
self._verify_vtctl_set_shard_tablet_control()
|
||||
|
||||
def _verify_vtctl_set_shard_tablet_control(self):
|
||||
|
|
Загрузка…
Ссылка в новой задаче