Merge pull request #1194 from alainjobart/resharding

R.I.P. vtocc.
This commit is contained in:
Alain Jobart 2015-10-09 16:21:22 -07:00
Родитель 6517c9b78b 965140c42f
Коммит 7ee787d822
19 изменённых файлов: 134 добавлений и 572 удалений

Просмотреть файл

@ -67,7 +67,7 @@ unit_test_goveralls: build
travis/goveralls.sh
queryservice_test:
go run test.go -docker=false queryservice_vtocc queryservice_vttablet
go run test.go -docker=false queryservice
.ONESHELL:
SHELL = /bin/bash

Просмотреть файл

@ -1,148 +0,0 @@
# vtocc
vtocc is a smart proxy to mysql/mariadb. It's basically a subset of
vttablet that concerns itself with just serving queries for a
single database. It's a standalone program/server that can be
launched without the need of the rest of the vitess ecosystem.
It can be pointed to an existing database, and you should be able
to send queries through it.
## vtocc features
* Connection pooling.
* SQL parser: Although very close, the vtocc SQL parser is not SQL-92
compliant. It has left out constructs that are deemed uncommon or
OLTP-unfriendly. It should, however, allow most queries used by a
typical web application.
* Query rewrite and sanitation (adding limits, avoiding non-deterministic updates).
* Query de-duping: reuse the results of an in-flight query to any
subsequent requests that were received while the query was still
executing.
* Rowcache: the mysql buffer cache is optimized for range scans over
indices and tables. Unfortunately, its not good for random access
by primary key. The rowcache will instead maintain a row based cache
(using [memcached](http://memcached.org/) as its backend) and keep it
consistent by fielding all DMLs that could potentially affect them.
* Query blacklisting: You can specify a set of rules to blacklist queries
that are potentially problematic.
* Table ACLs: Allow you to specify ACLs for tables based on the connected
user.
* Update stream: A server that streams the list of rows that are changing
in the database, which can be used as a mechanism to continuously export
the data to another data store.
* Query killer for queries that take too long to return data.
* Transaction management: Ability to limit the number of concurrent
transactions and manage deadlines.
* Streaming queries to serve OLAP workloads.
* A rich set of monitoring features to watch over, diagnose or analyze performance.
## Protocol
vtocc uses the bsonrpc protocol. This means that it uses [bson encoding](http://bsonspec.org)
to receive and send messages. There is currently a [python client](https://github.com/youtube/vitess/blob/master/py/vtdb/tablet.py). A java client is
also getting implemented.
If you are familiar with go, you can actually plug in any protocol you desire, like json,
thrift or protobufs.
## Data types
vtocc has not been well tested with exotic data types. Specifically, we don't know how it
will handle boolean and timestamp columns. Otherwise, we have [tests](https://github.com/youtube/vitess/blob/master/test/test_data/test_schema.sql#L45) for
the commonly used data types.
vtocc can work with latin-1 or utf-8 encoded databases. It's highly recommended that you match
client and server side encoding. vtocc does not try to do any character set conversions.
vtocc will enable rowcache only for tables that have numbers or binary data types as primary
key columns. This is because other column types are not bitwise comparable. For example,
varchar comparison in MySQL is collation dependent. So, those types are not supported.
## Bind variables
One major differentiator with vtocc is its use of bind variables. When you send a query,
you build it like this:
query = "select a, b, c from t where id1 = :id1 and id2 = :id2"
bindVars = {"id1": 1, "id2": 2}
vtocc parses the query string and caches it against an execution plan. Subsequent requests
with the same query string will cause vtocc to reuse the cached query plan to efficiently
handle the request.
vtocc also accepts old-style positional variables. In such cases the bind vars are to be
named as v1, v2, etc:
query = "select a, b, c from t where id1 = ? and id2 = ?"
bindVars = {"v1": 1, "v2": 2}
vtocc also accepts list bind variables (TO BE IMPLEMENTED):
query = "select a, b, c from t where id1 in ::list"
bindVars = {"list": [1, 2, 3]}
Additionally, vtocc tracks statistics grouped by these query strings, which are
useful for analysis and troubleshooting.
## Execute functions
There are three Execute functions:
* **Execute**: This is an OLTP execute function that is expected to return a limited set
of rows. If the number of rows exceeds the max allowed limit, an error is returned.
* **BatchExecute** executes a set of OLTP statements as a single round-trip request. If you
are affected by network latency, this function can be used to group your requests.
You can call this function from within a transaction, or you can begin and commit a
transaction from within a batch.
* **StreamExecute**: This function is used for returning large result sets that could
potentially require full table scans.
## Command line arguments
'vtocc -h' should print the full set of command line arguments. Here is an explanation
of what they mean:
#### DB connection parameters
There are four types of db-config parameters. db-config-app-* specify the connection parameters
that will be used to serve the app. The 'repl' parameters will be used by the rowcache to connect
to the server as a replica to fetch binlog events for invalidation. The 'dba' and 'filtered'
parameters are only used when running as vttablet.
* **db-config-app-charset="utf8"**: Only utf8 or latin1 are currently supported.
* **db-config-app-dbname=""**: Name of the MySQL database to serve queries for.
* **db-config-app-keyspace=""**: Its recommended that this value be set to the same as dbname. Clients connecting to vtocc will need to specify the keyspace name, which will be used as sanity check.
* **db-config-app-uname="vt_app"**: Set this to the username vtocc should connect as.
* **db-config-app-unixsocket=""**: Socket file name. This is the recommended mode of connection (vs host-port).
* **db-credentials-server="file"**: db credentials server type (use 'file' for the file implementation).
* **db-credentials-file**: Specifies the file where db credentials are stored.
TODO: Document the rest of the flags.
#### Query server parameters
All timeout related parameters below are specified in seconds. A value of zero means never.
* **port=0**: Server port.
* **queryserver-config-idle-timeout=1800**: vtocc has many connection pools to connect to mysql. If any connection in the pool has been idle for longer than the specified time, then vtocc discards the connection and creates a new one instead. This value should be less than the MySQL idle timeout.
* **queryserver-config-max-result-size=10000**: vtocc adds a limit clause to all unbounded queries. If the result returned exceeds this number, it returns an error instead.
* **queryserver-config-pool-size=16**: This is the generic read pool. This pool gets used if you issue read queries outside of transactions.
* **queryserver-config-query-cache-size=5000**: This is the number of unique query strings that vtocc caches. You can start off with the default value and adjust up or down based on what you see.
* **queryserver-config-query-timeout=0**: This timeout specifies how long a query is allowed to run before its killed.
* **queryserver-config-schema-reload-time=1800**: This specifies how often the schema is reloaded by vtocc. If you rollout schema changes to a live serving database, you can be sure that it has been read by vtocc after the reload time has elapsed. If needed, there are ways to make vtocc reload the schema immediately.
* **queryserver-config-stream-buffer-size=32768**: This is the buffer size for streaming queries.
* **queryserver-config-stream-pool-size=750**: Connection pool size for streaming queries.
* **queryserver-config-strict-mode=true**: If this is turned off, vtocc allows all DMLs and does not enforce MySQL's `STRICT_TRANS_TABLES`. This setting can be turned off for migration purposes if the database is not already configured with these settings.
* **queryserver-config-transaction-cap=20**: This value limits the number of allowed concurrent transactions.
* **queryserver-config-transaction-timeout=30**: The amount of time to allow a transaction to complete before killing it.
#### Logging parameters
* **alsologtostderr=false**: log to standard error as well as files.
* **keep_logs=0**: keep logs for this long (zero to keep forever).
* **log_backtrace_at=:0**: when logging hits line file:N, emit a stack trace.
* **log_dir=""**: If non-empty, write log files in this directory.
* **logtostderr=false**: log to standard error instead of files.
* **stderrthreshold=WARNING**: logs at or above this threshold go to stderr.
* **purge_logs_interval=1h0m0s**: how often try to remove old logs.

Просмотреть файл

@ -25,7 +25,7 @@ import (
)
var (
port = flag.Int("port", 6612, "vtocc port")
port = flag.Int("port", 6612, "vttablet port")
mysqlPort = flag.Int("mysql_port", 3306, "mysql port")
tabletUID = flag.Uint("tablet_uid", 41983, "tablet uid")
mysqlSocket = flag.String("mysql_socket", "", "path to the mysql socket")

Просмотреть файл

@ -1,11 +0,0 @@
// Copyright 2014, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// Imports and register the file custom rule source
import (
_ "github.com/youtube/vitess/go/vt/tabletserver/customrule/filecustomrule"
)

Просмотреть файл

@ -1,11 +0,0 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// Imports and register the gorpc queryservice server
import (
_ "github.com/youtube/vitess/go/vt/tabletserver/gorpcqueryservice"
)

Просмотреть файл

@ -1,16 +0,0 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// Imports and register the gRPC queryservice server
import (
"github.com/youtube/vitess/go/vt/servenv"
_ "github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
)
func init() {
servenv.RegisterGRPCFlags()
}

Просмотреть файл

@ -1,11 +0,0 @@
// Copyright 2014, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// This plugin imports influxdbbackend to register the influxdbbackend stats backend.
import (
_ "github.com/youtube/vitess/go/stats/influxdbbackend"
)

Просмотреть файл

@ -1,13 +0,0 @@
package main
import "github.com/youtube/vitess/go/vt/tabletserver"
// For use by plugins which wish to avoid racing when registering status page parts.
var onStatusRegistered func()
func addStatusParts(qsc tabletserver.Controller) {
qsc.AddStatusPart()
if onStatusRegistered != nil {
onStatusRegistered()
}
}

Просмотреть файл

@ -1,118 +0,0 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/exit"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/tableacl"
"github.com/youtube/vitess/go/vt/tableacl/simpleacl"
"github.com/youtube/vitess/go/vt/tabletserver"
// import mysql to register mysql connection function
_ "github.com/youtube/vitess/go/mysql"
// import memcache to register memcache connection function
_ "github.com/youtube/vitess/go/memcache"
)
var (
overridesFile = flag.String("schema-override", "", "schema overrides file")
enableRowcache = flag.Bool("enable-rowcache", false, "enable rowcacche")
enableInvalidator = flag.Bool("enable-invalidator", false, "enable rowcache invalidator")
binlogPath = flag.String("binlog-path", "", "binlog path used by rowcache invalidator")
tableAclConfig = flag.String("table-acl-config", "", "path to table access checker config file")
)
var schemaOverrides []tabletserver.SchemaOverride
func init() {
servenv.RegisterDefaultFlags()
}
func main() {
defer exit.Recover()
flags := dbconfigs.AppConfig | dbconfigs.DbaConfig |
dbconfigs.FilteredConfig | dbconfigs.ReplConfig
dbconfigs.RegisterFlags(flags)
flag.Parse()
tabletserver.Init()
if len(flag.Args()) > 0 {
flag.Usage()
log.Errorf("vtocc doesn't take any positional arguments")
exit.Return(1)
}
servenv.Init()
dbConfigs, err := dbconfigs.Init("", flags)
if err != nil {
log.Errorf("Cannot initialize App dbconfig: %v", err)
exit.Return(1)
}
if *enableRowcache {
dbConfigs.App.EnableRowcache = true
if *enableInvalidator {
dbConfigs.App.EnableInvalidator = true
}
}
mycnf := &mysqlctl.Mycnf{BinLogPath: *binlogPath}
mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbConfigs.Dba, &dbConfigs.App.ConnParams, &dbConfigs.Repl)
if err := unmarshalFile(*overridesFile, &schemaOverrides); err != nil {
log.Error(err)
exit.Return(1)
}
data, _ := json.MarshalIndent(schemaOverrides, "", " ")
log.Infof("schemaOverrides: %s\n", data)
if *tableAclConfig != "" {
tableacl.Register("simpleacl", &simpleacl.Factory{})
tableacl.Init(*tableAclConfig)
}
qsc := tabletserver.NewServer()
qsc.Register()
// Query service can go into NOT_SERVING state if mysql goes down.
// So, continuously retry starting the service. So, it tries to come
// back up if it went down.
go func() {
for {
_ = qsc.StartService(nil, dbConfigs, schemaOverrides, mysqld)
time.Sleep(30 * time.Second)
}
}()
log.Infof("starting vtocc %v", *servenv.Port)
servenv.OnRun(func() {
addStatusParts(qsc)
})
servenv.OnTerm(func() {
qsc.StopService()
mysqld.Close()
})
servenv.RunDefault()
}
func unmarshalFile(name string, val interface{}) error {
if name != "" {
data, err := ioutil.ReadFile(name)
if err != nil {
return fmt.Errorf("unmarshalFile: could not read %v: %v", val, err)
}
if err = json.Unmarshal(data, val); err != nil {
return fmt.Errorf("unmarshalFile: could not read %s: %v", val, err)
}
}
return nil
}

Просмотреть файл

@ -114,7 +114,7 @@ func (evs *EventStreamer) transactionToEvent(trans *proto.BinlogTransaction) err
buildDMLEvent parses the tuples of the full stream comment.
The _stream comment is extracted into a StreamEvent.
*/
// Example query: insert into vtocc_e(foo) values ('foo') /* _stream vtocc_e (eid id name ) (null 1 'bmFtZQ==' ); */
// Example query: insert into _table_(foo) values ('foo') /* _stream _table_ (eid id name ) (null 1 'bmFtZQ==' ); */
// the "null" value is used for auto-increment columns.
func (evs *EventStreamer) buildDMLEvent(sql string, insertid int64) (*proto.StreamEvent, int64, error) {
// first extract the comment

Просмотреть файл

@ -15,13 +15,13 @@ import (
var dmlErrorCases = []string{
"query",
"query /* _stream 10 (eid id name ) (null 1 'bmFtZQ==' ); */",
"query /* _stream vtocc_e eid id name ) (null 1 'bmFtZQ==' ); */",
"query /* _stream vtocc_e (10 id name ) (null 1 'bmFtZQ==' ); */",
"query /* _stream vtocc_e (eid id name (null 1 'bmFtZQ==' ); */",
"query /* _stream vtocc_e (eid id name) (null 'aaa' 'bmFtZQ==' ); */",
"query /* _stream vtocc_e (eid id name) (null 'bmFtZQ==' ); */",
"query /* _stream vtocc_e (eid id name) (null 1.1 'bmFtZQ==' ); */",
"query /* _stream vtocc_e (eid id name) (null a 'bmFtZQ==' ); */",
"query /* _stream _table_ eid id name ) (null 1 'bmFtZQ==' ); */",
"query /* _stream _table_ (10 id name ) (null 1 'bmFtZQ==' ); */",
"query /* _stream _table_ (eid id name (null 1 'bmFtZQ==' ); */",
"query /* _stream _table_ (eid id name) (null 'aaa' 'bmFtZQ==' ); */",
"query /* _stream _table_ (eid id name) (null 'bmFtZQ==' ); */",
"query /* _stream _table_ (eid id name) (null 1.1 'bmFtZQ==' ); */",
"query /* _stream _table_ (eid id name) (null a 'bmFtZQ==' ); */",
}
func TestEventErrors(t *testing.T) {
@ -94,7 +94,7 @@ func TestDMLEvent(t *testing.T) {
Sql: "SET INSERT_ID=10",
}, {
Category: proto.BL_DML,
Sql: "query /* _stream vtocc_e (eid id name) (null -1 'bmFtZQ==' ) (null 18446744073709551615 'bmFtZQ==' ); */",
Sql: "query /* _stream _table_ (eid id name) (null -1 'bmFtZQ==' ) (null 18446744073709551615 'bmFtZQ==' ); */",
}, {
Category: proto.BL_DML,
Sql: "query",
@ -107,7 +107,7 @@ func TestDMLEvent(t *testing.T) {
sendEvent: func(event *proto.StreamEvent) error {
switch event.Category {
case "DML":
want := `&{DML vtocc_e [{eid 8 0} {id 8 0} {name 15 0}] [[10 -1 name] [11 18446744073709551615 name]] 1 }`
want := `&{DML _table_ [{eid 8 0} {id 8 0} {name 15 0}] [[10 -1 name] [11 18446744073709551615 name]] 1 }`
got := fmt.Sprintf("%v", event)
if got != want {
t.Errorf("got \n%s, want \n%s", got, want)

Просмотреть файл

@ -12,7 +12,7 @@ Naming is disconnected from the backend discovery and is used for
front end clients.
The common query is "resolve keyspace.shard.db_type" and return a list
of host:port tuples that export our default server (vtocc). You can
of host:port tuples that export our default server (vttablet). You can
get all shards with "keyspace.*.db_type".
In zk, this is in /zk/local/vt/ns/<keyspace>/<shard>/<db type>

Просмотреть файл

@ -164,7 +164,7 @@ func (wr *Wrangler) exportVtnsToZkns(ctx context.Context, zconn zk.Conn, vtnsAdd
}
// Write the individual endpoints and compute the SRV entries.
vtoccAddrs := LegacyZknsAddrs{make([]string, 0, 8)}
tabletAddrs := LegacyZknsAddrs{make([]string, 0, 8)}
defaultAddrs := LegacyZknsAddrs{make([]string, 0, 8)}
for i, entry := range addrs.Entries {
zknsAddrPath := fmt.Sprintf("%v/%v", zknsAddrPath, i)
@ -178,7 +178,7 @@ func (wr *Wrangler) exportVtnsToZkns(ctx context.Context, zconn zk.Conn, vtnsAdd
return nil, err
}
defaultAddrs.Endpoints = append(defaultAddrs.Endpoints, zknsAddrPath)
vtoccAddrs.Endpoints = append(vtoccAddrs.Endpoints, zknsAddrPath+":vt")
tabletAddrs.Endpoints = append(tabletAddrs.Endpoints, zknsAddrPath+":vt")
}
// Prune any zkns entries that are no longer referenced by the
@ -199,10 +199,10 @@ func (wr *Wrangler) exportVtnsToZkns(ctx context.Context, zconn zk.Conn, vtnsAdd
deleteIdx++
}
// Write the VDNS entries for both vtocc and mysql
vtoccVdnsPath := fmt.Sprintf("%v/vt.vdns", zknsAddrPath)
zknsPaths = append(zknsPaths, vtoccVdnsPath)
if err = writeAddrs(zconn, vtoccVdnsPath, &vtoccAddrs); err != nil {
// Write the VDNS entries for both tablet and mysql
tabletVdnsPath := fmt.Sprintf("%v/vt.vdns", zknsAddrPath)
zknsPaths = append(zknsPaths, tabletVdnsPath)
if err = writeAddrs(zconn, tabletVdnsPath, &tabletAddrs); err != nil {
return nil, err
}

Просмотреть файл

@ -26,7 +26,7 @@ This set of helpers has two purposes:
It is expected that Vitess users may change py/vttest/environment.py to match their setup. For instance, YouTube internally does this to use different scripts to bring up a MySQL instance.
These tests need to be as light weight as possible, so the developers who use Vitess can run as many unit tests as they want with minimal resources. We are currently running a single MySQL, multiple vtocc, and one vtgate. The plan is to switch to vtcombo to replace vtgate+vtocc processes with a single process. The MySQL process has multiple databases, one per keyspace / shard. It is the smallest setup with a real MySQL.
These tests need to be as light weight as possible, so the developers who use Vitess can run as many unit tests as they want with minimal resources. We are running a single MySQL and a single vtcombo instance (that encapsulates one vtgate and multiple vttablets). The MySQL process has multiple databases, one per keyspace / shard, and replication is not enabled. It is the smallest setup with a real MySQL.
This framework supports an initial schema to apply to all keyspaces / shards, per keyspace. This is also the base for testing schema changes before rolling them out to production.
@ -52,16 +52,10 @@ We have made a lot of progress for the Google internal sandbox, however the exte
The following action items exist to make it all consistent:
* Consolidate to use use vttest everywhere, instead of the old google3-only run\_local\_database.py. Note vttest library is working in Google3 and passes unit test scenarios. Haven't had time to clean up the old stuff yet.
* switch the end-to-end google3 tests that require a production setup from vtocc to vtgate. This is a small subset of tests (about 100, not the thousands of YouTube tests that use run\_local\_database.py), so using vttablet is not too bad.
* Switch query\_service.py tests to unit tests using vttest. Sugu is on it, first thing is to let vttest only launch a MySQL (and nothing else), and fix its MySQL parameters.
* Switch query\_service.py tests to unit tests using vttest. Sugu is making good progress on this.
* Switch java tests to use vttest if they need a full cluster, and not java\_vtgate\_test\_helper.py, retire java\_vtgate\_test\_helper.py. Client unit tests should already use vtgateclienttest.
* Same for google3 C++ client.
* We are removing direct vttablet access to python. This in turn will remove a lot of code and tests, like vtclient.py, tablet.py, zkocc.py, ... Less surface area is good, we just need to make sure we maintain good code coverage. As part of this:
* test/vtdb\_test.py needs to go away. But we need to make sure what it tests is covered in other places. So if it's just testing the python vttablet library, that should go. If it's also testing vttablet timeout behaviors, that should stay.

Просмотреть файл

@ -120,24 +120,10 @@
"Shard": 4,
"RetryMax": 0
},
"queryservice_vtocc": {
"queryservice": {
"File": "queryservice_test.py",
"Args": [
"-m",
"-e",
"vtocc"
],
"Command": [],
"Manual": false,
"Shard": 4,
"RetryMax": 0
},
"queryservice_vttablet": {
"File": "queryservice_test.py",
"Args": [
"-m",
"-e",
"vttablet"
"-m"
],
"Command": [],
"Manual": false,
@ -310,4 +296,4 @@
"RetryMax": 0
}
}
}
}

Просмотреть файл

@ -26,9 +26,6 @@ def main():
parser = optparse.OptionParser(usage='usage: %prog [options] [test_names]')
parser.add_option('-m', '--memcache', action='store_true', default=False,
help='starts a memcache d, and tests rowcache')
parser.add_option(
'-e', '--env', default='vttablet',
help='Environment that will be used. Valid options: vttablet, vtocc')
utils.add_options(parser)
options, args = parser.parse_args()
@ -42,7 +39,7 @@ def run_tests(options, args):
suite = unittest.TestSuite()
if args:
if args[0] == 'teardown':
test_env.TestEnv(options.env).tearDown()
test_env.TestEnv().tearDown()
exit(0)
for arg in args:
if hasattr(nocache_tests.TestNocache, arg):
@ -62,11 +59,11 @@ def run_tests(options, args):
for m in modules:
suite.addTests(unittest.TestLoader().loadTestsFromModule(m))
env = test_env.TestEnv(options.env)
env = test_env.TestEnv()
try:
env.memcache = options.memcache
env.setUp()
print 'Starting queryservice_test.py: %s' % options.env
print 'Starting queryservice_test.py'
sys.stdout.flush()
framework.TestCase.setenv(env)
result = unittest.TextTestRunner(

Просмотреть файл

@ -39,29 +39,27 @@ class TestStream(framework.TestCase):
self.fail('Bindvar asdfg should not be allowed by custom rule')
except dbexceptions.DatabaseError as e:
self.assertContains(str(e), 'error: Query disallowed')
# Test dynamic custom rule for vttablet
if self.env.env == 'vttablet':
if environment.topo_server().flavor() == 'zookeeper':
# Make a change to the rule
self.env.change_customrules()
time.sleep(3)
try:
self.env.execute(
'select * from vtocc_test where intval=:asdfg', bv,
cursorclass=cursor.StreamCursor)
except dbexceptions.DatabaseError as e:
self.fail(
'Bindvar asdfg should be allowed after a change of custom rule, '
'Err=' + str(e))
self.env.restore_customrules()
time.sleep(3)
try:
self.env.execute(
'select * from vtocc_test where intval=:asdfg', bv,
cursorclass=cursor.StreamCursor)
self.fail('Bindvar asdfg should not be allowed by custom rule')
except dbexceptions.DatabaseError as e:
self.assertContains(str(e), 'error: Query disallowed')
if environment.topo_server().flavor() == 'zookeeper':
# Make a change to the rule
self.env.change_customrules()
time.sleep(3)
try:
self.env.execute(
'select * from vtocc_test where intval=:asdfg', bv,
cursorclass=cursor.StreamCursor)
except dbexceptions.DatabaseError as e:
self.fail(
'Bindvar asdfg should be allowed after a change of custom rule, '
'Err=' + str(e))
self.env.restore_customrules()
time.sleep(3)
try:
self.env.execute(
'select * from vtocc_test where intval=:asdfg', bv,
cursorclass=cursor.StreamCursor)
self.fail('Bindvar asdfg should not be allowed by custom rule')
except dbexceptions.DatabaseError as e:
self.assertContains(str(e), 'error: Query disallowed')
def test_basic_stream(self):
self._populate_vtocc_big_table(100)

Просмотреть файл

@ -40,11 +40,6 @@ class TestEnv(object):
vttop = environment.vttop
vtroot = environment.vtroot
def __init__(self, env):
if env not in ['vttablet', 'vtocc']:
raise EnvironmentError('unexptected env', env)
self.env = env
@property
def port(self):
return self.tablet.port
@ -110,14 +105,13 @@ class TestEnv(object):
"Operator": "NOOP"
}]
}]""")
if self.env == 'vttablet':
if environment.topo_server().flavor() == 'zookeeper':
utils.run(
environment.binary_argstr('zk') +
' touch -p /zk/test_ca/config/customrules/testrules')
utils.run(
environment.binary_argstr('zk') + ' cp ' + filename +
' /zk/test_ca/config/customrules/testrules')
if environment.topo_server().flavor() == 'zookeeper':
utils.run(
environment.binary_argstr('zk') +
' touch -p /zk/test_ca/config/customrules/testrules')
utils.run(
environment.binary_argstr('zk') + ' cp ' + filename +
' /zk/test_ca/config/customrules/testrules')
def change_customrules(self):
customrules = os.path.join(environment.tmproot, 'customrules.json')
@ -131,20 +125,18 @@ class TestEnv(object):
"Operator": "NOOP"
}]
}]""")
if self.env == 'vttablet':
if environment.topo_server().flavor() == 'zookeeper':
utils.run(
environment.binary_argstr('zk') + ' cp ' + customrules +
' /zk/test_ca/config/customrules/testrules')
if environment.topo_server().flavor() == 'zookeeper':
utils.run(
environment.binary_argstr('zk') + ' cp ' + customrules +
' /zk/test_ca/config/customrules/testrules')
def restore_customrules(self):
customrules = os.path.join(environment.tmproot, 'customrules.json')
self.create_customrules(customrules)
if self.env == 'vttablet':
if environment.topo_server().flavor() == 'zookeeper':
utils.run(
environment.binary_argstr('zk') + ' cp ' + customrules +
' /zk/test_ca/config/customrules/testrules')
if environment.topo_server().flavor() == 'zookeeper':
utils.run(
environment.binary_argstr('zk') + ' cp ' + customrules +
' /zk/test_ca/config/customrules/testrules')
def create_schema_override(self, filename):
with open(filename, 'w') as f:
@ -222,33 +214,23 @@ class TestEnv(object):
table_acl_config = os.path.join(
environment.vttop, 'test', 'test_data', 'table_acl_config.json')
if self.env == 'vttablet':
environment.topo_server().setup()
self.create_customrules(customrules);
utils.run_vtctl('CreateKeyspace -force test_keyspace')
self.tablet.init_tablet('master', 'test_keyspace', '0')
if environment.topo_server().flavor() == 'zookeeper':
self.tablet.start_vttablet(
memcache=self.memcache,
zkcustomrules='/zk/test_ca/config/customrules/testrules',
schema_override=schema_override,
table_acl_config=table_acl_config,
)
else:
self.tablet.start_vttablet(
memcache=self.memcache,
filecustomrules=customrules,
schema_override=schema_override,
table_acl_config=table_acl_config,
)
environment.topo_server().setup()
self.create_customrules(customrules);
utils.run_vtctl('CreateKeyspace -force test_keyspace')
self.tablet.init_tablet('master', 'test_keyspace', '0')
if environment.topo_server().flavor() == 'zookeeper':
self.tablet.start_vttablet(
memcache=self.memcache,
zkcustomrules='/zk/test_ca/config/customrules/testrules',
schema_override=schema_override,
table_acl_config=table_acl_config,
)
else:
self.create_customrules(customrules);
self.tablet.start_vtocc(
self.tablet.start_vttablet(
memcache=self.memcache,
filecustomrules=customrules,
schema_override=schema_override,
table_acl_config=table_acl_config,
keyspace='test_keyspace', shard='0',
)
self.conn = self.connect()
self.txlogger = utils.curl(
@ -256,7 +238,7 @@ class TestEnv(object):
stdout=open(self.txlog_file, 'w'))
self.txlog = framework.Tailer(self.txlog_file, flush=self.tablet.flush)
self.log = framework.Tailer(
os.path.join(environment.vtlogroot, '%s.INFO' % self.env),
os.path.join(environment.vtlogroot, 'vttablet.INFO'),
flush=self.tablet.flush)
self.querylog = Querylog(self)
@ -278,8 +260,7 @@ class TestEnv(object):
pass
if getattr(self, 'txlogger', None):
self.txlogger.terminate()
if self.env == 'vttablet':
environment.topo_server().teardown()
environment.topo_server().teardown()
utils.kill_sub_processes()
utils.remove_tmp_files()
self.tablet.remove_tree()

Просмотреть файл

@ -44,12 +44,10 @@ def get_all_extra_my_cnf(extra_my_cnf):
class Tablet(object):
"""This class helps manage a vttablet or vtocc instance.
"""This class helps manage a vttablet instance.
To use it for vttablet, you need to use init_tablet and/or
start_vttablet. For vtocc, you can just call start_vtocc.
If you use it to start as vtocc, many of the support functions
that are meant for vttablet will not work.
start_vttablet.
"""
default_uid = 62344
seq = 0
@ -378,80 +376,6 @@ class Tablet(object):
(self.port, environment.flush_logs_url),
stderr=utils.devnull, stdout=utils.devnull)
def _start_prog(
self, binary, port=None, memcache=False,
wait_for_state='SERVING', filecustomrules=None, zkcustomrules=None,
schema_override=None,
repl_extra_flags=None, table_acl_config=None,
lameduck_period=None, security_policy=None,
extra_args=None, extra_env=None):
if repl_extra_flags is None:
repl_extra_flags = {}
environment.prog_compile(binary)
args = environment.binary_args(binary)
args.extend(['-port', '%s' % (port or self.port),
'-log_dir', environment.vtlogroot])
self._add_dbconfigs(args, repl_extra_flags)
if memcache:
args.extend(['-rowcache-bin', environment.memcached_bin()])
memcache_socket = os.path.join(self.tablet_dir, 'memcache.sock')
args.extend(['-rowcache-socket', memcache_socket])
args.extend(['-enable-rowcache'])
if filecustomrules:
args.extend(['-filecustomrules', filecustomrules])
if zkcustomrules:
args.extend(['-zkcustomrules', zkcustomrules])
if schema_override:
args.extend(['-schema-override', schema_override])
if table_acl_config:
args.extend(['-table-acl-config', table_acl_config])
args.extend(['-queryserver-config-strict-table-acl'])
if protocols_flavor().service_map():
args.extend(['-service_map', ','.join(protocols_flavor().service_map())])
if self.grpc_enabled():
args.extend(['-grpc_port', str(self.grpc_port)])
if lameduck_period:
args.extend(['-lameduck-period', lameduck_period])
if security_policy:
args.extend(['-security_policy', security_policy])
if extra_args:
args.extend(extra_args)
args.extend(['-enable-autocommit'])
stderr_fd = open(
os.path.join(environment.vtlogroot, '%s-%d.stderr' %
(binary, self.tablet_uid)), 'w')
# increment count only the first time
if not self.proc:
Tablet.tablets_running += 1
self.proc = utils.run_bg(args, stderr=stderr_fd, extra_env=extra_env)
log_message = (
'Started vttablet: %s (%s) with pid: %s - Log files: '
'%s/vttablet.*.{INFO,WARNING,ERROR,FATAL}.*.%s' %
(self.tablet_uid, self.tablet_alias, self.proc.pid,
environment.vtlogroot, self.proc.pid))
# This may race with the stderr output from the process (though
# that's usually empty).
stderr_fd.write(log_message + '\n')
stderr_fd.close()
logging.debug(log_message)
# wait for query service to be in the right state
if wait_for_state:
if binary == 'vttablet':
self.wait_for_vttablet_state(wait_for_state, port=port)
else:
self.wait_for_vtocc_state(wait_for_state, port=port)
return self.proc
def start_vttablet(
self, port=None, memcache=False,
wait_for_state='SERVING', filecustomrules=None, zkcustomrules=None,
@ -467,9 +391,7 @@ class Tablet(object):
The process is also saved in self.proc, so it's easy to kill as well.
"""
if repl_extra_flags is None:
repl_extra_flags = {}
args = []
args = environment.binary_args('vttablet')
# Use 'localhost' as hostname because Travis CI worker hostnames
# are too long for MySQL replication.
args.extend(['-tablet_hostname', 'localhost'])
@ -543,56 +465,68 @@ class Tablet(object):
if extra_args:
args.extend(extra_args)
return self._start_prog(
binary='vttablet', port=port,
memcache=memcache, wait_for_state=wait_for_state,
filecustomrules=filecustomrules,
zkcustomrules=zkcustomrules,
schema_override=schema_override,
repl_extra_flags=repl_extra_flags,
table_acl_config=table_acl_config,
lameduck_period=lameduck_period, extra_args=args,
security_policy=security_policy, extra_env=extra_env)
args.extend(['-port', '%s' % (port or self.port),
'-log_dir', environment.vtlogroot])
def start_vtocc(self, port=None, memcache=False,
wait_for_state='SERVING', filecustomrules=None,
schema_override=None,
repl_extra_flags=None, table_acl_config=None,
lameduck_period=None, security_policy=None,
keyspace=None, shard=False,
extra_args=None):
"""Starts a vtocc process, and returns it.
self._add_dbconfigs(args, repl_extra_flags)
The process is also saved in self.proc, so it's easy to kill as well.
"""
if repl_extra_flags is None:
repl_extra_flags = {}
self.keyspace = keyspace
self.shard = shard
self.dbname = 'vt_' + (self.keyspace or 'database')
args = []
args.extend(['-db-config-app-unixsocket', self.tablet_dir + '/mysql.sock'])
args.extend(['-db-config-dba-unixsocket', self.tablet_dir + '/mysql.sock'])
args.extend(['-db-config-app-keyspace', keyspace])
args.extend(['-db-config-app-shard', shard])
args.extend(['-binlog-path', 'foo'])
if memcache:
args.extend(['-rowcache-bin', environment.memcached_bin()])
memcache_socket = os.path.join(self.tablet_dir, 'memcache.sock')
args.extend(['-rowcache-socket', memcache_socket])
args.extend(['-enable-rowcache'])
if filecustomrules:
args.extend(['-filecustomrules', filecustomrules])
if zkcustomrules:
args.extend(['-zkcustomrules', zkcustomrules])
if schema_override:
args.extend(['-schema-override', schema_override])
if table_acl_config:
args.extend(['-table-acl-config', table_acl_config])
args.extend(['-queryserver-config-strict-table-acl'])
if protocols_flavor().service_map():
args.extend(['-service_map', ','.join(protocols_flavor().service_map())])
if self.grpc_enabled():
args.extend(['-grpc_port', str(self.grpc_port)])
if lameduck_period:
args.extend(['-lameduck-period', lameduck_period])
if security_policy:
args.extend(['-security_policy', security_policy])
if extra_args:
args.extend(extra_args)
return self._start_prog(binary='vtocc', port=port,
memcache=memcache, wait_for_state=wait_for_state,
filecustomrules=filecustomrules,
schema_override=schema_override,
repl_extra_flags=repl_extra_flags,
table_acl_config=table_acl_config,
lameduck_period=lameduck_period, extra_args=args,
security_policy=security_policy)
args.extend(['-enable-autocommit'])
stderr_fd = open(
os.path.join(environment.vtlogroot, 'vttablet-%d.stderr' %
self.tablet_uid), 'w')
# increment count only the first time
if not self.proc:
Tablet.tablets_running += 1
self.proc = utils.run_bg(args, stderr=stderr_fd, extra_env=extra_env)
log_message = (
'Started vttablet: %s (%s) with pid: %s - Log files: '
'%s/vttablet.*.{INFO,WARNING,ERROR,FATAL}.*.%s' %
(self.tablet_uid, self.tablet_alias, self.proc.pid,
environment.vtlogroot, self.proc.pid))
# This may race with the stderr output from the process (though
# that's usually empty).
stderr_fd.write(log_message + '\n')
stderr_fd.close()
logging.debug(log_message)
# wait for query service to be in the right state
if wait_for_state:
self.wait_for_vttablet_state(wait_for_state, port=port)
return self.proc
def wait_for_vttablet_state(self, expected, timeout=60.0, port=None):
self.wait_for_vtocc_state(expected, timeout=timeout, port=port)
def wait_for_vtocc_state(self, expected, timeout=60.0, port=None):
while True:
v = utils.get_vars(port or self.port)
last_seen_state = '?'