From 512a73ef69c2831c6236ea51a6e559b2778377c7 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Fri, 2 Oct 2015 09:54:20 -0700 Subject: [PATCH 1/3] Revert "Revert "Merge branch 'suguwork'"" This reverts commit 8062499471015221e0cf5926c3dbc476d311e5b8. --- .../endtoend/framework/logtracker.go | 72 ++- .../tabletserver/endtoend/framework/server.go | 1 + go/vt/tabletserver/endtoend/nocache_test.go | 451 +++++++++++++++++- .../tabletserver/endtoend/transaction_test.go | 269 +++++------ go/vt/tabletserver/query_executor.go | 2 +- go/vt/tabletserver/schema_info.go | 9 +- go/vt/tabletserver/tabletserver.go | 49 ++ 7 files changed, 684 insertions(+), 169 deletions(-) diff --git a/go/vt/tabletserver/endtoend/framework/logtracker.go b/go/vt/tabletserver/endtoend/framework/logtracker.go index 1c312d6dca..7bec1b4eb4 100644 --- a/go/vt/tabletserver/endtoend/framework/logtracker.go +++ b/go/vt/tabletserver/endtoend/framework/logtracker.go @@ -5,12 +5,13 @@ package framework import ( + "errors" "time" "github.com/youtube/vitess/go/vt/tabletserver" ) -// TxFetcher alows you to capture and fetch info on the next transaction. +// TxFetcher allows you to capture and fetch info on the next transaction. type TxFetcher struct { start time.Time ch chan interface{} @@ -25,13 +26,72 @@ func NewTxFetcher() *TxFetcher { } // Fetch fetches the last captured transaction. It must be called only once. -func (fetcher *TxFetcher) Fetch() *tabletserver.TxConnection { +// If the wait is longer than one second, it returns an error. +func (fetcher *TxFetcher) Fetch() (*tabletserver.TxConnection, error) { + tmr := time.NewTimer(1 * time.Second) + defer tmr.Stop() for { - tx := (<-fetcher.ch).(*tabletserver.TxConnection) - // Skip any events that pre-date start time. - if tx.EndTime.After(fetcher.start) { + select { + case itx := <-fetcher.ch: + tx := itx.(*tabletserver.TxConnection) + // Skip any events that pre-date start time. + if tx.EndTime.Before(fetcher.start) { + continue + } tabletserver.TxLogger.Unsubscribe(fetcher.ch) - return tx + return tx, nil + case <-tmr.C: + return nil, errors.New("error waiting for transaction event") + } + } +} + +// QueryFetcher allows you to capture and fetch queries that are being +// executed by TabletServer. +type QueryFetcher struct { + start time.Time + ch chan interface{} + queries chan *tabletserver.LogStats +} + +// NewQueryFetcher sets up the capture and retuns a QueryFetcher. +// It has a buffer size of 20. You must call Close once done. +func NewQueryFetcher() *QueryFetcher { + fetcher := &QueryFetcher{ + start: time.Now(), + ch: tabletserver.StatsLogger.Subscribe("endtoend"), + queries: make(chan *tabletserver.LogStats, 20), + } + go func() { + for log := range fetcher.ch { + fetcher.queries <- log.(*tabletserver.LogStats) + } + close(fetcher.queries) + }() + return fetcher +} + +// Close closes the QueryFetcher. +func (fetcher *QueryFetcher) Close() { + tabletserver.StatsLogger.Unsubscribe(fetcher.ch) + close(fetcher.ch) +} + +// Next fetches the next captured query. +// If the wait is longer than one second, it returns an error. +func (fetcher *QueryFetcher) Next() (*tabletserver.LogStats, error) { + tmr := time.NewTimer(1 * time.Second) + defer tmr.Stop() + for { + select { + case query := <-fetcher.queries: + // Skip any events that pre-date start time. + if query.EndTime.Before(fetcher.start) { + continue + } + return query, nil + case <-tmr.C: + return nil, errors.New("error waiting for query event") } } } diff --git a/go/vt/tabletserver/endtoend/framework/server.go b/go/vt/tabletserver/endtoend/framework/server.go index 91c41d7a60..6db95b4f41 100644 --- a/go/vt/tabletserver/endtoend/framework/server.go +++ b/go/vt/tabletserver/endtoend/framework/server.go @@ -57,6 +57,7 @@ func StartDefaultServer(connParams sqldb.ConnParams) error { BaseConfig = tabletserver.DefaultQsConfig BaseConfig.RowCache.Binary = vttest.MemcachedPath() BaseConfig.RowCache.Socket = path.Join(os.TempDir(), "memcache.sock") + BaseConfig.RowCache.Connections = 100 BaseConfig.EnableAutoCommit = true Target = query.Target{ diff --git a/go/vt/tabletserver/endtoend/nocache_test.go b/go/vt/tabletserver/endtoend/nocache_test.go index 1d056507fb..686cddfeb7 100644 --- a/go/vt/tabletserver/endtoend/nocache_test.go +++ b/go/vt/tabletserver/endtoend/nocache_test.go @@ -5,9 +5,12 @@ package endtoend import ( + "fmt" "reflect" "strings" + "sync" "testing" + "time" "github.com/youtube/vitess/go/mysql" mproto "github.com/youtube/vitess/go/mysql/proto" @@ -15,6 +18,105 @@ import ( "github.com/youtube/vitess/go/vt/tabletserver/endtoend/framework" ) +// compareIntDiff returns an error if end[tag] != start[tag]+diff. +func compareIntDiff(end map[string]interface{}, tag string, start map[string]interface{}, diff int) error { + return verifyIntValue(end, tag, framework.FetchInt(start, tag)+diff) +} + +// verifyIntValue retuns an error if values[tag] != want. +func verifyIntValue(values map[string]interface{}, tag string, want int) error { + got := framework.FetchInt(values, tag) + if got != want { + return fmt.Errorf("%s: %d, want %d", tag, got, want) + } + return nil +} + +func TestConfigVars(t *testing.T) { + vars := framework.DebugVars() + cases := []struct { + tag string + val int + }{{ + tag: "BeginTimeout", + val: int(framework.BaseConfig.TxPoolTimeout * 1e9), + }, { + tag: "ConnPoolAvailable", + val: framework.BaseConfig.PoolSize, + }, { + tag: "ConnPoolCapacity", + val: framework.BaseConfig.PoolSize, + }, { + tag: "ConnPoolIdleTimeout", + val: int(framework.BaseConfig.IdleTimeout * 1e9), + }, { + tag: "ConnPoolMaxCap", + val: framework.BaseConfig.PoolSize, + }, { + tag: "MaxDMLRows", + val: framework.BaseConfig.MaxDMLRows, + }, { + tag: "MaxResultSize", + val: framework.BaseConfig.MaxResultSize, + }, { + tag: "QueryCacheCapacity", + val: framework.BaseConfig.QueryCacheSize, + }, { + tag: "QueryTimeout", + val: int(framework.BaseConfig.QueryTimeout * 1e9), + }, { + tag: "RowcacheConnPoolAvailable", + val: framework.BaseConfig.RowCache.Connections - 50, + }, { + tag: "RowcacheConnPoolCapacity", + val: framework.BaseConfig.RowCache.Connections - 50, + }, { + tag: "RowcacheConnPoolIdleTimeout", + val: int(framework.BaseConfig.IdleTimeout * 1e9), + }, { + tag: "RowcacheConnPoolMaxCap", + val: framework.BaseConfig.RowCache.Connections - 50, + }, { + tag: "SchemaReloadTime", + val: int(framework.BaseConfig.SchemaReloadTime * 1e9), + }, { + tag: "StreamBufferSize", + val: framework.BaseConfig.StreamBufferSize, + }, { + tag: "StreamConnPoolAvailable", + val: framework.BaseConfig.StreamPoolSize, + }, { + tag: "StreamConnPoolCapacity", + val: framework.BaseConfig.StreamPoolSize, + }, { + tag: "StreamConnPoolIdleTimeout", + val: int(framework.BaseConfig.IdleTimeout * 1e9), + }, { + tag: "StreamConnPoolMaxCap", + val: framework.BaseConfig.StreamPoolSize, + }, { + tag: "TransactionPoolAvailable", + val: framework.BaseConfig.TransactionCap, + }, { + tag: "TransactionPoolCapacity", + val: framework.BaseConfig.TransactionCap, + }, { + tag: "TransactionPoolIdleTimeout", + val: int(framework.BaseConfig.IdleTimeout * 1e9), + }, { + tag: "TransactionPoolMaxCap", + val: framework.BaseConfig.TransactionCap, + }, { + tag: "TransactionPoolTimeout", + val: int(framework.BaseConfig.TransactionTimeout * 1e9), + }} + for _, tcase := range cases { + if err := verifyIntValue(vars, tcase.tag, tcase.val); err != nil { + t.Error(err) + } + } +} + func TestSimpleRead(t *testing.T) { vstart := framework.DebugVars() _, err := framework.NewDefaultClient().Execute("select * from vtocc_test where intval=1", nil) @@ -23,15 +125,11 @@ func TestSimpleRead(t *testing.T) { return } vend := framework.DebugVars() - v1 := framework.FetchInt(vstart, "Queries.TotalCount") - v2 := framework.FetchInt(vend, "Queries.TotalCount") - if v1+1 != v2 { - t.Errorf("Queries.TotalCount: %d, want %d", v2, v1+1) + if err := compareIntDiff(vend, "Queries.TotalCount", vstart, 1); err != nil { + t.Error(err) } - v1 = framework.FetchInt(vstart, "Queries.Histograms.PASS_SELECT.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.PASS_SELECT.Count") - if v1+1 != v2 { - t.Errorf("Queries...Count: %d, want %d", v2, v1+1) + if err := compareIntDiff(vend, "Queries.Histograms.PASS_SELECT.Count", vstart, 1); err != nil { + t.Error(err) } } @@ -154,17 +252,342 @@ func TestNocacheListArgs(t *testing.T) { } func TestIntegrityError(t *testing.T) { - client := framework.NewDefaultClient() vstart := framework.DebugVars() + client := framework.NewDefaultClient() _, err := client.Execute("insert into vtocc_test values(1, null, null, null)", nil) want := "error: Duplicate entry '1'" if err == nil || !strings.HasPrefix(err.Error(), want) { t.Errorf("Error: %v, want prefix %s", err, want) } - vend := framework.DebugVars() - v1 := framework.FetchInt(vstart, "InfoErrors.DupKey") - v2 := framework.FetchInt(vend, "InfoErrors.DupKey") - if v1+1 != v2 { - t.Errorf("InfoErrors.DupKey: %d, want %d", v2, v1+1) + if err := compareIntDiff(framework.DebugVars(), "InfoErrors.DupKey", vstart, 1); err != nil { + t.Error(err) + } +} + +func TestTrailingComment(t *testing.T) { + vstart := framework.DebugVars() + v1 := framework.FetchInt(vstart, "QueryCacheLength") + + bindVars := map[string]interface{}{"ival": 1} + client := framework.NewDefaultClient() + + for _, query := range []string{ + "select * from vtocc_test where intval=:ival", + "select * from vtocc_test where intval=:ival /* comment */", + "select * from vtocc_test where intval=:ival /* comment1 */ /* comment2 */", + } { + _, err := client.Execute(query, bindVars) + if err != nil { + t.Error(err) + return + } + v2 := framework.FetchInt(framework.DebugVars(), "QueryCacheLength") + if v2 != v1+1 { + t.Errorf("QueryCacheLength(%s): %d, want %d", query, v2, v1+1) + } + } +} + +func TestStrictMode(t *testing.T) { + queries := []string{ + "insert into vtocc_a(eid, id, name, foo) values (7, 1+1, '', '')", + "insert into vtocc_d(eid, id) values (1, 1)", + "update vtocc_a set eid = 1+1 where eid = 1 and id = 1", + "insert into vtocc_d(eid, id) values (1, 1)", + "insert into upsert_test(id1, id2) values " + + "(1, 1), (2, 2) on duplicate key update id1 = 1", + "insert into upsert_test(id1, id2) select eid, id " + + "from vtocc_a limit 1 on duplicate key update id2 = id1", + "insert into upsert_test(id1, id2) values " + + "(1, 1) on duplicate key update id1 = 2+1", + } + + // Strict mode on. + func() { + client := framework.NewDefaultClient() + err := client.Begin() + if err != nil { + t.Error(err) + return + } + defer client.Rollback() + + want := "error: DML too complex" + for _, query := range queries { + _, err = client.Execute(query, nil) + if err == nil || err.Error() != want { + t.Errorf("Execute(%s): %v, want %s", query, err, want) + } + } + }() + + // Strict mode off. + func() { + framework.DefaultServer.SetStrictMode(false) + defer framework.DefaultServer.SetStrictMode(true) + + for _, query := range queries { + client := framework.NewDefaultClient() + err := client.Begin() + if err != nil { + t.Error(err) + return + } + _, err = client.Execute(query, nil) + if err != nil { + t.Error(err) + } + client.Rollback() + } + }() +} + +func TestUpsertNonPKHit(t *testing.T) { + client := framework.NewDefaultClient() + err := client.Begin() + if err != nil { + t.Error(err) + return + } + defer client.Rollback() + + _, err = client.Execute("insert into upsert_test(id1, id2) values (1, 1)", nil) + if err != nil { + t.Error(err) + return + } + _, err = client.Execute( + "insert into upsert_test(id1, id2) values "+ + "(2, 1) on duplicate key update id2 = 2", + nil, + ) + want := "error: Duplicate entry '1' for key 'id2_idx'" + if err == nil || !strings.HasPrefix(err.Error(), want) { + t.Errorf("Execute: %v, must start with %s", err, want) + } +} + +func TestPoolSize(t *testing.T) { + vstart := framework.DebugVars() + defer framework.DefaultServer.SetPoolSize(framework.DefaultServer.PoolSize()) + framework.DefaultServer.SetPoolSize(1) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + framework.NewDefaultClient().Execute("select sleep(1) from dual", nil) + wg.Done() + }() + // The queries have to be different so consolidator doesn't kick in. + go func() { + framework.NewDefaultClient().Execute("select sleep(0.5) from dual", nil) + wg.Done() + }() + wg.Wait() + + vend := framework.DebugVars() + if err := verifyIntValue(vend, "ConnPoolCapacity", 1); err != nil { + t.Error(err) + } + if err := compareIntDiff(vend, "ConnPoolWaitCount", vstart, 1); err != nil { + t.Error(err) + } +} + +func TestQueryCache(t *testing.T) { + defer framework.DefaultServer.SetQueryCacheCap(framework.DefaultServer.QueryCacheCap()) + framework.DefaultServer.SetQueryCacheCap(1) + + bindVars := map[string]interface{}{"ival1": 1, "ival2": 1} + client := framework.NewDefaultClient() + _, _ = client.Execute("select * from vtocc_test where intval=:ival1", bindVars) + _, _ = client.Execute("select * from vtocc_test where intval=:ival2", bindVars) + vend := framework.DebugVars() + if err := verifyIntValue(vend, "QueryCacheLength", 1); err != nil { + t.Error(err) + } + if err := verifyIntValue(vend, "QueryCacheSize", 1); err != nil { + t.Error(err) + } + if err := verifyIntValue(vend, "QueryCacheCapacity", 1); err != nil { + t.Error(err) + } + + framework.DefaultServer.SetQueryCacheCap(10) + _, _ = client.Execute("select * from vtocc_test where intval=:ival1", bindVars) + vend = framework.DebugVars() + if err := verifyIntValue(vend, "QueryCacheLength", 2); err != nil { + t.Error(err) + } + if err := verifyIntValue(vend, "QueryCacheSize", 2); err != nil { + t.Error(err) + } + + _, _ = client.Execute("select * from vtocc_test where intval=1", bindVars) + vend = framework.DebugVars() + if err := verifyIntValue(vend, "QueryCacheLength", 3); err != nil { + t.Error(err) + } + if err := verifyIntValue(vend, "QueryCacheSize", 3); err != nil { + t.Error(err) + } +} + +func TestSchemaReload(t *testing.T) { + conn, err := mysql.Connect(connParams) + if err != nil { + t.Error(err) + return + } + _, err = conn.ExecuteFetch("create table vtocc_temp(intval int)", 10, false) + if err != nil { + t.Error(err) + return + } + defer func() { + _, _ = conn.ExecuteFetch("drop table vtocc_temp", 10, false) + conn.Close() + }() + framework.DefaultServer.ReloadSchema() + client := framework.NewDefaultClient() + waitTime := 50 * time.Millisecond + for i := 0; i < 10; i++ { + time.Sleep(waitTime) + waitTime += 50 * time.Millisecond + _, err = client.Execute("select * from vtocc_temp", nil) + if err == nil { + return + } + want := "error: table vtocc_temp not found in schema" + if err.Error() != want { + t.Errorf("Error: %v, want %s", err, want) + return + } + } + t.Error("schema did not reload") +} + +func TestMexResultSize(t *testing.T) { + defer framework.DefaultServer.SetMaxResultSize(framework.DefaultServer.MaxResultSize()) + framework.DefaultServer.SetMaxResultSize(2) + + client := framework.NewDefaultClient() + query := "select * from vtocc_test" + _, err := client.Execute(query, nil) + want := "error: Row count exceeded" + if err == nil || !strings.HasPrefix(err.Error(), want) { + t.Errorf("Error: %v, must start with %s", err, want) + } + if err := verifyIntValue(framework.DebugVars(), "MaxResultSize", 2); err != nil { + t.Error(err) + } + + framework.DefaultServer.SetMaxResultSize(10) + _, err = client.Execute(query, nil) + if err != nil { + t.Error(err) + return + } +} + +func TestMaxDMLRows(t *testing.T) { + client := framework.NewDefaultClient() + _, err := client.Execute( + "insert into vtocc_a(eid, id, name, foo) values "+ + "(3, 1, '', ''), (3, 2, '', ''), (3, 3, '', '')", + nil, + ) + fetcher := framework.NewQueryFetcher() + defer fetcher.Close() + + // Verify all three rows are updated in a single DML. + _, err = client.Execute("update vtocc_a set foo='fghi' where eid = 3", nil) + if err != nil { + t.Error(err) + return + } + queryInfo, err := fetcher.Next() + if err != nil { + t.Error(err) + return + } + want := "begin; " + + "select eid, id from vtocc_a where eid = 3 limit 10001 for update; " + + "update vtocc_a set foo = 'fghi' where " + + "(eid = 3 and id = 1) or (eid = 3 and id = 2) or (eid = 3 and id = 3) " + + "/* _stream vtocc_a (eid id ) (3 1 ) (3 2 ) (3 3 ); */; " + + "commit" + if queryInfo.RewrittenSQL() != want { + t.Errorf("Query info: \n%s, want \n%s", queryInfo.RewrittenSQL(), want) + } + + // Verify that rows get split, and if pk changes, those values are also + // split correctly. + defer framework.DefaultServer.SetMaxDMLRows(framework.DefaultServer.MaxDMLRows()) + framework.DefaultServer.SetMaxDMLRows(2) + _, err = client.Execute("update vtocc_a set eid=2 where eid = 3", nil) + if err != nil { + t.Error(err) + return + } + queryInfo, err = fetcher.Next() + if err != nil { + t.Error(err) + return + } + want = "begin; " + + "select eid, id from vtocc_a where eid = 3 limit 10001 for update; " + + "update vtocc_a set eid = 2 where " + + "(eid = 3 and id = 1) or (eid = 3 and id = 2) " + + "/* _stream vtocc_a (eid id ) (3 1 ) (3 2 ) (2 1 ) (2 2 ); */; " + + "update vtocc_a set eid = 2 where (eid = 3 and id = 3) " + + "/* _stream vtocc_a (eid id ) (3 3 ) (2 3 ); */; " + + "commit" + if queryInfo.RewrittenSQL() != want { + t.Errorf("Query info: \n%s, want \n%s", queryInfo.RewrittenSQL(), want) + } + + // Verify that a normal update is split correctly. + _, err = client.Execute("update vtocc_a set foo='fghi' where eid = 2", nil) + if err != nil { + t.Error(err) + return + } + queryInfo, err = fetcher.Next() + if err != nil { + t.Error(err) + return + } + want = "begin; " + + "select eid, id from vtocc_a where eid = 2 limit 10001 for update; " + + "update vtocc_a set foo = 'fghi' where (eid = 2 and id = 1) or " + + "(eid = 2 and id = 2) /* _stream vtocc_a (eid id ) (2 1 ) (2 2 ); */; " + + "update vtocc_a set foo = 'fghi' where (eid = 2 and id = 3) " + + "/* _stream vtocc_a (eid id ) (2 3 ); */; " + + "commit" + if queryInfo.RewrittenSQL() != want { + t.Errorf("Query info: \n%s, want \n%s", queryInfo.RewrittenSQL(), want) + } + + // Verufy that a delete is split correctly. + _, err = client.Execute("delete from vtocc_a where eid = 2", nil) + if err != nil { + t.Error(err) + return + } + queryInfo, err = fetcher.Next() + if err != nil { + t.Error(err) + return + } + want = "begin; " + + "select eid, id from vtocc_a where eid = 2 limit 10001 for update; " + + "delete from vtocc_a where (eid = 2 and id = 1) or (eid = 2 and id = 2) " + + "/* _stream vtocc_a (eid id ) (2 1 ) (2 2 ); */; " + + "delete from vtocc_a where (eid = 2 and id = 3) " + + "/* _stream vtocc_a (eid id ) (2 3 ); */; " + + "commit" + if queryInfo.RewrittenSQL() != want { + t.Errorf("Query info: \n%s, want \n%s", queryInfo.RewrittenSQL(), want) } } diff --git a/go/vt/tabletserver/endtoend/transaction_test.go b/go/vt/tabletserver/endtoend/transaction_test.go index ec0cb4da5c..fc24c53100 100644 --- a/go/vt/tabletserver/endtoend/transaction_test.go +++ b/go/vt/tabletserver/endtoend/transaction_test.go @@ -38,7 +38,11 @@ func TestCommit(t *testing.T) { t.Error(err) return } - tx := fetcher.Fetch() + tx, err := fetcher.Fetch() + if err != nil { + t.Error(err) + return + } want := []string{query} if !reflect.DeepEqual(tx.Queries, want) { t.Errorf("queries: %v, want %v", tx.Queries, want) @@ -71,46 +75,39 @@ func TestCommit(t *testing.T) { t.Errorf("rows affected: %d, want 4", qr.RowsAffected) } + expectedDiffs := []struct { + tag string + diff int + }{{ + tag: "Transactions.TotalCount", + diff: 2, + }, { + tag: "Transactions.Histograms.Completed.Count", + diff: 2, + }, { + tag: "Queries.TotalCount", + diff: 6, + }, { + tag: "Queries.Histograms.BEGIN.Count", + diff: 1, + }, { + tag: "Queries.Histograms.COMMIT.Count", + diff: 1, + }, { + tag: "Queries.Histograms.INSERT_PK.Count", + diff: 1, + }, { + tag: "Queries.Histograms.DML_PK.Count", + diff: 1, + }, { + tag: "Queries.Histograms.PASS_SELECT.Count", + diff: 2, + }} vend := framework.DebugVars() - v1 := framework.FetchInt(vstart, "Transactions.TotalCount") - v2 := framework.FetchInt(vend, "Transactions.TotalCount") - if v1+2 != v2 { - t.Errorf("Transactions.TotalCount: %d, want %d", v2, v1+2) - } - v1 = framework.FetchInt(vstart, "Transactions.Histograms.Completed.Count") - v2 = framework.FetchInt(vend, "Transactions.Histograms.Completed.Count") - if v1+2 != v2 { - t.Errorf("Transactions.Histograms.Completed.Count: %d, want %d", v2, v1+2) - } - v1 = framework.FetchInt(vstart, "Queries.TotalCount") - v2 = framework.FetchInt(vend, "Queries.TotalCount") - if v1+6 != v2 { - t.Errorf("Queries.TotalCount: %d, want %d", v2, v1+6) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.BEGIN.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.BEGIN.Count") - if v1+1 != v2 { - t.Errorf("Queries.Histograms.BEGIN.Count: %d, want %d", v2, v1+1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.COMMIT.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.COMMIT.Count") - if v1+1 != v2 { - t.Errorf("Queries.Histograms.COMMIT.Count: %d, want %d", v2, v1+1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.INSERT_PK.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.INSERT_PK.Count") - if v1+1 != v2 { - t.Errorf("Queries.Histograms.INSERT_PK.Count: %d, want %d", v2, v1+1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.DML_PK.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.DML_PK.Count") - if v1+1 != v2 { - t.Errorf("Queries.Histograms.DML_PK.Count: %d, want %d", v2, v1+1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.PASS_SELECT.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.PASS_SELECT.Count") - if v1+2 != v2 { - t.Errorf("Queries.Histograms.PASS_SELECT.Count: %d, want %d", v2, v1+2) + for _, expected := range expectedDiffs { + if err := compareIntDiff(vend, expected.tag, vstart, expected.diff); err != nil { + t.Error(err) + } } } @@ -136,7 +133,11 @@ func TestRollback(t *testing.T) { t.Error(err) return } - tx := fetcher.Fetch() + tx, err := fetcher.Fetch() + if err != nil { + t.Error(err) + return + } want := []string{query} if !reflect.DeepEqual(tx.Queries, want) { t.Errorf("queries: %v, want %v", tx.Queries, want) @@ -154,31 +155,30 @@ func TestRollback(t *testing.T) { t.Errorf("rows affected: %d, want 3", qr.RowsAffected) } + expectedDiffs := []struct { + tag string + diff int + }{{ + tag: "Transactions.TotalCount", + diff: 1, + }, { + tag: "Transactions.Histograms.Aborted.Count", + diff: 1, + }, { + tag: "Queries.Histograms.BEGIN.Count", + diff: 1, + }, { + tag: "Queries.Histograms.ROLLBACK.Count", + diff: 1, + }, { + tag: "Queries.Histograms.INSERT_PK.Count", + diff: 1, + }} vend := framework.DebugVars() - v1 := framework.FetchInt(vstart, "Transactions.TotalCount") - v2 := framework.FetchInt(vend, "Transactions.TotalCount") - if v1+1 != v2 { - t.Errorf("Transactions.TotalCount: %d, want %d", v2, v1+1) - } - v1 = framework.FetchInt(vstart, "Transactions.Histograms.Aborted.Count") - v2 = framework.FetchInt(vend, "Transactions.Histograms.Aborted.Count") - if v1+1 != v2 { - t.Errorf("Transactions.Histograms.Aborted.Count: %d, want %d", v2, v1+1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.BEGIN.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.BEGIN.Count") - if v1+1 != v2 { - t.Errorf("Queries.Histograms.BEGIN.Count: %d, want %d", v2, v1+1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.ROLLBACK.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.ROLLBACK.Count") - if v1+1 != v2 { - t.Errorf("Queries.Histograms.ROLLBACK.Count: %d, want %d", v2, v1+1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.INSERT_PK.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.INSERT_PK.Count") - if v1+1 != v2 { - t.Errorf("Queries.Histograms.INSERT_PK.Count: %d, want %d", v2, v1+1) + for _, expected := range expectedDiffs { + if err := compareIntDiff(vend, expected.tag, vstart, expected.diff); err != nil { + t.Error(err) + } } } @@ -196,7 +196,11 @@ func TestAutoCommit(t *testing.T) { t.Error(err) return } - tx := fetcher.Fetch() + tx, err := fetcher.Fetch() + if err != nil { + t.Error(err) + return + } want := []string{query} if !reflect.DeepEqual(tx.Queries, want) { t.Errorf("queries: %v, want %v", tx.Queries, want) @@ -229,59 +233,44 @@ func TestAutoCommit(t *testing.T) { t.Errorf("rows affected: %d, want 4", qr.RowsAffected) } + expectedDiffs := []struct { + tag string + diff int + }{{ + tag: "Transactions.TotalCount", + diff: 2, + }, { + tag: "Transactions.Histograms.Completed.Count", + diff: 2, + }, { + tag: "Queries.TotalCount", + diff: 4, + }, { + tag: "Queries.Histograms.BEGIN.Count", + diff: 0, + }, { + tag: "Queries.Histograms.COMMIT.Count", + diff: 0, + }, { + tag: "Queries.Histograms.INSERT_PK.Count", + diff: 1, + }, { + tag: "Queries.Histograms.DML_PK.Count", + diff: 1, + }, { + tag: "Queries.Histograms.PASS_SELECT.Count", + diff: 2, + }} vend := framework.DebugVars() - v1 := framework.FetchInt(vstart, "Transactions.TotalCount") - v2 := framework.FetchInt(vend, "Transactions.TotalCount") - if v1+2 != v2 { - t.Errorf("Transactions.TotalCount: %d, want %d", v2, v1+2) - } - v1 = framework.FetchInt(vstart, "Transactions.Histograms.Completed.Count") - v2 = framework.FetchInt(vend, "Transactions.Histograms.Completed.Count") - if v1+2 != v2 { - t.Errorf("Transactions.Histograms.Completed.Count: %d, want %d", v2, v1+2) - } - v1 = framework.FetchInt(vstart, "Queries.TotalCount") - v2 = framework.FetchInt(vend, "Queries.TotalCount") - if v1+4 != v2 { - t.Errorf("Queries.TotalCount: %d, want %d", v2, v1+6) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.BEGIN.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.BEGIN.Count") - if v1 != v2 { - t.Errorf("Queries.Histograms.BEGIN.Count: %d, want %d", v2, v1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.COMMIT.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.COMMIT.Count") - if v1 != v2 { - t.Errorf("Queries.Histograms.COMMIT.Count: %d, want %d", v2, v1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.INSERT_PK.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.INSERT_PK.Count") - if v1+1 != v2 { - t.Errorf("Queries.Histograms.INSERT_PK.Count: %d, want %d", v2, v1+1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.DML_PK.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.DML_PK.Count") - if v1+1 != v2 { - t.Errorf("Queries.Histograms.DML_PK.Count: %d, want %d", v2, v1+1) - } - v1 = framework.FetchInt(vstart, "Queries.Histograms.PASS_SELECT.Count") - v2 = framework.FetchInt(vend, "Queries.Histograms.PASS_SELECT.Count") - if v1+2 != v2 { - t.Errorf("Queries.Histograms.PASS_SELECT.Count: %d, want %d", v2, v1+2) + for _, expected := range expectedDiffs { + if err := compareIntDiff(vend, expected.tag, vstart, expected.diff); err != nil { + t.Error(err) + } } } func TestTxPoolSize(t *testing.T) { vstart := framework.DebugVars() - v1 := framework.FetchInt(vstart, "TransactionPoolCapacity") - if v1 != framework.BaseConfig.TransactionCap { - t.Errorf("TransactionPoolCapacity: %d, want %d", v1, framework.BaseConfig.TransactionCap) - } - v1 = framework.FetchInt(vstart, "TransactionPoolAvailable") - if v1 != framework.BaseConfig.TransactionCap { - t.Errorf("TransactionPoolAvailable: %d, want %d", v1, framework.BaseConfig.TransactionCap) - } client1 := framework.NewDefaultClient() err := client1.Begin() @@ -290,10 +279,8 @@ func TestTxPoolSize(t *testing.T) { return } defer client1.Rollback() - vend := framework.DebugVars() - v2 := framework.FetchInt(vend, "TransactionPoolAvailable") - if v2 != framework.BaseConfig.TransactionCap-1 { - t.Errorf("TransactionPoolAvailable: %d, want %d", v2, framework.BaseConfig.TransactionCap-1) + if err := verifyIntValue(framework.DebugVars(), "TransactionPoolAvailable", framework.BaseConfig.TransactionCap-1); err != nil { + t.Error(err) } defer framework.DefaultServer.SetTxPoolSize(framework.DefaultServer.TxPoolSize()) @@ -301,14 +288,15 @@ func TestTxPoolSize(t *testing.T) { defer framework.DefaultServer.BeginTimeout.Set(framework.DefaultServer.BeginTimeout.Get()) timeout := 1 * time.Millisecond framework.DefaultServer.BeginTimeout.Set(timeout) - vend = framework.DebugVars() - v2 = framework.FetchInt(vend, "TransactionPoolCapacity") - if v2 != 1 { - t.Errorf("TransactionPoolCapacity: %d, want 1", v2) + vend := framework.DebugVars() + if err := verifyIntValue(vend, "TransactionPoolAvailable", 0); err != nil { + t.Error(err) } - v2 = framework.FetchInt(vend, "BeginTimeout") - if v2 != int(timeout) { - t.Errorf("BeginTimeout: %d, want %d", v2, int(timeout)) + if err := verifyIntValue(vend, "TransactionPoolCapacity", 1); err != nil { + t.Error(err) + } + if err := verifyIntValue(vend, "BeginTimeout", int(timeout)); err != nil { + t.Error(err) } client2 := framework.NewDefaultClient() @@ -317,30 +305,18 @@ func TestTxPoolSize(t *testing.T) { if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("Error: %v, must contain %s", err, want) } - - vend = framework.DebugVars() - v1 = framework.FetchInt(vstart, "Errors.TxPoolFull") - v2 = framework.FetchInt(vend, "Errors.TxPoolFull") - if v2 != v1+1 { - t.Errorf("Errors.TxPoolFull: %d, want %d", v2, v1+1) + if err := compareIntDiff(framework.DebugVars(), "Errors.TxPoolFull", vstart, 1); err != nil { + t.Error(err) } } func TestTxTimeout(t *testing.T) { vstart := framework.DebugVars() - v1 := framework.FetchInt(vstart, "TransactionPoolTimeout") - timeout := int(framework.BaseConfig.TransactionTimeout * 1e9) - if v1 != timeout { - t.Errorf("Timeout: %d, want %d", v1, timeout) - } defer framework.DefaultServer.SetTxTimeout(framework.DefaultServer.TxTimeout()) framework.DefaultServer.SetTxTimeout(1 * time.Millisecond) - vend := framework.DebugVars() - v2 := framework.FetchInt(vend, "TransactionPoolTimeout") - timeout = int(1 * time.Millisecond) - if v2 != timeout { - t.Errorf("Timeout: %d, want %d", v2, timeout) + if err := verifyIntValue(framework.DebugVars(), "TransactionPoolTimeout", int(1*time.Millisecond)); err != nil { + t.Error(err) } fetcher := framework.NewTxFetcher() @@ -356,15 +332,16 @@ func TestTxTimeout(t *testing.T) { if err == nil || !strings.HasPrefix(err.Error(), want) { t.Errorf("Error: %v, must contain %s", err, want) } - tx := fetcher.Fetch() + tx, err := fetcher.Fetch() + if err != nil { + t.Error(err) + return + } if tx.Conclusion != "kill" { t.Errorf("Conclusion: %s, want kill", tx.Conclusion) } - vend = framework.DebugVars() - v1 = framework.FetchInt(vstart, "Kills.Transactions") - v2 = framework.FetchInt(vend, "Kills.Transactions") - if v2 != v1+1 { - t.Errorf("Kills.Transactions: %d, want %d", v2, v1+1) + if err := compareIntDiff(framework.DebugVars(), "Kills.Transactions", vstart, 1); err != nil { + t.Error(err) } } diff --git a/go/vt/tabletserver/query_executor.go b/go/vt/tabletserver/query_executor.go index 84ae9706c8..ccd7170eff 100644 --- a/go/vt/tabletserver/query_executor.go +++ b/go/vt/tabletserver/query_executor.go @@ -642,7 +642,7 @@ func (qre *QueryExecutor) execSet() (*mproto.QueryResult, error) { if err != nil { return nil, NewTabletError(ErrFail, vtrpc.ErrorCode_BAD_INPUT, "got set vt_query_cache_size = %v, want int64", err) } - qre.qe.schemaInfo.SetQueryCacheSize(int(val)) + qre.qe.schemaInfo.SetQueryCacheCap(int(val)) case "vt_max_result_size": val, err := parseInt64(qre.plan.SetValue) if err != nil { diff --git a/go/vt/tabletserver/schema_info.go b/go/vt/tabletserver/schema_info.go index 20ba5b831f..eb57ca3137 100644 --- a/go/vt/tabletserver/schema_info.go +++ b/go/vt/tabletserver/schema_info.go @@ -519,14 +519,19 @@ func (si *SchemaInfo) peekQuery(sql string) *ExecPlan { return nil } -// SetQueryCacheSize sets the query cache size. -func (si *SchemaInfo) SetQueryCacheSize(size int) { +// SetQueryCacheCap sets the query cache capacity. +func (si *SchemaInfo) SetQueryCacheCap(size int) { if size <= 0 { panic(NewTabletError(ErrFail, vtrpc.ErrorCode_BAD_INPUT, "cache size %v out of range", size)) } si.queries.SetCapacity(int64(size)) } +// QueryCacheCap returns the capacity of the query cache. +func (si *SchemaInfo) QueryCacheCap() int { + return int(si.queries.Capacity()) +} + // SetReloadTime changes how often the schema is reloaded. This // call also triggers an immediate reload. func (si *SchemaInfo) SetReloadTime(reloadTime time.Duration) { diff --git a/go/vt/tabletserver/tabletserver.go b/go/vt/tabletserver/tabletserver.go index 247f2339b6..4f82e53978 100644 --- a/go/vt/tabletserver/tabletserver.go +++ b/go/vt/tabletserver/tabletserver.go @@ -948,6 +948,16 @@ func (tsv *TabletServer) registerSchemazHandler() { }) } +// SetPoolSize changes the pool size to the specified value. +func (tsv *TabletServer) SetPoolSize(val int) { + tsv.qe.connPool.SetCapacity(val) +} + +// PoolSize returns the pool size. +func (tsv *TabletServer) PoolSize() int { + return int(tsv.qe.connPool.Capacity()) +} + // SetTxPoolSize changes the tx pool size to the specified value. func (tsv *TabletServer) SetTxPoolSize(val int) { tsv.qe.txPool.pool.SetCapacity(val) @@ -968,6 +978,45 @@ func (tsv *TabletServer) TxTimeout() time.Duration { return tsv.qe.txPool.Timeout() } +// SetQueryCacheCap changes the pool size to the specified value. +func (tsv *TabletServer) SetQueryCacheCap(val int) { + tsv.qe.schemaInfo.SetQueryCacheCap(val) +} + +// QueryCacheCap returns the pool size. +func (tsv *TabletServer) QueryCacheCap() int { + return int(tsv.qe.schemaInfo.QueryCacheCap()) +} + +// SetStrictMode sets strict mode on or off. +func (tsv *TabletServer) SetStrictMode(strict bool) { + if strict { + tsv.qe.strictMode.Set(1) + } else { + tsv.qe.strictMode.Set(0) + } +} + +// SetMaxResultSize changes the max result size to the specified value. +func (tsv *TabletServer) SetMaxResultSize(val int) { + tsv.qe.maxResultSize.Set(int64(val)) +} + +// MaxResultSize returns the max result size. +func (tsv *TabletServer) MaxResultSize() int { + return int(tsv.qe.maxResultSize.Get()) +} + +// SetMaxDMLRows changes the max result size to the specified value. +func (tsv *TabletServer) SetMaxDMLRows(val int) { + tsv.qe.maxDMLRows.Set(int64(val)) +} + +// MaxDMLRows returns the max result size. +func (tsv *TabletServer) MaxDMLRows() int { + return int(tsv.qe.maxDMLRows.Get()) +} + func init() { rand.Seed(time.Now().UnixNano()) } From 40e25e9ec6d5ea40007fb9fb38a56face8077102 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Fri, 2 Oct 2015 10:21:23 -0700 Subject: [PATCH 2/3] tabletserver: fix broken build --- go/vt/tabletserver/schema_info_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/tabletserver/schema_info_test.go b/go/vt/tabletserver/schema_info_test.go index 543ab8dc76..4038971ebd 100644 --- a/go/vt/tabletserver/schema_info_test.go +++ b/go/vt/tabletserver/schema_info_test.go @@ -400,7 +400,7 @@ func TestSchemaInfoQueryCacheFailDueToInvalidCacheSize(t *testing.T) { "schema info SetQueryCacheSize should use a positive size", ErrFail, ) - schemaInfo.SetQueryCacheSize(0) + schemaInfo.SetQueryCacheCap(0) } func TestSchemaInfoQueryCache(t *testing.T) { @@ -427,7 +427,7 @@ func TestSchemaInfoQueryCache(t *testing.T) { ctx := context.Background() logStats := newLogStats("GetPlanStats", ctx) - schemaInfo.SetQueryCacheSize(1) + schemaInfo.SetQueryCacheCap(1) firstPlan := schemaInfo.GetPlan(ctx, logStats, firstQuery) if firstPlan == nil { t.Fatalf("plan should not be nil") From 1604b5d6f9c0a1a9fe6256a851065c1fc82a7bb7 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Fri, 2 Oct 2015 10:32:11 -0700 Subject: [PATCH 3/3] tabletserver endtoend: fix logtracker Looks like unit_race is too slow. So, the wait for events becomes flaky with a 1 second timeout. Increased it to 5s. --- go/vt/tabletserver/endtoend/framework/logtracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/tabletserver/endtoend/framework/logtracker.go b/go/vt/tabletserver/endtoend/framework/logtracker.go index 7bec1b4eb4..58fbc55783 100644 --- a/go/vt/tabletserver/endtoend/framework/logtracker.go +++ b/go/vt/tabletserver/endtoend/framework/logtracker.go @@ -80,7 +80,7 @@ func (fetcher *QueryFetcher) Close() { // Next fetches the next captured query. // If the wait is longer than one second, it returns an error. func (fetcher *QueryFetcher) Next() (*tabletserver.LogStats, error) { - tmr := time.NewTimer(1 * time.Second) + tmr := time.NewTimer(5 * time.Second) defer tmr.Stop() for { select {