From ad9c110c799584d975d56ed8dcbeed305b240fa7 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Thu, 21 Jun 2012 16:21:45 -0700 Subject: [PATCH] flags reorg: moved some flag initializations to their respective packages. port is now specified in the command line instead of config file. --- go/cmd/vtocc/vtocc.go | 125 +++------------------------------ go/vt/tabletserver/init.go | 85 ++++++++++++++++++++++ go/vt/tabletserver/queryctl.go | 4 +- go/vt/tabletserver/sqlquery.go | 24 ++++--- py/vttest/occ.json | 3 - py/vttest/occ_test.py | 2 +- 6 files changed, 112 insertions(+), 131 deletions(-) create mode 100644 go/vt/tabletserver/init.go delete mode 100644 py/vttest/occ.json diff --git a/go/cmd/vtocc/vtocc.go b/go/cmd/vtocc/vtocc.go index 93f741bb39..59f766b0bb 100644 --- a/go/cmd/vtocc/vtocc.go +++ b/go/cmd/vtocc/vtocc.go @@ -15,13 +15,11 @@ import ( ts "code.google.com/p/vitess/go/vt/tabletserver" "crypto/md5" "encoding/hex" - "encoding/json" "errors" "expvar" "flag" "fmt" "io" - "io/ioutil" "log" _ "net/http/pprof" "net/rpc" @@ -31,75 +29,11 @@ import ( "time" ) -const ( - Version = "vtocc/1.0" -) - const ( DefaultLameDuckPeriod = 30.0 DefaultRebindDelay = 0.0 ) -const ( - RLIMIT_CPU = 0 - RLIMIT_FSIZE = 1 - RLIMIT_DATA = 2 - RLIMIT_STACK = 3 - RLIMIT_CORE = 4 - RLIMIT_RSS = 5 - RLIMIT_NPROC = 6 - RLIMIT_NOFILE = 7 - RLIMIT_MEMLOCK = 8 - RLIMIT_AS = 9 - RLIMIT_LOCKS = 10 - RLIMIT_SIGPENDING = 11 - RLIMIT_MSGQUEUE = 12 - RLIMIT_NICE = 13 - RLIMIT_RTPRIO = 14 -) - -type configType struct { - Port int - UmgmtSocket string - CachePoolCap int - PoolSize int - TransactionCap int - TransactionTimeout float64 - MaxResultSize int - QueryCacheSize int - SchemaReloadTime float64 - QueryTimeout float64 - IdleTimeout float64 -} - -var config configType = configType{ - 6510, - "/tmp/vtocc-%08x-umgmt.sock", - 1000, - 16, - 20, - 30, - 10000, - 5000, - 30 * 60, - 0, - 30 * 60, -} - -var dbconfig map[string]interface{} = map[string]interface{}{ - "host": "localhost", - "port": 0, - "unix_socket": "", - "uname": "vt_app", - "pass": "", - "dbname": "", - "charset": "utf8", -} - -func HandleGracefulShutdown() { - relog.Info("HandleGracefulShutdown") -} - func exportBinaryVersion() { hasher := md5.New() exeFile, err := os.Open("/proc/self/exe") @@ -122,10 +56,9 @@ func exportBinaryVersion() { } func main() { + port := flag.Int("port", 6510, "tcp port to serve on") memProfileRate := flag.Int("mem-profile-rate", 512*1024, "profile every n bytes allocated") maxOpenFds := flag.Uint64("max-open-fds", 32768, "max open file descriptors") - configFile := flag.String("config", "", "config file name") - dbConfigFile := flag.String("dbconfig", "", "db config file name") lameDuckPeriod := flag.Float64("lame-duck-period", DefaultLameDuckPeriod, "how long to give in-flight transactions to finish") rebindDelay := flag.Float64("rebind-delay", DefaultRebindDelay, @@ -134,7 +67,6 @@ func main() { logFrequency := flag.Int64("logfile.frequency", 0, "rotation frequency in seconds") logMaxSize := flag.Int64("logfile.maxsize", 0, "max file size in bytes") logMaxFiles := flag.Int64("logfile.maxfiles", 0, "max number of log files") - queryLog := flag.String("querylog", "", "for testing: log all queries to this file") flag.Parse() exportBinaryVersion() @@ -148,48 +80,25 @@ func main() { logger := relog.New(f, "vtocc ", log.Ldate|log.Lmicroseconds|log.Lshortfile, relog.DEBUG) relog.SetLogger(logger) - if *queryLog != "" { - if f, err = os.OpenFile(*queryLog, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644); err == nil { - ts.QueryLogger = relog.New(f, "", log.Ldate|log.Lmicroseconds, relog.DEBUG) - } - } - unmarshalFile(*configFile, &config) - unmarshalFile(*dbConfigFile, &dbconfig) - // work-around for jsonism - if v, ok := dbconfig["port"].(float64); ok { - dbconfig["port"] = int(v) - } fdLimit := &syscall.Rlimit{*maxOpenFds, *maxOpenFds} - if err = syscall.Setrlimit(RLIMIT_NOFILE, fdLimit); err != nil { + if err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, fdLimit); err != nil { relog.Fatal("can't Setrlimit %#v: err %v", *fdLimit, err) } else { relog.Info("set max-open-fds = %v", *maxOpenFds) } + config, dbconfig := ts.Init() qm := &OccManager{config, dbconfig} rpc.Register(qm) - - ts.StartQueryService( - config.CachePoolCap, - config.PoolSize, - config.TransactionCap, - config.TransactionTimeout, - config.MaxResultSize, - config.QueryCacheSize, - config.SchemaReloadTime, - config.QueryTimeout, - config.IdleTimeout, - ) + ts.StartQueryService(config) ts.AllowQueries(dbconfig) - rpc.HandleHTTP() jsonrpc.ServeHTTP() jsonrpc.ServeRPC() bsonrpc.ServeHTTP() bsonrpc.ServeRPC() - - relog.Info("started vtocc %v", config.Port) + relog.Info("started vtocc %v", *port) // we delegate out startup to the micromanagement server so these actions // will occur after we have obtained our socket. @@ -202,7 +111,7 @@ func main() { umgmt.SetLameDuckPeriod(float32(*lameDuckPeriod)) umgmt.SetRebindDelay(float32(*rebindDelay)) umgmt.AddStartupCallback(func() { - umgmt.StartHttpServer(fmt.Sprintf(":%v", config.Port)) + umgmt.StartHttpServer(fmt.Sprintf(":%v", *port)) }) umgmt.AddStartupCallback(func() { sighandler.SetSignalHandler(syscall.SIGTERM, @@ -211,34 +120,16 @@ func main() { umgmt.AddCloseCallback(func() { ts.DisallowQueries() }) - umgmt.AddShutdownCallback(func() error { - HandleGracefulShutdown() - return nil - }) - umgmtSocket := fmt.Sprintf(config.UmgmtSocket, config.Port) + umgmtSocket := fmt.Sprintf("/tmp/vtocc-%08x-umgmt.sock", *port) if umgmtErr := umgmt.ListenAndServe(umgmtSocket); umgmtErr != nil { relog.Error("umgmt.ListenAndServe err: %v", umgmtErr) } relog.Info("done") } -func unmarshalFile(name string, val interface{}) { - if name != "" { - data, err := ioutil.ReadFile(name) - if err != nil { - relog.Fatal("could not read %v: %v", val, err) - } - if err = json.Unmarshal(data, val); err != nil { - relog.Fatal("could not read %s: %v", val, err) - } - } - data, _ := json.MarshalIndent(val, "", " ") - relog.Info("config: %s\n", data) -} - type OccManager struct { - config configType + config ts.Config dbconfig map[string]interface{} } diff --git a/go/vt/tabletserver/init.go b/go/vt/tabletserver/init.go new file mode 100644 index 0000000000..ee72a1dc6a --- /dev/null +++ b/go/vt/tabletserver/init.go @@ -0,0 +1,85 @@ + +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package tabletserver + +import ( + "code.google.com/p/vitess/go/relog" + "encoding/json" + "flag" + "io/ioutil" + "log" + "os" +) + +type Config struct { + CachePoolCap int + PoolSize int + TransactionCap int + TransactionTimeout float64 + MaxResultSize int + QueryCacheSize int + SchemaReloadTime float64 + QueryTimeout float64 + IdleTimeout float64 +} + +var config Config = Config { + 1000, + 16, + 20, + 30, + 10000, + 5000, + 30 * 60, + 0, + 30 * 60, +} + +var dbconfig map[string]interface{} = map[string]interface{}{ + "host": "localhost", + "port": 0, + "unix_socket": "", + "uname": "vt_app", + "pass": "", + "dbname": "", + "charset": "utf8", +} + +var configFile = flag.String("config", "", "config file name") +var dbConfigFile = flag.String("dbconfig", "", "db config file name") +var queryLog = flag.String("querylog", "", "for testing: log all queries to this file") + +// Init should be called after the main function calls flag.Parse. +func Init() (Config, map[string]interface{}) { + if *queryLog != "" { + if f, err := os.OpenFile(*queryLog, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644); err == nil { + QueryLogger = relog.New(f, "", log.Ldate|log.Lmicroseconds, relog.DEBUG) + } else { + relog.Fatal("Error opening file %v: %v", *queryLog, err) + } + } + unmarshalFile(*configFile, &config) + unmarshalFile(*dbConfigFile, &dbconfig) + // work-around for jsonism + if v, ok := dbconfig["port"].(float64); ok { + dbconfig["port"] = int(v) + } + return config, dbconfig +} + +func unmarshalFile(name string, val interface{}) { + if name != "" { + data, err := ioutil.ReadFile(name) + if err != nil { + relog.Fatal("could not read %v: %v", val, err) + } + if err = json.Unmarshal(data, val); err != nil { + relog.Fatal("could not read %s: %v", val, err) + } + } + data, _ := json.MarshalIndent(val, "", " ") + relog.Info("config: %s\n", data) +} diff --git a/go/vt/tabletserver/queryctl.go b/go/vt/tabletserver/queryctl.go index e0b16a9892..65e48a909d 100644 --- a/go/vt/tabletserver/queryctl.go +++ b/go/vt/tabletserver/queryctl.go @@ -11,12 +11,12 @@ import ( var SqlQueryRpcService *SqlQuery -func StartQueryService(cachePoolCap, poolSize, transactionCap int, transactionTimeout float64, maxResultSize, queryCacheSize int, schemaReloadTime, queryTimeout, idleTimeout float64) { +func StartQueryService(config Config) { if SqlQueryRpcService != nil { relog.Warning("RPC service already up %v", SqlQueryRpcService) return } - SqlQueryRpcService = NewSqlQuery(cachePoolCap, poolSize, transactionCap, transactionTimeout, maxResultSize, queryCacheSize, schemaReloadTime, queryTimeout, idleTimeout) + SqlQueryRpcService = NewSqlQuery(config) rpc.Register(SqlQueryRpcService) } diff --git a/go/vt/tabletserver/sqlquery.go b/go/vt/tabletserver/sqlquery.go index 7d711f631f..de667a2311 100644 --- a/go/vt/tabletserver/sqlquery.go +++ b/go/vt/tabletserver/sqlquery.go @@ -63,17 +63,17 @@ type CacheInvalidator interface { Delete(key string) bool } -func NewSqlQuery(cachePoolCap, poolSize, transactionCap int, transactionTimeout float64, maxResultSize, queryCacheSize int, schemaReloadTime, queryTimeout, idleTimeout float64) *SqlQuery { +func NewSqlQuery(config Config) *SqlQuery { self := &SqlQuery{} - self.cachePool = NewCachePool(cachePoolCap, time.Duration(queryTimeout*1e9), time.Duration(idleTimeout*1e9)) - self.schemaInfo = NewSchemaInfo(queryCacheSize, time.Duration(schemaReloadTime*1e9)) - self.connPool = NewConnectionPool(poolSize, time.Duration(idleTimeout*1e9)) + self.cachePool = NewCachePool(config.CachePoolCap, time.Duration(config.QueryTimeout*1e9), time.Duration(config.IdleTimeout*1e9)) + self.schemaInfo = NewSchemaInfo(config.QueryCacheSize, time.Duration(config.SchemaReloadTime*1e9)) + self.connPool = NewConnectionPool(config.PoolSize, time.Duration(config.IdleTimeout*1e9)) self.reservedPool = NewReservedPool() - self.txPool = NewConnectionPool(transactionCap, time.Duration(idleTimeout*1e9)) // connections in pool has to be > transactionCap - self.activeTxPool = NewActiveTxPool(time.Duration(transactionTimeout * 1e9)) - self.activePool = NewActivePool(time.Duration(queryTimeout*1e9), time.Duration(idleTimeout*1e9)) + self.txPool = NewConnectionPool(config.TransactionCap, time.Duration(config.IdleTimeout*1e9)) // connections in pool has to be > transactionCap + self.activeTxPool = NewActiveTxPool(time.Duration(config.TransactionTimeout * 1e9)) + self.activePool = NewActivePool(time.Duration(config.QueryTimeout*1e9), time.Duration(config.IdleTimeout*1e9)) self.consolidator = NewConsolidator() - self.maxResultSize = int32(maxResultSize) + self.maxResultSize = int32(config.MaxResultSize) expvar.Publish("Voltron", stats.StrFunc(func() string { return self.statsJSON() })) queryStats = stats.NewTimings("Queries") stats.NewRates("QPS", queryStats, 15, 60e9) @@ -150,6 +150,14 @@ func (self *SqlQuery) checkState(sessionId int64, allowShutdown bool) { } } +func (self *SqlQuery) GetSessionId(dbName *string, sessionId *int64) error { + if *dbName != self.dbName { + return NewTabletError(FATAL, "db name mismatch, expecting %v, received %v", self.dbName, *dbName) + } + *sessionId = self.sessionId + return nil +} + func (self *SqlQuery) Begin(session *Session, transactionId *int64) (err error) { defer handleError(&err) self.checkState(session.SessionId, false) diff --git a/py/vttest/occ.json b/py/vttest/occ.json deleted file mode 100644 index f2041a1808..0000000000 --- a/py/vttest/occ.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "Port": 9461 -} diff --git a/py/vttest/occ_test.py b/py/vttest/occ_test.py index 2873be5316..00cbdb1990 100755 --- a/py/vttest/occ_test.py +++ b/py/vttest/occ_test.py @@ -66,7 +66,7 @@ class TestEnv(object): self.memcached = subprocess.Popen(["memcached", "-s", self.cfg["memcache"]]) occ_args = [ vttop+"/go/cmd/vtocc/vtocc", - "-config", "occ.json", + "-port", "9461", "-dbconfig", options.dbconfig, "-logfile", LOGFILE, "-querylog", QUERYLOGFILE,