Merge branch 'master' into suguwork

This commit is contained in:
Sugu Sougoumarane 2016-02-27 12:58:35 -08:00
Родитель b14fca5d53 4e13248a69
Коммит 654aeb2c47
49 изменённых файлов: 907 добавлений и 329 удалений

Просмотреть файл

@ -118,6 +118,7 @@ repos="github.com/golang/glog \
google.golang.org/cloud \
google.golang.org/cloud/storage \
golang.org/x/crypto/ssh/terminal \
github.com/olekukonko/tablewriter \
"
# Packages for uploading code coverage to coveralls.io (used by Travis CI).

Просмотреть файл

@ -63,3 +63,20 @@ transaction-isolation = REPEATABLE-READ
# READ-COMMITTED would be better, but mysql 5.1 disables this with statement based replication
# READ-UNCOMMITTED might be better
lower_case_table_names = 1
# Semi-sync replication is required for automated unplanned failover
# (when the master goes away). Here we just load the plugin so it's
# available if desired, but it's disabled at startup.
#
# If the -enable_semi_sync flag is used, VTTablet will enable semi-sync
# at the proper time when replication is set up, or when masters are
# promoted or demoted.
plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so
# When semi-sync is enabled, don't allow fallback to async
# if you get no ack, or have no slaves. This is necessary to
# prevent alternate futures when doing a failover in response to
# a master that becomes unresponsive.
rpl_semi_sync_master_timeout = 1000000000000000000
rpl_semi_sync_master_wait_no_slave = 1

Просмотреть файл

@ -76,6 +76,7 @@ spec:
-db-config-filtered-dbname vt_{{keyspace}}
-db-config-filtered-charset utf8
-enable-rowcache
-enable_semi_sync
-rowcache-bin /usr/bin/memcached
-rowcache-socket $VTDATAROOT/{{tablet_subdir}}/memcache.sock
-restore_from_backup {{backup_flags}}" vitess

Просмотреть файл

@ -94,6 +94,7 @@ for uid_index in $uids; do
-target_tablet_type $tablet_type \
-health_check_interval 5s \
-enable-rowcache \
-enable_semi_sync \
-rowcache-bin $memcached_path \
-rowcache-socket $VTDATAROOT/$tablet_dir/memcache.sock \
-backup_storage_implementation file \

Просмотреть файл

@ -13,11 +13,11 @@ import (
"time"
log "github.com/golang/glog"
"github.com/olekukonko/tablewriter"
"github.com/youtube/vitess/go/exit"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
"github.com/youtube/vitess/go/vt/vitessdriver"
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
)
var (
@ -35,6 +35,7 @@ in the form of :v1, :v2, etc.
bindVariables = newBindvars("bind_variables", "bind variables as a json list")
keyspace = flag.String("keyspace", "", "Keyspace of a specific keyspace/shard to target. Disables vtgate v3.")
shard = flag.String("shard", "", "Shard of a specific keyspace/shard to target. Disables vtgate v3.")
jsonOutput = flag.Bool("json", false, "Output JSON instead of human-readable table")
)
func init() {
@ -121,7 +122,7 @@ func main() {
}
log.Infof("Sending the query...")
now := time.Now()
startTime := time.Now()
// handle dml
if isDml(args[0]) {
@ -145,9 +146,8 @@ func main() {
rowsAffected, err := result.RowsAffected()
lastInsertID, err := result.LastInsertId()
log.Infof("Total time: %v / Row affected: %v / Last Insert Id: %v", time.Now().Sub(now), rowsAffected, lastInsertID)
log.Infof("Total time: %v / Row affected: %v / Last Insert Id: %v", time.Since(startTime), rowsAffected, lastInsertID)
} else {
// launch the query
rows, err := db.Query(args[0], []interface{}(*bindVariables)...)
if err != nil {
@ -156,20 +156,16 @@ func main() {
}
defer rows.Close()
// print the headers
// get the headers
var qr results
cols, err := rows.Columns()
if err != nil {
log.Errorf("client error: %v", err)
exit.Return(1)
}
line := "Index"
for _, field := range cols {
line += "\t" + field
}
fmt.Printf("%s\n", line)
qr.Fields = cols
// get the rows
rowIndex := 0
for rows.Next() {
row := make([]interface{}, len(cols))
for i := range row {
@ -181,18 +177,41 @@ func main() {
exit.Return(1)
}
// print the line
line := fmt.Sprintf("%d", rowIndex)
// unpack []*string into []string
vals := make([]string, 0, len(row))
for _, value := range row {
line += fmt.Sprintf("\t%v", *(value.(*string)))
vals = append(vals, *(value.(*string)))
}
fmt.Printf("%s\n", line)
rowIndex++
qr.Rows = append(qr.Rows, vals)
}
if err := rows.Err(); err != nil {
log.Errorf("Error %v\n", err)
exit.Return(1)
}
log.Infof("Total time: %v / Row count: %v", time.Now().Sub(now), rowIndex)
if *jsonOutput {
data, err := json.MarshalIndent(qr, "", " ")
if err != nil {
log.Errorf("cannot marshal data: %v", err)
exit.Return(1)
}
fmt.Print(string(data))
} else {
printTable(qr, time.Since(startTime))
}
}
}
type results struct {
Fields []string `json:"fields"`
Rows [][]string `json:"rows"`
}
func printTable(qr results, dur time.Duration) {
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader(qr.Fields)
table.SetAutoFormatHeaders(false)
table.AppendBulk(qr.Rows)
table.Render()
fmt.Printf("%v rows in set (%v)\n", len(qr.Rows), dur)
}

Просмотреть файл

@ -8,10 +8,10 @@ import querypb "github.com/youtube/vitess/go/vt/proto/query"
// Result represents a query result.
type Result struct {
Fields []*querypb.Field
RowsAffected uint64
InsertID uint64
Rows [][]Value
Fields []*querypb.Field `json:"fields"`
RowsAffected uint64 `json:"rows_affected"`
InsertID uint64 `json:"insert_id"`
Rows [][]Value `json:"rows"`
}
// Repair fixes the type info in the rows

Просмотреть файл

@ -27,7 +27,7 @@ type Histogram struct {
// NewHistogram creates a histogram with auto-generated labels
// based on the cutoffs. The buckets are categorized using the
// following criterion: cutoff[i-1] < value <= cutoff[i]. Anything
// following criterion: cutoff[i-1] <= value < cutoff[i]. Anything
// higher than the highest cutoff is labeled as "inf".
func NewHistogram(name string, cutoffs []int64) *Histogram {
labels := make([]string, len(cutoffs)+1)
@ -60,11 +60,12 @@ func NewGenericHistogram(name string, cutoffs []int64, labels []string, countLab
return h
}
// Add adds a new measurement to the Histogram.
func (h *Histogram) Add(value int64) {
for i := range h.labels {
if i == len(h.labels)-1 || value <= h.cutoffs[i] {
if i == len(h.labels)-1 || value < h.cutoffs[i] {
h.mu.Lock()
h.buckets[i] += 1
h.buckets[i]++
h.total += value
h.mu.Unlock()
return
@ -72,11 +73,13 @@ func (h *Histogram) Add(value int64) {
}
}
// String returns a string representation of the Histogram.
func (h *Histogram) String() string {
b, _ := h.MarshalJSON()
return string(b)
}
// MarshalJSON returns a JSON representation of the Histogram.
func (h *Histogram) MarshalJSON() ([]byte, error) {
h.mu.Lock()
defer h.mu.Unlock()
@ -94,6 +97,7 @@ func (h *Histogram) MarshalJSON() ([]byte, error) {
return b.Bytes(), nil
}
// Counts returns a map from labels to the current count in the Histogram for that label.
func (h *Histogram) Counts() map[string]int64 {
h.mu.Lock()
defer h.mu.Unlock()
@ -105,10 +109,12 @@ func (h *Histogram) Counts() map[string]int64 {
return counts
}
// CountLabel returns the count label that was set when this Histogram was created.
func (h *Histogram) CountLabel() string {
return h.countLabel
}
// Count returns the number of times Add has been called.
func (h *Histogram) Count() (count int64) {
h.mu.Lock()
defer h.mu.Unlock()
@ -119,22 +125,33 @@ func (h *Histogram) Count() (count int64) {
return
}
// TotalLabel returns the total label that was set when this Histogram was created.
func (h *Histogram) TotalLabel() string {
return h.totalLabel
}
// Total returns the sum of all values that have been added to this Histogram.
func (h *Histogram) Total() (total int64) {
h.mu.Lock()
defer h.mu.Unlock()
return h.total
}
// Labels returns the labels that were set when this Histogram was created.
func (h *Histogram) Labels() []string {
return h.labels
}
// Cutoffs returns the cutoffs that were set when this Histogram was created.
func (h *Histogram) Cutoffs() []int64 {
return h.cutoffs
}
// Buckets returns a snapshot of the current values in all buckets.
func (h *Histogram) Buckets() []int64 {
h.mu.Lock()
defer h.mu.Unlock()
return h.buckets
buckets := make([]int64, len(h.buckets))
copy(buckets, h.buckets)
return buckets
}

Просмотреть файл

@ -15,31 +15,29 @@ func TestHistogram(t *testing.T) {
for i := 0; i < 10; i++ {
h.Add(int64(i))
}
want := `{"1": 2, "5": 6, "inf": 10, "Count": 10, "Total": 45}`
want := `{"1": 1, "5": 5, "inf": 10, "Count": 10, "Total": 45}`
if h.String() != want {
t.Errorf("want %s, got %s", want, h.String())
t.Errorf("got %v, want %v", h.String(), want)
}
counts := h.Counts()
if counts["1"] != 2 {
t.Errorf("want 2, got %d", counts["1"])
counts["Count"] = h.Count()
counts["Total"] = h.Total()
for k, want := range map[string]int64{
"1": 1,
"5": 4,
"inf": 5,
"Count": 10,
"Total": 45,
} {
if got := counts[k]; got != want {
t.Errorf("histogram counts [%v]: got %d, want %d", k, got, want)
}
}
if counts["5"] != 4 {
t.Errorf("want 4, got %d", counts["2"])
if got, want := h.CountLabel(), "Count"; got != want {
t.Errorf("got %v, want %v", got, want)
}
if counts["inf"] != 4 {
t.Errorf("want 4, got %d", counts["inf"])
}
if h.Count() != 10 {
t.Errorf("want 10, got %d", h.Count())
}
if h.CountLabel() != "Count" {
t.Errorf("want Count, got %s", h.CountLabel())
}
if h.Total() != 45 {
t.Errorf("want 45, got %d", h.Total())
}
if h.TotalLabel() != "Total" {
t.Errorf("want Total, got %s", h.TotalLabel())
if got, want := h.TotalLabel(), "Total"; got != want {
t.Errorf("got %v, want %v", got, want)
}
}
@ -53,8 +51,8 @@ func TestGenericHistogram(t *testing.T) {
"total",
)
want := `{"one": 0, "five": 0, "max": 0, "count": 0, "total": 0}`
if h.String() != want {
t.Errorf("want %s, got %s", want, h.String())
if got := h.String(); got != want {
t.Errorf("got %v, want %v", got, want)
}
}
@ -67,11 +65,12 @@ func TestHistogramHook(t *testing.T) {
gotv = v.(*Histogram)
})
v := NewHistogram("hist2", []int64{1})
if gotname != "hist2" {
t.Errorf("want hist2, got %s", gotname)
name := "hist2"
v := NewHistogram(name, []int64{1})
if gotname != name {
t.Errorf("got %v; want %v", gotname, name)
}
if gotv != v {
t.Errorf("want %#v, got %#v", v, gotv)
t.Errorf("got %#v, want %#v", gotv, v)
}
}

Просмотреть файл

@ -16,7 +16,7 @@ func TestTimings(t *testing.T) {
tm.Add("tag1", 500*time.Microsecond)
tm.Add("tag1", 1*time.Millisecond)
tm.Add("tag2", 1*time.Millisecond)
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":2,"5000000":2,"10000000":2,"50000000":2,"100000000":2,"500000000":2,"1000000000":2,"5000000000":2,"10000000000":2,"inf":2,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1000000}}}`
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":0,"1000000":1,"5000000":2,"10000000":2,"50000000":2,"100000000":2,"500000000":2,"1000000000":2,"5000000000":2,"10000000000":2,"inf":2,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":0,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1000000}}}`
if tm.String() != want {
t.Errorf("want %s, got %s", want, tm.String())
}
@ -28,7 +28,7 @@ func TestMultiTimings(t *testing.T) {
mtm.Add([]string{"tag1a", "tag1b"}, 500*time.Microsecond)
mtm.Add([]string{"tag1a", "tag1b"}, 1*time.Millisecond)
mtm.Add([]string{"tag2a", "tag2b"}, 1*time.Millisecond)
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":2,"5000000":2,"10000000":2,"50000000":2,"100000000":2,"500000000":2,"1000000000":2,"5000000000":2,"10000000000":2,"inf":2,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1000000}}}`
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":0,"1000000":1,"5000000":2,"10000000":2,"50000000":2,"100000000":2,"500000000":2,"1000000000":2,"5000000000":2,"10000000000":2,"inf":2,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":0,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1000000}}}`
if mtm.String() != want {
t.Errorf("want %s, got %s", want, mtm.String())
}

Просмотреть файл

@ -7,44 +7,49 @@ import (
)
// ConsoleLogger is a Logger that uses glog directly to log, at the right level.
//
// Note that methods on ConsoleLogger must use pointer receivers,
// because otherwise an autogenerated conversion method will be inserted in the
// call stack when ConsoleLogger is used via TeeLogger, making the log depth
// incorrect.
type ConsoleLogger struct{}
// NewConsoleLogger returns a simple ConsoleLogger
func NewConsoleLogger() ConsoleLogger {
return ConsoleLogger{}
// NewConsoleLogger returns a simple ConsoleLogger.
func NewConsoleLogger() *ConsoleLogger {
return &ConsoleLogger{}
}
// Infof is part of the Logger interface
func (cl ConsoleLogger) Infof(format string, v ...interface{}) {
log.InfoDepth(2, fmt.Sprintf(format, v...))
func (cl *ConsoleLogger) Infof(format string, v ...interface{}) {
cl.InfoDepth(1, fmt.Sprintf(format, v...))
}
// Warningf is part of the Logger interface
func (cl ConsoleLogger) Warningf(format string, v ...interface{}) {
log.WarningDepth(2, fmt.Sprintf(format, v...))
func (cl *ConsoleLogger) Warningf(format string, v ...interface{}) {
cl.WarningDepth(1, fmt.Sprintf(format, v...))
}
// Errorf is part of the Logger interface
func (cl ConsoleLogger) Errorf(format string, v ...interface{}) {
log.ErrorDepth(2, fmt.Sprintf(format, v...))
func (cl *ConsoleLogger) Errorf(format string, v ...interface{}) {
cl.ErrorDepth(1, fmt.Sprintf(format, v...))
}
// Printf is part of the Logger interface
func (cl ConsoleLogger) Printf(format string, v ...interface{}) {
func (cl *ConsoleLogger) Printf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
// InfoDepth is part of the Logger interface.
func (cl ConsoleLogger) InfoDepth(depth int, s string) {
func (cl *ConsoleLogger) InfoDepth(depth int, s string) {
log.InfoDepth(1+depth, s)
}
// WarningDepth is part of the Logger interface.
func (cl ConsoleLogger) WarningDepth(depth int, s string) {
func (cl *ConsoleLogger) WarningDepth(depth int, s string) {
log.WarningDepth(1+depth, s)
}
// ErrorDepth is part of the Logger interface.
func (cl ConsoleLogger) ErrorDepth(depth int, s string) {
func (cl *ConsoleLogger) ErrorDepth(depth int, s string) {
log.ErrorDepth(1+depth, s)
}

Просмотреть файл

@ -0,0 +1,76 @@
package logutil
import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"regexp"
"strings"
"testing"
)
func TestConsoleLogger(t *testing.T) {
testConsoleLogger(t, false, "TestConsoleLogger")
}
func TestTeeConsoleLogger(t *testing.T) {
testConsoleLogger(t, true, "TestTeeConsoleLogger")
}
func testConsoleLogger(t *testing.T, tee bool, entrypoint string) {
if os.Getenv("TEST_CONSOLE_LOGGER") == "1" {
// Generate output in subprocess.
var logger Logger
if tee {
logger = NewTeeLogger(NewConsoleLogger(), NewMemoryLogger())
} else {
logger = NewConsoleLogger()
}
// Add 'tee' to the output to make sure we've
// called the right method in the subprocess.
logger.Infof("info %v %v", 1, tee)
logger.Warningf("warning %v %v", 2, tee)
logger.Errorf("error %v %v", 3, tee)
return
}
// Run subprocess and collect console output.
cmd := exec.Command(os.Args[0], "-test.run=^"+entrypoint+"$", "-logtostderr")
cmd.Env = append(os.Environ(), "TEST_CONSOLE_LOGGER=1")
stderr, err := cmd.StderrPipe()
if err != nil {
t.Fatalf("cmd.StderrPipe() error: %v", err)
}
if err := cmd.Start(); err != nil {
t.Fatalf("cmd.Start() error: %v", err)
}
out, err := ioutil.ReadAll(stderr)
if err != nil {
t.Fatalf("ioutil.ReadAll(sterr) error: %v", err)
}
if err := cmd.Wait(); err != nil {
t.Fatalf("cmd.Wait() error: %v", err)
}
// Check output.
gotlines := strings.Split(string(out), "\n")
wantlines := []string{
fmt.Sprintf("^I.*info 1 %v$", tee),
fmt.Sprintf("^W.*warning 2 %v$", tee),
fmt.Sprintf("^E.*error 3 %v$", tee),
}
for i, want := range wantlines {
got := gotlines[i]
if !strings.Contains(got, "console_logger_test.go") {
t.Errorf("wrong file: %v", got)
}
match, err := regexp.MatchString(want, got)
if err != nil {
t.Errorf("regexp.MatchString error: %v", err)
}
if !match {
t.Errorf("got %q, want %q", got, want)
}
}
}

Просмотреть файл

@ -320,20 +320,17 @@ func (tl *TeeLogger) ErrorDepth(depth int, s string) {
// Infof is part of the Logger interface
func (tl *TeeLogger) Infof(format string, v ...interface{}) {
tl.One.InfoDepth(1, fmt.Sprintf(format, v...))
tl.Two.InfoDepth(1, fmt.Sprintf(format, v...))
tl.InfoDepth(1, fmt.Sprintf(format, v...))
}
// Warningf is part of the Logger interface
func (tl *TeeLogger) Warningf(format string, v ...interface{}) {
tl.One.WarningDepth(1, fmt.Sprintf(format, v...))
tl.Two.WarningDepth(1, fmt.Sprintf(format, v...))
tl.WarningDepth(1, fmt.Sprintf(format, v...))
}
// Errorf is part of the Logger interface
func (tl *TeeLogger) Errorf(format string, v ...interface{}) {
tl.One.ErrorDepth(1, fmt.Sprintf(format, v...))
tl.Two.ErrorDepth(1, fmt.Sprintf(format, v...))
tl.ErrorDepth(1, fmt.Sprintf(format, v...))
}
// Printf is part of the Logger interface

Просмотреть файл

@ -114,66 +114,43 @@ func TestChannelLogger(t *testing.T) {
}
func TestTeeLogger(t *testing.T) {
ml1 := NewMemoryLogger()
ml2 := NewMemoryLogger()
tl := NewTeeLogger(ml1, ml2)
ml := NewMemoryLogger()
cl := NewChannelLogger(10)
tl := NewTeeLogger(ml, cl)
tl.Infof("test infof %v %v", 1, 2)
tl.Warningf("test warningf %v %v", 2, 3)
tl.Errorf("test errorf %v %v", 3, 4)
tl.Printf("test printf %v %v", 4, 5)
tl.InfoDepth(0, "test infoDepth")
tl.WarningDepth(0, "test warningDepth")
tl.ErrorDepth(0, "test errorDepth")
for i, ml := range []*MemoryLogger{ml1, ml2} {
if len(ml.Events) != 7 {
t.Fatalf("Invalid ml%v size: %v", i+1, ml)
}
if ml.Events[0].Value != "test infof 1 2" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[0].Value)
}
if ml.Events[0].Level != logutilpb.Level_INFO {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[0].Level)
}
if ml.Events[1].Value != "test warningf 2 3" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[1].Value)
}
if ml.Events[1].Level != logutilpb.Level_WARNING {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[1].Level)
}
if ml.Events[2].Value != "test errorf 3 4" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[2].Value)
}
if ml.Events[2].Level != logutilpb.Level_ERROR {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[2].Level)
}
if ml.Events[3].Value != "test printf 4 5" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[3].Value)
}
if ml.Events[3].Level != logutilpb.Level_CONSOLE {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[3].Level)
}
if ml.Events[4].Value != "test infoDepth" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[4].Value)
}
if ml.Events[4].Level != logutilpb.Level_INFO {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[4].Level)
}
if ml.Events[5].Value != "test warningDepth" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[5].Value)
}
if ml.Events[5].Level != logutilpb.Level_WARNING {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[5].Level)
}
if ml.Events[6].Value != "test errorDepth" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[6].Value)
}
if ml.Events[6].Level != logutilpb.Level_ERROR {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[6].Level)
}
close(cl)
for j := range ml.Events {
if got, want := ml.Events[j].File, "logger_test.go"; got != want && ml.Events[j].Level != logutilpb.Level_CONSOLE {
t.Errorf("ml%v[%v].File = %q, want %q", i+1, j, got, want)
clEvents := []*logutilpb.Event{}
for e := range cl {
clEvents = append(clEvents, e)
}
wantEvents := []*logutilpb.Event{
{Level: logutilpb.Level_INFO, Value: "test infof 1 2"},
{Level: logutilpb.Level_WARNING, Value: "test warningf 2 3"},
{Level: logutilpb.Level_ERROR, Value: "test errorf 3 4"},
{Level: logutilpb.Level_CONSOLE, Value: "test printf 4 5"},
}
wantFile := "logger_test.go"
for i, events := range [][]*logutilpb.Event{ml.Events, clEvents} {
if got, want := len(events), len(wantEvents); got != want {
t.Fatalf("[%v] len(events) = %v, want %v", i, got, want)
}
for j, got := range events {
want := wantEvents[j]
if got.Level != want.Level {
t.Errorf("[%v] events[%v].Level = %s, want %s", i, j, got.Level, want.Level)
}
if got.Value != want.Value {
t.Errorf("[%v] events[%v].Value = %q, want %q", i, j, got.Value, want.Value)
}
if got.File != wantFile && got.Level != logutilpb.Level_CONSOLE {
t.Errorf("[%v] events[%v].File = %q, want %q", i, j, got.File, wantFile)
}
}
}

Просмотреть файл

@ -214,6 +214,7 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b
sourceIsMaster := false
readOnly := true
var replicationPosition replication.Position
semiSyncMaster, semiSyncSlave := mysqld.SemiSyncEnabled()
// see if we need to restart replication after backup
logger.Infof("getting current replication status")
@ -225,35 +226,35 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b
// keep going if we're the master, might be a degenerate case
sourceIsMaster = true
default:
return fmt.Errorf("cannot get slave status: %v", err)
return fmt.Errorf("can't get slave status: %v", err)
}
// get the read-only flag
readOnly, err = mysqld.IsReadOnly()
if err != nil {
return fmt.Errorf("cannot get read only status: %v", err)
return fmt.Errorf("can't get read-only status: %v", err)
}
// get the replication position
if sourceIsMaster {
if !readOnly {
logger.Infof("turning master read-onyl before backup")
logger.Infof("turning master read-only before backup")
if err = mysqld.SetReadOnly(true); err != nil {
return fmt.Errorf("cannot get read only status: %v", err)
return fmt.Errorf("can't set read-only status: %v", err)
}
}
replicationPosition, err = mysqld.MasterPosition()
if err != nil {
return fmt.Errorf("cannot get master position: %v", err)
return fmt.Errorf("can't get master position: %v", err)
}
} else {
if err = StopSlave(mysqld, hookExtraEnv); err != nil {
return fmt.Errorf("cannot stop slave: %v", err)
return fmt.Errorf("can't stop slave: %v", err)
}
var slaveStatus replication.Status
slaveStatus, err = mysqld.SlaveStatus()
if err != nil {
return fmt.Errorf("cannot get slave status: %v", err)
return fmt.Errorf("can't get slave status: %v", err)
}
replicationPosition = slaveStatus.Position
}
@ -262,28 +263,38 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b
// shutdown mysqld
err = mysqld.Shutdown(ctx, true)
if err != nil {
return fmt.Errorf("cannot shutdown mysqld: %v", err)
return fmt.Errorf("can't shutdown mysqld: %v", err)
}
// get the files to backup
fes, err := findFilesTobackup(mysqld.Cnf())
if err != nil {
return fmt.Errorf("cannot find files to backup: %v", err)
return fmt.Errorf("can't find files to backup: %v", err)
}
logger.Infof("found %v files to backup", len(fes))
// backup everything
if err := backupFiles(mysqld, logger, bh, fes, replicationPosition, backupConcurrency); err != nil {
return fmt.Errorf("cannot backup files: %v", err)
return fmt.Errorf("can't backup files: %v", err)
}
// Try to restart mysqld
err = mysqld.Start(ctx)
if err != nil {
return fmt.Errorf("cannot restart mysqld: %v", err)
return fmt.Errorf("can't restart mysqld: %v", err)
}
// Restore original mysqld state that we saved above.
if semiSyncMaster || semiSyncSlave {
// Only do this if one of them was on, since both being off could mean
// the plugin isn't even loaded, and the server variables don't exist.
logger.Infof("restoring semi-sync settings from before backup: master=%v, slave=%v",
semiSyncMaster, semiSyncSlave)
err := mysqld.SetSemiSyncEnabled(semiSyncMaster, semiSyncSlave)
if err != nil {
return err
}
}
if slaveStartRequired {
logger.Infof("restarting mysql replication")
if err := StartSlave(mysqld, hookExtraEnv); err != nil {

Просмотреть файл

@ -38,6 +38,8 @@ type MysqlDaemon interface {
// replication related methods
SlaveStatus() (replication.Status, error)
SetSemiSyncEnabled(master, slave bool) error
SemiSyncEnabled() (master, slave bool)
// reparenting related methods
ResetReplicationCommands() ([]string, error)
@ -192,6 +194,11 @@ type FakeMysqlDaemon struct {
// BinlogPlayerEnabled is used by {Enable,Disable}BinlogPlayer
BinlogPlayerEnabled bool
// SemiSyncMasterEnabled represents the state of rpl_semi_sync_master_enabled.
SemiSyncMasterEnabled bool
// SemiSyncSlaveEnabled represents the state of rpl_semi_sync_slave_enabled.
SemiSyncSlaveEnabled bool
}
// NewFakeMysqlDaemon returns a FakeMysqlDaemon where mysqld appears
@ -438,3 +445,15 @@ func (fmd *FakeMysqlDaemon) GetAppConnection() (dbconnpool.PoolConnection, error
func (fmd *FakeMysqlDaemon) GetDbaConnection() (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(&sqldb.ConnParams{Engine: fmd.db.Name}, stats.NewTimings(""))
}
// SetSemiSyncEnabled is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SetSemiSyncEnabled(master, slave bool) error {
fmd.SemiSyncMasterEnabled = master
fmd.SemiSyncSlaveEnabled = slave
return nil
}
// SemiSyncEnabled is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SemiSyncEnabled() (master, slave bool) {
return fmd.SemiSyncMasterEnabled, fmd.SemiSyncSlaveEnabled
}

Просмотреть файл

@ -7,11 +7,34 @@ package mysqlctl
import (
"fmt"
"strings"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/dbconnpool"
)
// getPoolReconnect gets a connection from a pool, tests it, and reconnects if
// it gets errno 2006.
func getPoolReconnect(pool *dbconnpool.ConnectionPool, timeout time.Duration) (dbconnpool.PoolConnection, error) {
conn, err := pool.Get(timeout)
if err != nil {
return conn, err
}
// Run a test query to see if this connection is still good.
if _, err := conn.ExecuteFetch("SELECT 1", 1, false); err != nil {
// If we get "MySQL server has gone away (errno 2006)", try to reconnect.
if sqlErr, ok := err.(*sqldb.SQLError); ok && sqlErr.Number() == 2006 {
if err := conn.Reconnect(); err != nil {
conn.Recycle()
return conn, err
}
}
}
return conn, nil
}
// ExecuteSuperQuery allows the user to execute a query as a super user.
func (mysqld *Mysqld) ExecuteSuperQuery(query string) error {
return mysqld.ExecuteSuperQueryList([]string{query})
@ -19,7 +42,7 @@ func (mysqld *Mysqld) ExecuteSuperQuery(query string) error {
// ExecuteSuperQueryList alows the user to execute queries as a super user.
func (mysqld *Mysqld) ExecuteSuperQueryList(queryList []string) error {
conn, connErr := mysqld.dbaPool.Get(0)
conn, connErr := getPoolReconnect(mysqld.dbaPool, 0)
if connErr != nil {
return connErr
}
@ -35,7 +58,7 @@ func (mysqld *Mysqld) ExecuteSuperQueryList(queryList []string) error {
// FetchSuperQuery returns the results of executing a query as a super user.
func (mysqld *Mysqld) FetchSuperQuery(query string) (*sqltypes.Result, error) {
conn, connErr := mysqld.dbaPool.Get(0)
conn, connErr := getPoolReconnect(mysqld.dbaPool, 0)
if connErr != nil {
return nil, connErr
}
@ -62,13 +85,31 @@ func (mysqld *Mysqld) fetchSuperQueryMap(query string) (map[string]string, error
return nil, fmt.Errorf("query %#v returned %d column names, expected %d", query, len(qr.Fields), len(qr.Rows[0]))
}
rowMap := make(map[string]string)
rowMap := make(map[string]string, len(qr.Rows[0]))
for i, value := range qr.Rows[0] {
rowMap[qr.Fields[i].Name] = value.String()
}
return rowMap, nil
}
// fetchVariables returns a map from MySQL variable names to variable value
// for variables that match the given pattern.
func (mysqld *Mysqld) fetchVariables(pattern string) (map[string]string, error) {
query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", pattern)
qr, err := mysqld.FetchSuperQuery(query)
if err != nil {
return nil, err
}
if len(qr.Fields) != 2 {
return nil, fmt.Errorf("query %#v returned %d columns, expected 2", query, len(qr.Fields))
}
varMap := make(map[string]string, len(qr.Rows))
for _, row := range qr.Rows {
varMap[row[0].String()] = row[1].String()
}
return varMap, nil
}
const masterPasswordStart = " MASTER_PASSWORD = '"
const masterPasswordEnd = "',\n"

Просмотреть файл

@ -346,3 +346,43 @@ func (mysqld *Mysqld) DisableBinlogPlayback() error {
}
return flavor.DisableBinlogPlayback(mysqld)
}
// SetSemiSyncEnabled enables or disables semi-sync replication for
// master and/or slave mode.
func (mysqld *Mysqld) SetSemiSyncEnabled(master, slave bool) error {
log.Infof("Setting semi-sync mode: master=%v, slave=%v", master, slave)
// Convert bool to int.
var m, s int
if master {
m = 1
}
if slave {
s = 1
}
err := mysqld.ExecuteSuperQuery(
fmt.Sprintf(
"SET GLOBAL rpl_semi_sync_master_enabled = %v, GLOBAL rpl_semi_sync_slave_enabled = %v",
m, s))
if err != nil {
return fmt.Errorf("can't set semi-sync mode: %v; make sure plugins are loaded in my.cnf", err)
}
return nil
}
// SemiSyncEnabled returns whether semi-sync is enabled for master or slave.
// If the semi-sync plugin is not loaded, we assume semi-sync is disabled.
func (mysqld *Mysqld) SemiSyncEnabled() (master, slave bool) {
vars, err := mysqld.fetchVariables("rpl_semi_sync_%_enabled")
if err != nil {
return false, false
}
if mval, mok := vars["rpl_semi_sync_master_enabled"]; mok {
master = (mval == "ON")
}
if sval, sok := vars["rpl_semi_sync_slave_enabled"]; sok {
slave = (sval == "ON")
}
return master, slave
}

Просмотреть файл

@ -101,6 +101,13 @@ func (agent *ActionAgent) startReplication(ctx context.Context, pos replication.
return fmt.Errorf("Cannot read master tablet %v: %v", si.MasterAlias, err)
}
// If using semi-sync, we need to enable it before connecting to master.
if *EnableSemiSync {
if err := agent.enableSemiSync(false); err != nil {
return err
}
}
// Set master and start slave.
cmds, err = agent.MysqlDaemon.SetMasterCommands(ti.Hostname, int(ti.PortMap["mysql"]))
if err != nil {

Просмотреть файл

@ -5,6 +5,7 @@
package tabletmanager
import (
"flag"
"fmt"
"time"
@ -20,6 +21,11 @@ import (
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
var (
// EnableSemiSync is exported for use by vt/wrangler/testlib.
EnableSemiSync = flag.Bool("enable_semi_sync", false, "Enable semi-sync when configuring replication, on master and replica tablets only (rdonly tablets will not ack).")
)
// SlaveStatus returns the replication status
// Should be called under RPCWrap.
func (agent *ActionAgent) SlaveStatus(ctx context.Context) (*replicationdatapb.Status, error) {
@ -72,6 +78,11 @@ func (agent *ActionAgent) StopSlaveMinimum(ctx context.Context, position string,
// replication or not (using hook if not).
// Should be called under RPCWrapLock.
func (agent *ActionAgent) StartSlave(ctx context.Context) error {
if *EnableSemiSync {
if err := agent.enableSemiSync(false); err != nil {
return err
}
}
return mysqlctl.StartSlave(agent.MysqlDaemon, agent.hookExtraEnv())
}
@ -107,10 +118,16 @@ func (agent *ActionAgent) InitMaster(ctx context.Context) (string, error) {
return "", err
}
// If using semi-sync, we need to enable it before going read-write.
if *EnableSemiSync {
if err := agent.enableSemiSync(true); err != nil {
return "", err
}
}
// Set the server read-write, from now on we can accept real
// client writes. Note that if semi-sync replication is enabled,
// we'll still need some slaves to be able to commit
// transactions.
// we'll still need some slaves to be able to commit transactions.
if err := agent.MysqlDaemon.SetReadOnly(false); err != nil {
return "", err
}
@ -152,6 +169,13 @@ func (agent *ActionAgent) InitSlave(ctx context.Context, parent *topodatapb.Tabl
return err
}
// If using semi-sync, we need to enable it before connecting to master.
if *EnableSemiSync {
if err := agent.enableSemiSync(false); err != nil {
return err
}
}
cmds, err := agent.MysqlDaemon.SetSlavePositionCommands(pos)
if err != nil {
return err
@ -189,6 +213,13 @@ func (agent *ActionAgent) DemoteMaster(ctx context.Context) (string, error) {
return "", fmt.Errorf("disallowQueries failed: %v", err)
}
// If using semi-sync, we need to disable master-side.
if *EnableSemiSync {
if err := agent.enableSemiSync(false); err != nil {
return "", err
}
}
pos, err := agent.MysqlDaemon.DemoteMaster()
if err != nil {
return "", err
@ -227,6 +258,13 @@ func (agent *ActionAgent) PromoteSlaveWhenCaughtUp(ctx context.Context, position
return "", err
}
// If using semi-sync, we need to enable it before going read-write.
if *EnableSemiSync {
if err := agent.enableSemiSync(true); err != nil {
return "", err
}
}
if err := agent.MysqlDaemon.SetReadOnly(false); err != nil {
return "", err
}
@ -265,6 +303,13 @@ func (agent *ActionAgent) SetMaster(ctx context.Context, parentAlias *topodatapb
shouldbeReplicating = true
}
// If using semi-sync, we need to enable it before connecting to master.
if *EnableSemiSync {
if err := agent.enableSemiSync(false); err != nil {
return err
}
}
// Create the list of commands to set the master
cmds := []string{}
if wasReplicating {
@ -347,6 +392,13 @@ func (agent *ActionAgent) PromoteSlave(ctx context.Context) (string, error) {
return "", err
}
// If using semi-sync, we need to enable it before going read-write.
if *EnableSemiSync {
if err := agent.enableSemiSync(true); err != nil {
return "", err
}
}
// Set the server read-write
if err := agent.MysqlDaemon.SetReadOnly(false); err != nil {
return "", err
@ -358,3 +410,37 @@ func (agent *ActionAgent) PromoteSlave(ctx context.Context) (string, error) {
return replication.EncodePosition(pos), nil
}
func (agent *ActionAgent) isMasterEligible() (bool, error) {
switch agent.Tablet().Type {
case topodatapb.TabletType_MASTER, topodatapb.TabletType_REPLICA:
return true, nil
case topodatapb.TabletType_SPARE:
// If we're SPARE, it could be because healthcheck is enabled.
tt, err := topoproto.ParseTabletType(*targetTabletType)
if err != nil {
return false, fmt.Errorf("can't determine if tablet is master-eligible: currently SPARE and no -target_tablet_type flag specified")
}
if tt == topodatapb.TabletType_REPLICA {
return true, nil
}
}
return false, nil
}
func (agent *ActionAgent) enableSemiSync(master bool) error {
// Only enable if we're eligible for becoming master (REPLICA type).
// Ineligible slaves (RDONLY) shouldn't ACK because we'll never promote them.
masterEligible, err := agent.isMasterEligible()
if err != nil {
return fmt.Errorf("can't enable semi-sync: %v", err)
}
if !masterEligible {
return nil
}
// Always enable slave-side since it doesn't hurt to keep it on for a master.
// The master-side needs to be off for a slave, or else it will get stuck.
return agent.MysqlDaemon.SetSemiSyncEnabled(master, true)
}

Просмотреть файл

@ -132,7 +132,7 @@ func NewSchemaInfo(
queryServiceStats *QueryServiceStats) *SchemaInfo {
si := &SchemaInfo{
queries: cache.NewLRUCache(int64(queryCacheSize)),
connPool: NewConnPool("", 2, idleTimeout, enablePublishStats, queryServiceStats, checker),
connPool: NewConnPool("", 3, idleTimeout, enablePublishStats, queryServiceStats, checker),
cachePool: cachePool,
ticks: timer.NewTimer(reloadTime),
endpoints: endpoints,
@ -186,23 +186,37 @@ func (si *SchemaInfo) Open(appParams, dbaParams *sqldb.ConnParams, schemaOverrid
tables := make(map[string]*TableInfo, len(tableData.Rows)+1)
tables["dual"] = &TableInfo{Table: schema.NewTable("dual")}
wg := sync.WaitGroup{}
mu := sync.Mutex{}
for _, row := range tableData.Rows {
tableName := row[0].String()
tableInfo, err := NewTableInfo(
conn,
tableName,
row[1].String(), // table_type
row[3].String(), // table_comment
si.cachePool,
)
if err != nil {
si.recordSchemaError(err, tableName)
// Skip over the table that had an error and move on to the next one
continue
}
tableInfo.SetMysqlStats(row[4], row[5], row[6], row[7])
tables[tableName] = tableInfo
wg.Add(1)
go func(row []sqltypes.Value) {
defer wg.Done()
conn := getOrPanic(ctx, si.connPool)
defer conn.Recycle()
tableName := row[0].String()
tableInfo, err := NewTableInfo(
conn,
tableName,
row[1].String(), // table_type
row[3].String(), // table_comment
si.cachePool,
)
if err != nil {
si.recordSchemaError(err, tableName)
// Skip over the table that had an error and move on to the next one
return
}
tableInfo.SetMysqlStats(row[4], row[5], row[6], row[7])
mu.Lock()
tables[tableName] = tableInfo
mu.Unlock()
}(row)
}
wg.Wait()
// Fail if we can't load the schema for any tables, but we know that some tables exist. This points to a configuration problem.
if len(tableData.Rows) != 0 && len(tables) == 1 { // len(tables) is always at least 1 because of the "dual" table
panic(NewTabletError(ErrFail, vtrpcpb.ErrorCode_INTERNAL_ERROR, "could not get schema for any tables"))

Просмотреть файл

@ -9,10 +9,14 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/olekukonko/tablewriter"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
@ -34,17 +38,17 @@ func init() {
addCommand(queriesGroupName, command{
"VtGateExecute",
commandVtGateExecute,
"-server <vtgate> [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-tablet_type <tablet type>] <sql>",
"-server <vtgate> [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-tablet_type <tablet type>] [-json] <sql>",
"Executes the given SQL query with the provided bound variables against the vtgate server."})
addCommand(queriesGroupName, command{
"VtGateExecuteShards",
commandVtGateExecuteShards,
"-server <vtgate> -keyspace <keyspace> -shards <shard0>,<shard1>,... [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-tablet_type <tablet type>] <sql>",
"-server <vtgate> -keyspace <keyspace> -shards <shard0>,<shard1>,... [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-tablet_type <tablet type>] [-json] <sql>",
"Executes the given SQL query with the provided bound variables against the vtgate server. It is routed to the provided shards."})
addCommand(queriesGroupName, command{
"VtGateExecuteKeyspaceIds",
commandVtGateExecuteKeyspaceIds,
"-server <vtgate> -keyspace <keyspace> -keyspace_ids <ks1 in hex>,<k2 in hex>,... [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-tablet_type <tablet type>] <sql>",
"-server <vtgate> -keyspace <keyspace> -keyspace_ids <ks1 in hex>,<k2 in hex>,... [-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-tablet_type <tablet type>] [-json] <sql>",
"Executes the given SQL query with the provided bound variables against the vtgate server. It is routed to the shards that contain the provided keyspace ids."})
addCommand(queriesGroupName, command{
"VtGateSplitQuery",
@ -56,7 +60,7 @@ func init() {
addCommand(queriesGroupName, command{
"VtTabletExecute",
commandVtTabletExecute,
"[-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-transaction_id <transaction_id>] [-tablet_type <tablet_type>] -keyspace <keyspace> -shard <shard> <tablet alias> <sql>",
"[-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-transaction_id <transaction_id>] [-tablet_type <tablet_type>] [-json] -keyspace <keyspace> -shard <shard> <tablet alias> <sql>",
"Executes the given query on the given tablet."})
addCommand(queriesGroupName, command{
"VtTabletBegin",
@ -125,6 +129,8 @@ func commandVtGateExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags *
bindVariables := newBindvars(subFlags)
connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vtgate client")
tabletType := subFlags.String("tablet_type", "master", "tablet type to query")
json := subFlags.Bool("json", false, "Output JSON instead of human-readable table")
if err := subFlags.Parse(args); err != nil {
return err
}
@ -145,7 +151,11 @@ func commandVtGateExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags *
if err != nil {
return fmt.Errorf("Execute failed: %v", err)
}
return printJSON(wr, qr)
if *json {
return printJSON(wr.Logger(), qr)
}
printQueryResult(loggerWriter{wr.Logger()}, qr)
return nil
}
func commandVtGateExecuteShards(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -155,6 +165,8 @@ func commandVtGateExecuteShards(ctx context.Context, wr *wrangler.Wrangler, subF
tabletType := subFlags.String("tablet_type", "master", "tablet type to query")
keyspace := subFlags.String("keyspace", "", "keyspace to send query to")
shardsStr := subFlags.String("shards", "", "comma-separated list of shards to send query to")
json := subFlags.Bool("json", false, "Output JSON instead of human-readable table")
if err := subFlags.Parse(args); err != nil {
return err
}
@ -179,7 +191,11 @@ func commandVtGateExecuteShards(ctx context.Context, wr *wrangler.Wrangler, subF
if err != nil {
return fmt.Errorf("Execute failed: %v", err)
}
return printJSON(wr, qr)
if *json {
return printJSON(wr.Logger(), qr)
}
printQueryResult(loggerWriter{wr.Logger()}, qr)
return nil
}
func commandVtGateExecuteKeyspaceIds(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -189,6 +205,8 @@ func commandVtGateExecuteKeyspaceIds(ctx context.Context, wr *wrangler.Wrangler,
tabletType := subFlags.String("tablet_type", "master", "tablet type to query")
keyspace := subFlags.String("keyspace", "", "keyspace to send query to")
keyspaceIDsStr := subFlags.String("keyspace_ids", "", "comma-separated list of keyspace ids (in hex) that will map into shards to send query to")
json := subFlags.Bool("json", false, "Output JSON instead of human-readable table")
if err := subFlags.Parse(args); err != nil {
return err
}
@ -220,7 +238,11 @@ func commandVtGateExecuteKeyspaceIds(ctx context.Context, wr *wrangler.Wrangler,
if err != nil {
return fmt.Errorf("Execute failed: %v", err)
}
return printJSON(wr, qr)
if *json {
return printJSON(wr.Logger(), qr)
}
printQueryResult(loggerWriter{wr.Logger()}, qr)
return nil
}
func commandVtGateSplitQuery(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -246,7 +268,7 @@ func commandVtGateSplitQuery(ctx context.Context, wr *wrangler.Wrangler, subFlag
if err != nil {
return fmt.Errorf("SplitQuery failed: %v", err)
}
return printJSON(wr, r)
return printJSON(wr.Logger(), r)
}
func commandVtTabletExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -256,6 +278,8 @@ func commandVtTabletExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags
shard := subFlags.String("shard", "", "shard the tablet belongs to")
tabletType := subFlags.String("tablet_type", "unknown", "tablet type we expect from the tablet (use unknown to use sessionId)")
connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vttablet client")
json := subFlags.Bool("json", false, "Output JSON instead of human-readable table")
if err := subFlags.Parse(args); err != nil {
return err
}
@ -289,7 +313,11 @@ func commandVtTabletExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags
if err != nil {
return fmt.Errorf("Execute failed: %v", err)
}
return printJSON(wr, qr)
if *json {
return printJSON(wr.Logger(), qr)
}
printQueryResult(loggerWriter{wr.Logger()}, qr)
return nil
}
func commandVtTabletBegin(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -333,7 +361,7 @@ func commandVtTabletBegin(ctx context.Context, wr *wrangler.Wrangler, subFlags *
result := map[string]int64{
"transaction_id": transactionID,
}
return printJSON(wr, result)
return printJSON(wr.Logger(), result)
}
func commandVtTabletCommit(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -465,3 +493,39 @@ func commandVtTabletStreamHealth(ctx context.Context, wr *wrangler.Wrangler, sub
}
return nil
}
// loggerWriter turns a Logger into a Writer by decorating it with a Write()
// method that sends everything to Logger.Printf().
type loggerWriter struct {
logutil.Logger
}
func (lw loggerWriter) Write(p []byte) (int, error) {
lw.Logger.Printf("%s", p)
return len(p), nil
}
// printQueryResult will pretty-print a QueryResult to the logger.
func printQueryResult(writer io.Writer, qr *sqltypes.Result) {
table := tablewriter.NewWriter(writer)
table.SetAutoFormatHeaders(false)
// Make header.
header := make([]string, 0, len(qr.Fields))
for _, field := range qr.Fields {
header = append(header, field.Name)
}
table.SetHeader(header)
// Add rows.
for _, row := range qr.Rows {
vals := make([]string, 0, len(row))
for _, val := range row {
vals = append(vals, val.String())
}
table.Append(vals)
}
// Print table.
table.Render()
}

39
go/vt/vtctl/query_test.go Normal file
Просмотреть файл

@ -0,0 +1,39 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vtctl
import (
"bytes"
"testing"
"github.com/youtube/vitess/go/sqltypes"
querypb "github.com/youtube/vitess/go/vt/proto/query"
)
func TestPrintQueryResult(t *testing.T) {
input := &sqltypes.Result{
Fields: []*querypb.Field{{Name: "a"}, {Name: "b"}},
Rows: [][]sqltypes.Value{
{sqltypes.MakeString([]byte("1")), sqltypes.MakeString([]byte("2"))},
{sqltypes.MakeString([]byte("3")), sqltypes.MakeString([]byte("4"))},
},
}
// Use a simple example so we're not sensitive to alignment settings, etc.
want := `
+---+---+
| a | b |
+---+---+
| 1 | 2 |
| 3 | 4 |
+---+---+
`[1:]
buf := &bytes.Buffer{}
printQueryResult(buf, input)
if got := buf.String(); got != want {
t.Errorf("printQueryResult() = %q, want %q", got, want)
}
}

Просмотреть файл

@ -90,6 +90,7 @@ import (
"golang.org/x/net/context"
"github.com/youtube/vitess/go/flagutil"
"github.com/youtube/vitess/go/sqltypes"
hk "github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/logutil"
@ -183,7 +184,7 @@ var commands = []commandGroup{
"Runs the specified hook on the given tablet. A hook is a script that resides in the $VTROOT/vthook directory. You can put any script into that directory and use this command to run that script.\n" +
"For this command, the param=value arguments are parameters that the command passes to the specified hook."},
{"ExecuteFetchAsDba", commandExecuteFetchAsDba,
"[--max_rows=10000] [--disable_binlogs] <tablet alias> <sql command>",
"[-max_rows=10000] [-disable_binlogs] [-json] <tablet alias> <sql command>",
"Runs the given SQL command as a DBA on the remote tablet."},
},
},
@ -657,7 +658,7 @@ func commandGetTablet(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag
if err != nil {
return err
}
return printJSON(wr, tabletInfo)
return printJSON(wr.Logger(), tabletInfo)
}
func commandUpdateTabletAddrs(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -981,6 +982,7 @@ func commandExecuteFetchAsDba(ctx context.Context, wr *wrangler.Wrangler, subFla
maxRows := subFlags.Int("max_rows", 10000, "Specifies the maximum number of rows to allow in reset")
disableBinlogs := subFlags.Bool("disable_binlogs", false, "Disables writing to binlogs during the query")
reloadSchema := subFlags.Bool("reload_schema", false, "Indicates whether the tablet schema will be reloaded after executing the SQL command. The default value is <code>false</code>, which indicates that the tablet schema will not be reloaded.")
json := subFlags.Bool("json", false, "Output JSON instead of human-readable table")
if err := subFlags.Parse(args); err != nil {
return err
@ -994,11 +996,16 @@ func commandExecuteFetchAsDba(ctx context.Context, wr *wrangler.Wrangler, subFla
return err
}
query := subFlags.Arg(1)
qr, err := wr.ExecuteFetchAsDba(ctx, alias, query, *maxRows, *disableBinlogs, *reloadSchema)
qrproto, err := wr.ExecuteFetchAsDba(ctx, alias, query, *maxRows, *disableBinlogs, *reloadSchema)
if err != nil {
return err
}
return printJSON(wr, qr)
qr := sqltypes.Proto3ToResult(qrproto)
if *json {
return printJSON(wr.Logger(), qr)
}
printQueryResult(loggerWriter{wr.Logger()}, qr)
return nil
}
func commandExecuteHook(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -1018,7 +1025,7 @@ func commandExecuteHook(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if err != nil {
return err
}
return printJSON(wr, hr)
return printJSON(wr.Logger(), hr)
}
func commandCreateShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -1065,7 +1072,7 @@ func commandGetShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.
if err != nil {
return err
}
return printJSON(wr, shardInfo)
return printJSON(wr.Logger(), shardInfo)
}
func commandRebuildShardGraph(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -1552,7 +1559,7 @@ func commandGetKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if err != nil {
return err
}
return printJSON(wr, keyspaceInfo)
return printJSON(wr.Logger(), keyspaceInfo)
}
func commandGetKeyspaces(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -1729,7 +1736,7 @@ func commandFindAllShardsInKeyspace(ctx context.Context, wr *wrangler.Wrangler,
if err != nil {
return err
}
return printJSON(wr, result)
return printJSON(wr.Logger(), result)
}
func commandValidate(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -1828,7 +1835,7 @@ func commandGetSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag
}
return nil
}
return printJSON(wr, sd)
return printJSON(wr.Logger(), sd)
}
func commandReloadSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -2061,7 +2068,7 @@ func commandGetSrvKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags
if err != nil {
return err
}
return printJSON(wr, srvKeyspace)
return printJSON(wr.Logger(), srvKeyspace)
}
func commandGetSrvKeyspaceNames(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -2098,7 +2105,7 @@ func commandGetSrvShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if err != nil {
return err
}
return printJSON(wr, srvShard)
return printJSON(wr.Logger(), srvShard)
}
func commandGetEndPoints(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -2121,7 +2128,7 @@ func commandGetEndPoints(ctx context.Context, wr *wrangler.Wrangler, subFlags *f
if err != nil {
return err
}
return printJSON(wr, endPoints)
return printJSON(wr.Logger(), endPoints)
}
func commandGetShardReplication(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -2140,7 +2147,7 @@ func commandGetShardReplication(ctx context.Context, wr *wrangler.Wrangler, subF
if err != nil {
return err
}
return printJSON(wr, shardReplication)
return printJSON(wr.Logger(), shardReplication)
}
func commandHelp(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
@ -2221,14 +2228,13 @@ func sortReplicatingTablets(tablets []*topo.TabletInfo, stats []*replicationdata
return rtablets
}
// printJSON will print the JSON version of the structure to the logger
// console output, or an error to the logger's Error channel.
func printJSON(wr *wrangler.Wrangler, val interface{}) error {
// printJSON will print the JSON version of the structure to the logger.
func printJSON(logger logutil.Logger, val interface{}) error {
data, err := json.MarshalIndent(val, "", " ")
if err != nil {
return fmt.Errorf("cannot marshal data: %v", err)
}
wr.Logger().Printf("%v\n", string(data))
logger.Printf("%v\n", string(data))
return nil
}

Просмотреть файл

@ -138,6 +138,8 @@ func TestEmergencyReparentShard(t *testing.T) {
if goodSlave2.FakeMysqlDaemon.Replicating {
t.Errorf("goodSlave2.FakeMysqlDaemon.Replicating set")
}
checkSemiSyncEnabled(t, true, true, newMaster)
checkSemiSyncEnabled(t, false, true, goodSlave1, goodSlave2)
}
// TestEmergencyReparentShardMasterElectNotBest tries to emergency reparent

Просмотреть файл

@ -122,6 +122,8 @@ func TestInitMasterShard(t *testing.T) {
if err := goodSlave2.FakeMysqlDaemon.CheckSuperQueryList(); err != nil {
t.Fatalf("goodSlave2.FakeMysqlDaemon.CheckSuperQueryList failed: %v", err)
}
checkSemiSyncEnabled(t, true, true, master)
checkSemiSyncEnabled(t, false, true, goodSlave1, goodSlave2)
}
// TestInitMasterShardChecks makes sure the safety checks work

Просмотреть файл

@ -141,4 +141,7 @@ func TestPlannedReparentShard(t *testing.T) {
if goodSlave2.FakeMysqlDaemon.Replicating {
t.Errorf("goodSlave2.FakeMysqlDaemon.Replicating set")
}
checkSemiSyncEnabled(t, true, true, newMaster)
checkSemiSyncEnabled(t, false, true, goodSlave1, goodSlave2, oldMaster)
}

Просмотреть файл

@ -134,4 +134,5 @@ func TestReparentTablet(t *testing.T) {
if err := slave.FakeMysqlDaemon.CheckSuperQueryList(); err != nil {
t.Fatalf("slave.FakeMysqlDaemon.CheckSuperQueryList failed: %v", err)
}
checkSemiSyncEnabled(t, false, true, slave)
}

Просмотреть файл

@ -0,0 +1,28 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package testlib
import (
"testing"
"github.com/youtube/vitess/go/vt/tabletmanager"
"github.com/youtube/vitess/go/vt/topo/topoproto"
)
func init() {
// Enable semi-sync for all testlib tests.
*tabletmanager.EnableSemiSync = true
}
func checkSemiSyncEnabled(t *testing.T, master, slave bool, tablets ...*FakeTablet) {
for _, tablet := range tablets {
if got, want := tablet.FakeMysqlDaemon.SemiSyncMasterEnabled, master; got != want {
t.Errorf("%v: SemiSyncMasterEnabled = %v, want %v", topoproto.TabletAliasString(tablet.Tablet.Alias), got, want)
}
if got, want := tablet.FakeMysqlDaemon.SemiSyncSlaveEnabled, slave; got != want {
t.Errorf("%v: SemiSyncSlaveEnabled = %v, want %v", topoproto.TabletAliasString(tablet.Tablet.Alias), got, want)
}
}
}

Просмотреть файл

@ -13,12 +13,14 @@ import (
"google.golang.org/grpc"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtctl/grpcvtctlserver"
"github.com/youtube/vitess/go/vt/vtctl/vtctlclient"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
// we need to import the grpcvtctlclient library so the gRPC
// vtctl client is registered and can be used.
_ "github.com/youtube/vitess/go/vt/vtctl/grpcvtctlclient"
@ -80,7 +82,7 @@ func (vp *VtctlPipe) Run(args []string) error {
return fmt.Errorf("VtctlPipe.Run() failed: %v", err)
}
for le := range c {
vp.t.Logf(le.String())
vp.t.Logf(logutil.EventString(le))
}
return errFunc()
}

Просмотреть файл

@ -165,15 +165,15 @@ public class DateTime {
public static String formatTime(Time value, Calendar cal) {
long millis = value.getTime();
// Adjust for time zone.
millis += cal.get(Calendar.ZONE_OFFSET);
String sign = "";
if (millis < 0) {
sign = "-";
millis = -millis;
}
// Adjust for time zone.
millis += cal.get(Calendar.ZONE_OFFSET);
long hours = millis / HOURS_TO_MILLIS;
millis -= hours * HOURS_TO_MILLIS;
long minutes = millis / MINUTES_TO_MILLIS;

Просмотреть файл

@ -20,6 +20,8 @@ import java.util.TimeZone;
@RunWith(JUnit4.class)
public class DateTimeTest {
private static final Calendar GMT = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
private static final Calendar PST = Calendar.getInstance(TimeZone.getTimeZone("GMT-8"));
private static final Calendar IST = Calendar.getInstance(TimeZone.getTimeZone("GMT+0530"));
private static final Map<String, Date> TEST_DATES =
new ImmutableMap.Builder<String, Date>()
@ -119,6 +121,40 @@ public class DateTimeTest {
}
}
@Test
public void testParseTimePST() throws Exception {
// Test absolute UNIX epoch values in a negative GMT offset.
final Map<String, Time> TEST_TIMES =
new ImmutableMap.Builder<String, Time>()
.put("-08:00:00", new Time(0))
.put("04:34:56", new Time(45296000L))
.put("-20:34:56", new Time(-45296000L))
.build();
for (Map.Entry<String, Time> entry : TEST_TIMES.entrySet()) {
String timeString = entry.getKey();
Time time = entry.getValue();
assertEquals(time, DateTime.parseTime(timeString, PST));
}
}
@Test
public void testParseTimeIST() throws Exception {
// Test absolute UNIX epoch values in a positive GMT offset.
final Map<String, Time> TEST_TIMES =
new ImmutableMap.Builder<String, Time>()
.put("05:30:00", new Time(0))
.put("18:04:56", new Time(45296000L))
.put("-07:04:56", new Time(-45296000L))
.build();
for (Map.Entry<String, Time> entry : TEST_TIMES.entrySet()) {
String timeString = entry.getKey();
Time time = entry.getValue();
assertEquals(time, DateTime.parseTime(timeString, IST));
}
}
@Test
public void testFormatTimeGMT() throws Exception {
// Test absolute UNIX epoch values in GMT,
@ -129,6 +165,7 @@ public class DateTimeTest {
.put("12:34:00", new Time(45240000L))
.put("01:23:00", new Time(4980000L))
.put("12:34:56", new Time(45296000L))
.put("-01:23:00", new Time(-4980000L))
.put("-12:34:56", new Time(-45296000L))
.put("812:34:56", new Time(2925296000L))
.put("-812:34:56", new Time(-2925296000L))
@ -142,6 +179,40 @@ public class DateTimeTest {
}
}
@Test
public void testFormatTimePST() throws Exception {
// Test absolute UNIX epoch values in a negative GMT offset.
final Map<String, Time> TEST_TIMES =
new ImmutableMap.Builder<String, Time>()
.put("-08:00:00", new Time(0))
.put("04:34:56", new Time(45296000L))
.put("-20:34:56", new Time(-45296000L))
.build();
for (Map.Entry<String, Time> entry : TEST_TIMES.entrySet()) {
String timeString = entry.getKey();
Time time = entry.getValue();
assertEquals(timeString, DateTime.formatTime(time, PST));
}
}
@Test
public void testFormatTimeIST() throws Exception {
// Test absolute UNIX epoch values in a positive GMT offset.
final Map<String, Time> TEST_TIMES =
new ImmutableMap.Builder<String, Time>()
.put("05:30:00", new Time(0))
.put("18:04:56", new Time(45296000L))
.put("-07:04:56", new Time(-45296000L))
.build();
for (Map.Entry<String, Time> entry : TEST_TIMES.entrySet()) {
String timeString = entry.getKey();
Time time = entry.getValue();
assertEquals(timeString, DateTime.formatTime(time, IST));
}
}
@Test
public void testParseTimestamp() throws Exception {
// Check that our default time zone matches valueOf().

Просмотреть файл

@ -74,6 +74,7 @@ class TestBackup(unittest.TestCase):
environment.topo_server().wipe()
for t in [tablet_master, tablet_replica1, tablet_replica2]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
_create_vt_insert_test = '''create table vt_insert_test (

Просмотреть файл

@ -15,24 +15,24 @@ import environment
import tablet
import utils
# shards
# shards need at least 1 replica for semi-sync ACK, and 1 rdonly for SplitQuery.
shard_0_master = tablet.Tablet()
shard_0_replica = tablet.Tablet()
shard_0_rdonly = tablet.Tablet()
shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
shard_1_rdonly = tablet.Tablet()
all_tablets = [shard_0_master, shard_0_replica, shard_0_rdonly,
shard_1_master, shard_1_replica, shard_1_rdonly]
def setUpModule():
try:
environment.topo_server().setup()
setup_procs = [
shard_0_master.init_mysql(),
shard_0_rdonly.init_mysql(),
shard_1_master.init_mysql(),
shard_1_rdonly.init_mysql(),
]
setup_procs = [t.init_mysql() for t in all_tablets]
utils.Vtctld().start()
utils.VtGate().start()
utils.wait_procs(setup_procs)
@ -46,22 +46,15 @@ def tearDownModule():
if utils.options.skip_teardown:
return
teardown_procs = [
shard_0_master.teardown_mysql(),
shard_0_rdonly.teardown_mysql(),
shard_1_master.teardown_mysql(),
shard_1_rdonly.teardown_mysql(),
]
teardown_procs = [t.teardown_mysql() for t in all_tablets]
utils.wait_procs(teardown_procs, raise_on_error=False)
environment.topo_server().teardown()
utils.kill_sub_processes()
utils.remove_tmp_files()
shard_0_master.remove_tree()
shard_0_rdonly.remove_tree()
shard_1_master.remove_tree()
shard_1_rdonly.remove_tree()
for t in all_tablets:
t.remove_tree()
class TestCustomSharding(unittest.TestCase):
@ -118,11 +111,12 @@ class TestCustomSharding(unittest.TestCase):
# start the first shard only for now
shard_0_master.init_tablet('master', 'test_keyspace', '0')
shard_0_replica.init_tablet('replica', 'test_keyspace', '0')
shard_0_rdonly.init_tablet('rdonly', 'test_keyspace', '0')
for t in [shard_0_master, shard_0_rdonly]:
for t in [shard_0_master, shard_0_replica, shard_0_rdonly]:
t.create_db('vt_test_keyspace')
t.start_vttablet(wait_for_state=None)
for t in [shard_0_master, shard_0_rdonly]:
for t in [shard_0_master, shard_0_replica, shard_0_rdonly]:
t.wait_for_vttablet_state('SERVING')
utils.run_vtctl(['InitShardMaster', 'test_keyspace/0',
@ -143,7 +137,7 @@ primary key (id)
auto_log=True)
# reload schema everywhere so the QueryService knows about the tables
for t in [shard_0_master, shard_0_rdonly]:
for t in [shard_0_master, shard_0_replica, shard_0_rdonly]:
utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True)
# insert data on shard 0
@ -154,10 +148,11 @@ primary key (id)
# create shard 1
shard_1_master.init_tablet('master', 'test_keyspace', '1')
shard_1_replica.init_tablet('replica', 'test_keyspace', '1')
shard_1_rdonly.init_tablet('rdonly', 'test_keyspace', '1')
for t in [shard_1_master, shard_1_rdonly]:
for t in [shard_1_master, shard_1_replica, shard_1_rdonly]:
t.start_vttablet(wait_for_state=None)
for t in [shard_1_master, shard_1_rdonly]:
for t in [shard_1_master, shard_1_replica, shard_1_rdonly]:
t.wait_for_vttablet_state('NOT_SERVING')
s = utils.run_vtctl_json(['GetShard', 'test_keyspace/1'])
self.assertEqual(len(s['served_types']), 3)
@ -166,7 +161,7 @@ primary key (id)
shard_1_master.tablet_alias], auto_log=True)
utils.run_vtctl(['CopySchemaShard', shard_0_rdonly.tablet_alias,
'test_keyspace/1'], auto_log=True)
for t in [shard_1_master, shard_1_rdonly]:
for t in [shard_1_master, shard_1_replica, shard_1_rdonly]:
utils.run_vtctl(['RefreshState', t.tablet_alias], auto_log=True)
t.wait_for_vttablet_state('SERVING')
@ -189,7 +184,7 @@ primary key (id)
auto_log=True)
# reload schema everywhere so the QueryService knows about the tables
for t in [shard_0_master, shard_0_rdonly, shard_1_master, shard_1_rdonly]:
for t in all_tablets:
utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True)
# insert and read data on all shards
@ -227,7 +222,7 @@ primary key (id)
q['query']['sql'],
'test_keyspace', ','.join(q['shard_part']['shards']),
tablet_type='master', bindvars=bindvars)
for r in qr['Rows']:
for r in qr['rows']:
rows[int(r[0])] = r[1]
self.assertEqual(len(rows), 20)
expected = {}
@ -240,7 +235,8 @@ primary key (id)
def _check_shards_count_in_srv_keyspace(self, shard_count):
ks = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_keyspace'])
check_types = set([topodata_pb2.MASTER, topodata_pb2.RDONLY])
check_types = set([topodata_pb2.MASTER, topodata_pb2.REPLICA,
topodata_pb2.RDONLY])
for p in ks['partitions']:
if p['served_type'] in check_types:
self.assertEqual(len(p['shard_references']), shard_count)
@ -262,17 +258,21 @@ primary key (id)
bindvars=[id_val, name_val],
keyspace='test_keyspace', shard=str(shard))
want = ['Index\tid\tname', '0\t%d\t%s' % (id_val, name_val)]
want = {
u'fields': [u'id', u'name'],
u'rows': [[unicode(id_val), unicode(name_val)]]
}
# read non-streaming
out, _ = utils.vtgate.vtclient(
'select * from data where id = :v1', bindvars=[id_val],
keyspace='test_keyspace', shard=str(shard))
keyspace='test_keyspace', shard=str(shard), json_output=True)
self.assertEqual(out, want)
# read streaming
out, _ = utils.vtgate.vtclient(
'select * from data where id = :v1', bindvars=[id_val],
keyspace='test_keyspace', shard=str(shard), streaming=True)
keyspace='test_keyspace', shard=str(shard), streaming=True,
json_output=True)
self.assertEqual(out, want)

Просмотреть файл

@ -21,7 +21,12 @@ class TestEnv(object):
self.tablet_map = {}
def launch(
self, keyspace, shards=None, replica_count=0, rdonly_count=0, ddls=None):
self, keyspace, shards=None, replica_count=1, rdonly_count=0, ddls=None):
"""Launch test environment."""
if replica_count < 1:
raise Exception('replica_count=%d < 1; tests now use semi-sync'
' and must have at least one replica' % replica_count)
self.tablets = []
utils.run_vtctl(['CreateKeyspace', keyspace])
if not shards or shards[0] == '0':
@ -52,8 +57,6 @@ class TestEnv(object):
if t.tablet_type == 'master':
utils.run_vtctl(['InitShardMaster', keyspace+'/'+t.shard,
t.tablet_alias], auto_log=True)
# Force read-write even if there are no replicas.
utils.run_vtctl(['SetReadWrite', t.tablet_alias], auto_log=True)
for ddl in ddls:
fname = os.path.join(environment.tmproot, 'ddl.sql')
@ -70,6 +73,8 @@ class TestEnv(object):
t.remove_tree()
def _start_tablet(self, keyspace, shard, tablet_type, index):
"""Start a tablet."""
t = tablet.Tablet()
self.tablets.append(t)
if tablet_type == 'master':

Просмотреть файл

@ -1,4 +1,5 @@
#!/usr/bin/env python
"""Define abstractions for various MySQL flavors."""
import environment
import logging
@ -28,6 +29,15 @@ class MysqlFlavor(object):
def change_master_commands(self, host, port, pos):
raise NotImplementedError()
def set_semi_sync_enabled_commands(self, master=None, slave=None):
"""Returns commands to turn semi-sync on/off."""
cmds = []
if master is not None:
cmds.append("SET GLOBAL rpl_semi_sync_master_enabled = %d" % master)
if slave is not None:
cmds.append("SET GLOBAL rpl_semi_sync_slave_enabled = %d" % master)
return cmds
def extra_my_cnf(self):
"""Returns the path to an extra my_cnf file, or None."""
return None
@ -157,6 +167,11 @@ def mysql_flavor():
def set_mysql_flavor(flavor):
"""Set the object that will be returned by mysql_flavor().
If flavor is not specified, set it based on MYSQL_FLAVOR environment variable.
"""
global __mysql_flavor
if not flavor:

Просмотреть файл

@ -64,6 +64,7 @@ class TestMysqlctl(unittest.TestCase):
tablet.Tablet.check_vttablet_count()
for t in [master_tablet, replica_tablet]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
def test_mysqlctl_restart(self):

Просмотреть файл

@ -67,6 +67,7 @@ class TestReparent(unittest.TestCase):
environment.topo_server().wipe()
for t in [tablet_62344, tablet_62044, tablet_41983, tablet_31981]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
super(TestReparent, self).tearDown()
@ -320,13 +321,7 @@ class TestReparent(unittest.TestCase):
self._check_master_cell('test_nj', shard_id, 'test_nj')
self._check_master_cell('test_ny', shard_id, 'test_nj')
# Convert two replica to spare. That should leave only one node
# serving traffic, but still needs to appear in the replication
# graph.
utils.run_vtctl(['ChangeSlaveType', tablet_41983.tablet_alias, 'spare'])
utils.run_vtctl(['ChangeSlaveType', tablet_31981.tablet_alias, 'spare'])
utils.validate_topology()
self._check_db_addr(shard_id, 'replica', tablet_62044.port)
# Run this to make sure it succeeds.
utils.run_vtctl(['ShardReplicationPositions', 'test_keyspace/' + shard_id],
@ -569,13 +564,12 @@ class TestReparent(unittest.TestCase):
wait_for_start=False)
tablet_31981.init_tablet('replica', 'test_keyspace', shard_id, start=True,
wait_for_start=False)
tablet_41983.init_tablet('spare', 'test_keyspace', shard_id, start=True,
tablet_41983.init_tablet('replica', 'test_keyspace', shard_id, start=True,
wait_for_start=False)
# wait for all tablets to start
for t in [tablet_62344, tablet_62044, tablet_31981]:
for t in [tablet_62344, tablet_62044, tablet_31981, tablet_41983]:
t.wait_for_vttablet_state('SERVING')
tablet_41983.wait_for_vttablet_state('NOT_SERVING')
# Recompute the shard layout node - until you do that, it might not be
# valid.

Просмотреть файл

@ -472,7 +472,7 @@ primary key (name)
shard_0_ny_rdonly.init_tablet('rdonly', 'test_keyspace', '-80')
shard_1_master.init_tablet('master', 'test_keyspace', '80-')
shard_1_slave1.init_tablet('replica', 'test_keyspace', '80-')
shard_1_slave2.init_tablet('spare', 'test_keyspace', '80-')
shard_1_slave2.init_tablet('replica', 'test_keyspace', '80-')
shard_1_ny_rdonly.init_tablet('rdonly', 'test_keyspace', '80-')
shard_1_rdonly1.init_tablet('rdonly', 'test_keyspace', '80-')
@ -497,7 +497,7 @@ primary key (name)
shard_0_ny_rdonly.wait_for_vttablet_state('SERVING')
shard_1_master.wait_for_vttablet_state('SERVING')
shard_1_slave1.wait_for_vttablet_state('SERVING')
shard_1_slave2.wait_for_vttablet_state('NOT_SERVING') # spare
shard_1_slave2.wait_for_vttablet_state('SERVING')
shard_1_ny_rdonly.wait_for_vttablet_state('SERVING')
shard_1_rdonly1.wait_for_vttablet_state('SERVING')
@ -521,10 +521,10 @@ primary key (name)
# create the split shards
shard_2_master.init_tablet('master', 'test_keyspace', '80-c0')
shard_2_replica1.init_tablet('spare', 'test_keyspace', '80-c0')
shard_2_replica2.init_tablet('spare', 'test_keyspace', '80-c0')
shard_2_replica1.init_tablet('replica', 'test_keyspace', '80-c0')
shard_2_replica2.init_tablet('replica', 'test_keyspace', '80-c0')
shard_3_master.init_tablet('master', 'test_keyspace', 'c0-')
shard_3_replica.init_tablet('spare', 'test_keyspace', 'c0-')
shard_3_replica.init_tablet('replica', 'test_keyspace', 'c0-')
shard_3_rdonly1.init_tablet('rdonly', 'test_keyspace', 'c0-')
# start vttablet on the split shards (no db created,

Просмотреть файл

@ -19,6 +19,11 @@ warnings.simplefilter('ignore')
master_tablet = tablet.Tablet()
replica_tablet = tablet.Tablet()
# Second replica to provide semi-sync ACKs while testing
# scenarios when the first replica is down.
replica2_tablet = tablet.Tablet()
all_tablets = [master_tablet, replica_tablet, replica2_tablet]
create_vt_insert_test = '''create table vt_insert_test (
id bigint auto_increment,
@ -32,9 +37,7 @@ def setUpModule():
environment.topo_server().setup()
# start mysql instance external to the test
setup_procs = [master_tablet.init_mysql(),
replica_tablet.init_mysql()]
utils.wait_procs(setup_procs)
utils.wait_procs([t.init_mysql() for t in all_tablets])
# start a vtctld so the vtctl insert commands are just RPCs, not forks
utils.Vtctld().start()
@ -44,15 +47,16 @@ def setUpModule():
utils.run_vtctl(['CreateKeyspace', 'test_keyspace'])
master_tablet.init_tablet('master', 'test_keyspace', '0')
replica_tablet.init_tablet('replica', 'test_keyspace', '0')
replica2_tablet.init_tablet('replica', 'test_keyspace', '0')
utils.validate_topology()
master_tablet.populate('vt_test_keyspace', create_vt_insert_test)
replica_tablet.populate('vt_test_keyspace', create_vt_insert_test)
for t in all_tablets:
t.populate('vt_test_keyspace', create_vt_insert_test)
master_tablet.start_vttablet(memcache=True, wait_for_state=None)
replica_tablet.start_vttablet(memcache=True, wait_for_state=None)
master_tablet.wait_for_vttablet_state('SERVING')
replica_tablet.wait_for_vttablet_state('SERVING')
for t in all_tablets:
t.start_vttablet(memcache=True, wait_for_state=None)
for t in all_tablets:
t.wait_for_vttablet_state('SERVING')
utils.run_vtctl(['InitShardMaster', 'test_keyspace/0',
master_tablet.tablet_alias], auto_log=True)
@ -71,16 +75,15 @@ def tearDownModule():
if utils.options.skip_teardown:
return
logging.debug('Tearing down the servers and setup')
tablet.kill_tablets([master_tablet, replica_tablet])
teardown_procs = [master_tablet.teardown_mysql(),
replica_tablet.teardown_mysql()]
utils.wait_procs(teardown_procs, raise_on_error=False)
tablet.kill_tablets(all_tablets)
utils.wait_procs([t.teardown_mysql() for t in all_tablets],
raise_on_error=False)
environment.topo_server().teardown()
utils.kill_sub_processes()
utils.remove_tmp_files()
master_tablet.remove_tree()
replica_tablet.remove_tree()
for t in all_tablets:
t.remove_tree()
class MultiDict(dict):
@ -298,7 +301,7 @@ class RowCacheInvalidator(unittest.TestCase):
def _exec_replica_query(self, query):
result = replica_tablet.execute(query, auto_log=False)
return result['Rows']
return result['rows']
if __name__ == '__main__':

Просмотреть файл

@ -118,6 +118,8 @@ def _teardown_shard_2():
['DeleteShard', '-recursive', 'test_keyspace/2'], auto_log=True)
for t in shard_2_tablets:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
@ -291,7 +293,7 @@ class TestSchema(unittest.TestCase):
shard_2_schema = self._get_schema(shard_2_master.tablet_alias)
self.assertIn('utf8', shard_2_schema['database_schema'])
utils.run_vtctl_json(
['ExecuteFetchAsDba', shard_2_master.tablet_alias,
['ExecuteFetchAsDba', '-json', shard_2_master.tablet_alias,
'ALTER DATABASE vt_test_keyspace CHARACTER SET latin1'])
_, stderr = utils.run_vtctl(['CopySchemaShard',

Просмотреть файл

@ -173,12 +173,12 @@ class TestSharded(unittest.TestCase):
sql = 'select id, msg from vt_select_test order by id'
qr = shard_0_master.execute(sql)
self.assertEqual(qr['Rows'], [[1, 'test 1'],])
self.assertEqual(qr['rows'], [[1, 'test 1'],])
qr = shard_1_master.execute(sql)
self.assertEqual(qr['Rows'], [[10, 'test 10'],])
self.assertEqual(qr['rows'], [[10, 'test 10'],])
_, stderr = utils.run_vtctl(['VtTabletExecute',
_, stderr = utils.run_vtctl(['VtTabletExecute', '-json',
'-keyspace', 'test_keyspace',
'-shard', '-90',
shard_0_master.tablet_alias, sql],

Просмотреть файл

@ -266,6 +266,12 @@ class Tablet(object):
def reset_replication(self):
self.mquery('', mysql_flavor().reset_replication_commands())
def set_semi_sync_enabled(self, master=None, slave=None):
logging.debug('mysql(%s): setting semi-sync mode: master=%s, slave=%s',
self.tablet_uid, master, slave)
self.mquery('',
mysql_flavor().set_semi_sync_enabled_commands(master, slave))
def populate(self, dbname, create_sql, insert_sqls=None):
self.create_db(dbname)
if isinstance(create_sql, basestring):
@ -342,6 +348,8 @@ class Tablet(object):
tablet_index=None,
start=False, dbname=None, parent=True, wait_for_start=True,
include_mysql_port=True, **kwargs):
"""Initialize a tablet's record in topology."""
self.tablet_type = tablet_type
self.keyspace = keyspace
self.shard = shard
@ -399,7 +407,7 @@ class Tablet(object):
extra_args=None, extra_env=None, include_mysql_port=True,
init_tablet_type=None, init_keyspace=None,
init_shard=None, init_db_name_override=None,
supports_backups=False, grace_period='1s'):
supports_backups=False, grace_period='1s', enable_semi_sync=True):
"""Starts a vttablet process, and returns it.
The process is also saved in self.proc, so it's easy to kill as well.
@ -422,6 +430,8 @@ class Tablet(object):
args.extend(['-binlog_player_healthcheck_retry_delay', '1s'])
args.extend(['-binlog_player_retry_delay', '1s'])
args.extend(['-pid_file', os.path.join(self.tablet_dir, 'vttablet.pid')])
if enable_semi_sync:
args.append('-enable_semi_sync')
if self.use_mysqlctld:
args.extend(
['-mysqlctl_socket', os.path.join(self.tablet_dir, 'mysqlctl.sock')])
@ -722,7 +732,7 @@ class Tablet(object):
the result of running vtctl command.
"""
args = [
'VtTabletExecute',
'VtTabletExecute', '-json',
'-keyspace', self.keyspace,
'-shard', self.shard,
]

Просмотреть файл

@ -66,6 +66,7 @@ class TestTabletManager(unittest.TestCase):
environment.topo_server().wipe()
for t in [tablet_62344, tablet_62044]:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
def _check_srv_shard(self):
@ -98,12 +99,12 @@ class TestTabletManager(unittest.TestCase):
# make sure the query service is started right away
qr = tablet_62344.execute('select * from vt_select_test')
self.assertEqual(len(qr['Rows']), 4,
self.assertEqual(len(qr['rows']), 4,
'expected 4 rows in vt_select_test: %s' % str(qr))
# make sure direct dba queries work
query_result = utils.run_vtctl_json(
['ExecuteFetchAsDba', tablet_62344.tablet_alias,
['ExecuteFetchAsDba', '-json', tablet_62344.tablet_alias,
'select * from vt_test_keyspace.vt_select_test'])
self.assertEqual(
len(query_result['rows']), 4,

Просмотреть файл

@ -585,13 +585,15 @@ class VtGate(object):
def vtclient(self, sql, keyspace=None, shard=None, tablet_type='master',
bindvars=None, streaming=False,
verbose=False, raise_on_error=True):
verbose=False, raise_on_error=True, json_output=False):
"""Uses the vtclient binary to send a query to vtgate."""
protocol, addr = self.rpc_endpoint()
args = environment.binary_args('vtclient') + [
'-server', addr,
'-tablet_type', tablet_type,
'-vtgate_protocol', protocol]
if json_output:
args.append('-json')
if keyspace:
args.extend(['-keyspace', keyspace])
if shard:
@ -605,13 +607,14 @@ class VtGate(object):
args.append(sql)
out, err = run(args, raise_on_error=raise_on_error, trap_output=True)
out = out.splitlines()
if json_output:
return json.loads(out), err
return out, err
def execute(self, sql, tablet_type='master', bindvars=None):
"""Uses 'vtctl VtGateExecute' to execute a command."""
_, addr = self.rpc_endpoint()
args = ['VtGateExecute',
args = ['VtGateExecute', '-json',
'-server', addr,
'-tablet_type', tablet_type]
if bindvars:
@ -623,7 +626,7 @@ class VtGate(object):
bindvars=None):
"""Uses 'vtctl VtGateExecuteShards' to execute a command."""
_, addr = self.rpc_endpoint()
args = ['VtGateExecuteShards',
args = ['VtGateExecuteShards', '-json',
'-server', addr,
'-keyspace', keyspace,
'-shards', shards,

Просмотреть файл

@ -314,7 +314,7 @@ index by_msg (msg)
for table in ['moving1', 'moving2']:
if expected and 'moving.*' in expected:
# table is blacklisted, should get the error
_, stderr = utils.run_vtctl(['VtTabletExecute',
_, stderr = utils.run_vtctl(['VtTabletExecute', '-json',
'-keyspace', t.keyspace,
'-shard', t.shard,
t.tablet_alias,
@ -327,7 +327,7 @@ index by_msg (msg)
# table is not blacklisted, should just work
qr = t.execute('select count(1) from %s' % table)
logging.debug('Got %s rows from table %s on tablet %s',
qr['Rows'][0][0], table, t.tablet_alias)
qr['rows'][0][0], table, t.tablet_alias)
def _check_client_conn_redirection(
self, destination_ks, servedfrom_db_types,

Просмотреть файл

@ -15,13 +15,11 @@ import utils
# range '' - 80
shard_0_master = tablet.Tablet()
shard_0_replica = tablet.Tablet()
shard_0_spare = tablet.Tablet()
# range 80 - ''
shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
# all tablets
tablets = [shard_0_master, shard_0_replica, shard_1_master, shard_1_replica,
shard_0_spare]
tablets = [shard_0_master, shard_0_replica, shard_1_master, shard_1_replica]
def setUpModule():
@ -67,8 +65,7 @@ class TestVtctld(unittest.TestCase):
'redirected_keyspace'])
shard_0_master.init_tablet('master', 'test_keyspace', '-80')
shard_0_replica.init_tablet('spare', 'test_keyspace', '-80')
shard_0_spare.init_tablet('spare', 'test_keyspace', '-80')
shard_0_replica.init_tablet('replica', 'test_keyspace', '-80')
shard_1_master.init_tablet('master', 'test_keyspace', '80-')
shard_1_replica.init_tablet('replica', 'test_keyspace', '80-')
@ -86,18 +83,11 @@ class TestVtctld(unittest.TestCase):
target_tablet_type='replica',
wait_for_state=None)
shard_0_spare.start_vttablet(wait_for_state=None,
extra_args=utils.vtctld.process_args())
# wait for the right states
for t in [shard_0_master, shard_1_master, shard_1_replica]:
t.wait_for_vttablet_state('SERVING')
for t in [shard_0_replica, shard_0_spare]:
t.wait_for_vttablet_state('NOT_SERVING')
shard_0_replica.wait_for_vttablet_state('NOT_SERVING')
for t in [shard_0_master, shard_0_replica, shard_0_spare,
shard_1_master, shard_1_replica]:
t.reset_replication()
utils.run_vtctl(['InitShardMaster', 'test_keyspace/-80',
shard_0_master.tablet_alias], auto_log=True)
utils.run_vtctl(['InitShardMaster', 'test_keyspace/80-',
@ -174,5 +164,19 @@ class TestVtctld(unittest.TestCase):
break
vtctld_connection.close()
def test_execute_fetch_as_dba(self):
"""Make sure ExecuteFetchAsDba prints a human-readable table by default."""
# Use a simple example so we're not sensitive to alignment settings, etc.
# All we care is that it's the human-readable table, not JSON or protobuf.
out, _ = utils.run_vtctl(['ExecuteFetchAsDba', shard_0_replica.tablet_alias,
'SELECT 1 AS a'], trap_output=True)
want = """+---+
| a |
+---+
| 1 |
+---+
"""
self.assertEqual(want, out)
if __name__ == '__main__':
utils.main()

Просмотреть файл

@ -829,8 +829,6 @@ class TestFailures(BaseTestCase):
self.master_tablet = shard_1_master
self.master_tablet.kill_vttablet()
self.tablet_start(self.master_tablet, 'replica')
utils.run_vtctl(['InitShardMaster', KEYSPACE_NAME+'/-80',
shard_0_master.tablet_alias], auto_log=True)
self.master_tablet.wait_for_vttablet_state('SERVING')
self.replica_tablet = shard_1_replica1
self.replica_tablet.kill_vttablet()

Просмотреть файл

@ -859,8 +859,11 @@ class TestVTGateFunctions(unittest.TestCase):
out, _ = utils.vtgate.vtclient(
'select user_id, email from vt_user_extra where user_id = :v1',
bindvars=[10])
self.assertEqual(out, ['Index\tuser_id\temail', '0\t10\ttest 10'])
bindvars=[10], json_output=True)
self.assertEqual(out, {
u'fields': [u'user_id', u'email'],
u'rows': [[u'10', u'test 10']],
})
utils.vtgate.vtclient(
'update vt_user_extra set email=:v2 where user_id = :v1',
@ -868,17 +871,22 @@ class TestVTGateFunctions(unittest.TestCase):
out, _ = utils.vtgate.vtclient(
'select user_id, email from vt_user_extra where user_id = :v1',
bindvars=[10],
streaming=True)
self.assertEqual(out, ['Index\tuser_id\temail', '0\t10\ttest 1000'])
bindvars=[10], streaming=True, json_output=True)
self.assertEqual(out, {
u'fields': [u'user_id', u'email'],
u'rows': [[u'10', u'test 1000']],
})
utils.vtgate.vtclient(
'delete from vt_user_extra where user_id = :v1', bindvars=[10])
out, _ = utils.vtgate.vtclient(
'select user_id, email from vt_user_extra where user_id = :v1',
bindvars=[10])
self.assertEqual(out, ['Index\tuser_id\temail'])
bindvars=[10], json_output=True)
self.assertEqual(out, {
u'fields': [u'user_id', u'email'],
u'rows': None,
})
def test_vtctl_vtgate_execute(self):
"""This test uses 'vtctl VtGateExecute' to send and receive various queries.
@ -891,8 +899,8 @@ class TestVTGateFunctions(unittest.TestCase):
'select user_id, email from vt_user_extra where user_id = :user_id',
bindvars={'user_id': 11})
logging.debug('Original row: %s', str(qr))
self.assertEqual(len(qr['Rows']), 1)
v = qr['Rows'][0][1]
self.assertEqual(len(qr['rows']), 1)
v = qr['rows'][0][1]
self.assertEqual(v, 'test 11')
utils.vtgate.execute(
@ -903,8 +911,8 @@ class TestVTGateFunctions(unittest.TestCase):
'select user_id, email from vt_user_extra where user_id = :user_id',
bindvars={'user_id': 11})
logging.debug('Modified row: %s', str(qr))
self.assertEqual(len(qr['Rows']), 1)
v = qr['Rows'][0][1]
self.assertEqual(len(qr['rows']), 1)
v = qr['rows'][0][1]
self.assertEqual(v, 'test 1100')
utils.vtgate.execute(
@ -914,7 +922,7 @@ class TestVTGateFunctions(unittest.TestCase):
qr = utils.vtgate.execute(
'select user_id, email from vt_user_extra where user_id = :user_id',
bindvars={'user_id': 11})
self.assertEqual(len(qr['Rows'] or []), 0)
self.assertEqual(len(qr['rows'] or []), 0)
if __name__ == '__main__':
utils.main()

Просмотреть файл

@ -384,6 +384,7 @@ class TestBaseSplitClone(unittest.TestCase):
for shard_tablet in [all_shard_tablets, shard_0_tablets, shard_1_tablets]:
for t in shard_tablet.all_tablets:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
t.kill_vttablet()
# we allow failures here as some tablets will be gone sometimes
@ -420,12 +421,18 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone):
6. Verify that the data was copied successfully to both new shards
Args:
mysql_down: boolean, True iff we expect the MySQL instances on the
destination masters to be down.
mysql_down: boolean. If True, we take down the MySQL instances on the
destination masters at first, then bring them back and reparent away.
Raises:
AssertionError if things didn't go as expected.
"""
if mysql_down:
logging.debug('Shutting down mysqld on destination masters.')
utils.wait_procs(
[shard_0_master.shutdown_mysql(),
shard_1_master.shutdown_mysql()])
worker_proc, worker_port, worker_rpc_port = utils.run_vtworker_bg(
['--cell', 'test_nj'],
auto_log=True)
@ -450,12 +457,21 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone):
"expected vtworker to retry, but it didn't")
logging.debug('Worker has resolved at least twice, starting reparent now')
# Original masters have no running MySQL, so need to force the reparent.
# Bring back masters. Since we test with semi-sync now, we need at least
# one replica for the new master. This test is already quite expensive,
# so we bring back the old master as a replica rather than having a third
# replica up the whole time.
logging.debug('Restarting mysqld on destination masters')
utils.wait_procs(
[shard_0_master.start_mysql(),
shard_1_master.start_mysql()])
# Reparent away from the old masters.
utils.run_vtctl(
['EmergencyReparentShard', 'test_keyspace/-80',
['PlannedReparentShard', 'test_keyspace/-80',
shard_0_replica.tablet_alias], auto_log=True)
utils.run_vtctl(
['EmergencyReparentShard', 'test_keyspace/80-',
['PlannedReparentShard', 'test_keyspace/80-',
shard_1_replica.tablet_alias], auto_log=True)
else:
@ -523,35 +539,6 @@ class TestReparentDuringWorkerCopy(TestBaseSplitCloneResiliency):
class TestMysqlDownDuringWorkerCopy(TestBaseSplitCloneResiliency):
def setUp(self):
"""Shuts down MySQL on the destination masters.
Also runs base setup.
"""
try:
logging.debug('Starting base setup for MysqlDownDuringWorkerCopy')
super(TestMysqlDownDuringWorkerCopy, self).setUp()
logging.debug('Starting MysqlDownDuringWorkerCopy-specific setup')
utils.wait_procs(
[shard_0_master.shutdown_mysql(),
shard_1_master.shutdown_mysql()])
logging.debug('Finished MysqlDownDuringWorkerCopy-specific setup')
except:
self.tearDown()
raise
def tearDown(self):
"""Restarts the MySQL processes that were killed during the setup."""
logging.debug('Starting MysqlDownDuringWorkerCopy-specific tearDown')
utils.wait_procs(
[shard_0_master.start_mysql(),
shard_1_master.start_mysql()])
logging.debug('Finished MysqlDownDuringWorkerCopy-specific tearDown')
super(TestMysqlDownDuringWorkerCopy, self).tearDown()
logging.debug('Finished base tearDown for MysqlDownDuringWorkerCopy')
def test_mysql_down_during_worker_copy(self):
"""This test simulates MySQL being down on the destination masters."""
self.verify_successful_worker_copy_with_reparent(mysql_down=True)