flags reorg: moved some flag initializations to their respective packages.

port is now specified in the command line instead of config file.
This commit is contained in:
Sugu Sougoumarane 2012-06-21 16:21:45 -07:00
Родитель 93669dff3b
Коммит ad9c110c79
6 изменённых файлов: 112 добавлений и 131 удалений

Просмотреть файл

@ -15,13 +15,11 @@ import (
ts "code.google.com/p/vitess/go/vt/tabletserver"
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"expvar"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
_ "net/http/pprof"
"net/rpc"
@ -31,75 +29,11 @@ import (
"time"
)
const (
Version = "vtocc/1.0"
)
const (
DefaultLameDuckPeriod = 30.0
DefaultRebindDelay = 0.0
)
const (
RLIMIT_CPU = 0
RLIMIT_FSIZE = 1
RLIMIT_DATA = 2
RLIMIT_STACK = 3
RLIMIT_CORE = 4
RLIMIT_RSS = 5
RLIMIT_NPROC = 6
RLIMIT_NOFILE = 7
RLIMIT_MEMLOCK = 8
RLIMIT_AS = 9
RLIMIT_LOCKS = 10
RLIMIT_SIGPENDING = 11
RLIMIT_MSGQUEUE = 12
RLIMIT_NICE = 13
RLIMIT_RTPRIO = 14
)
type configType struct {
Port int
UmgmtSocket string
CachePoolCap int
PoolSize int
TransactionCap int
TransactionTimeout float64
MaxResultSize int
QueryCacheSize int
SchemaReloadTime float64
QueryTimeout float64
IdleTimeout float64
}
var config configType = configType{
6510,
"/tmp/vtocc-%08x-umgmt.sock",
1000,
16,
20,
30,
10000,
5000,
30 * 60,
0,
30 * 60,
}
var dbconfig map[string]interface{} = map[string]interface{}{
"host": "localhost",
"port": 0,
"unix_socket": "",
"uname": "vt_app",
"pass": "",
"dbname": "",
"charset": "utf8",
}
func HandleGracefulShutdown() {
relog.Info("HandleGracefulShutdown")
}
func exportBinaryVersion() {
hasher := md5.New()
exeFile, err := os.Open("/proc/self/exe")
@ -122,10 +56,9 @@ func exportBinaryVersion() {
}
func main() {
port := flag.Int("port", 6510, "tcp port to serve on")
memProfileRate := flag.Int("mem-profile-rate", 512*1024, "profile every n bytes allocated")
maxOpenFds := flag.Uint64("max-open-fds", 32768, "max open file descriptors")
configFile := flag.String("config", "", "config file name")
dbConfigFile := flag.String("dbconfig", "", "db config file name")
lameDuckPeriod := flag.Float64("lame-duck-period", DefaultLameDuckPeriod,
"how long to give in-flight transactions to finish")
rebindDelay := flag.Float64("rebind-delay", DefaultRebindDelay,
@ -134,7 +67,6 @@ func main() {
logFrequency := flag.Int64("logfile.frequency", 0, "rotation frequency in seconds")
logMaxSize := flag.Int64("logfile.maxsize", 0, "max file size in bytes")
logMaxFiles := flag.Int64("logfile.maxfiles", 0, "max number of log files")
queryLog := flag.String("querylog", "", "for testing: log all queries to this file")
flag.Parse()
exportBinaryVersion()
@ -148,48 +80,25 @@ func main() {
logger := relog.New(f, "vtocc ",
log.Ldate|log.Lmicroseconds|log.Lshortfile, relog.DEBUG)
relog.SetLogger(logger)
if *queryLog != "" {
if f, err = os.OpenFile(*queryLog, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644); err == nil {
ts.QueryLogger = relog.New(f, "", log.Ldate|log.Lmicroseconds, relog.DEBUG)
}
}
unmarshalFile(*configFile, &config)
unmarshalFile(*dbConfigFile, &dbconfig)
// work-around for jsonism
if v, ok := dbconfig["port"].(float64); ok {
dbconfig["port"] = int(v)
}
fdLimit := &syscall.Rlimit{*maxOpenFds, *maxOpenFds}
if err = syscall.Setrlimit(RLIMIT_NOFILE, fdLimit); err != nil {
if err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, fdLimit); err != nil {
relog.Fatal("can't Setrlimit %#v: err %v", *fdLimit, err)
} else {
relog.Info("set max-open-fds = %v", *maxOpenFds)
}
config, dbconfig := ts.Init()
qm := &OccManager{config, dbconfig}
rpc.Register(qm)
ts.StartQueryService(
config.CachePoolCap,
config.PoolSize,
config.TransactionCap,
config.TransactionTimeout,
config.MaxResultSize,
config.QueryCacheSize,
config.SchemaReloadTime,
config.QueryTimeout,
config.IdleTimeout,
)
ts.StartQueryService(config)
ts.AllowQueries(dbconfig)
rpc.HandleHTTP()
jsonrpc.ServeHTTP()
jsonrpc.ServeRPC()
bsonrpc.ServeHTTP()
bsonrpc.ServeRPC()
relog.Info("started vtocc %v", config.Port)
relog.Info("started vtocc %v", *port)
// we delegate out startup to the micromanagement server so these actions
// will occur after we have obtained our socket.
@ -202,7 +111,7 @@ func main() {
umgmt.SetLameDuckPeriod(float32(*lameDuckPeriod))
umgmt.SetRebindDelay(float32(*rebindDelay))
umgmt.AddStartupCallback(func() {
umgmt.StartHttpServer(fmt.Sprintf(":%v", config.Port))
umgmt.StartHttpServer(fmt.Sprintf(":%v", *port))
})
umgmt.AddStartupCallback(func() {
sighandler.SetSignalHandler(syscall.SIGTERM,
@ -211,34 +120,16 @@ func main() {
umgmt.AddCloseCallback(func() {
ts.DisallowQueries()
})
umgmt.AddShutdownCallback(func() error {
HandleGracefulShutdown()
return nil
})
umgmtSocket := fmt.Sprintf(config.UmgmtSocket, config.Port)
umgmtSocket := fmt.Sprintf("/tmp/vtocc-%08x-umgmt.sock", *port)
if umgmtErr := umgmt.ListenAndServe(umgmtSocket); umgmtErr != nil {
relog.Error("umgmt.ListenAndServe err: %v", umgmtErr)
}
relog.Info("done")
}
func unmarshalFile(name string, val interface{}) {
if name != "" {
data, err := ioutil.ReadFile(name)
if err != nil {
relog.Fatal("could not read %v: %v", val, err)
}
if err = json.Unmarshal(data, val); err != nil {
relog.Fatal("could not read %s: %v", val, err)
}
}
data, _ := json.MarshalIndent(val, "", " ")
relog.Info("config: %s\n", data)
}
type OccManager struct {
config configType
config ts.Config
dbconfig map[string]interface{}
}

Просмотреть файл

@ -0,0 +1,85 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletserver
import (
"code.google.com/p/vitess/go/relog"
"encoding/json"
"flag"
"io/ioutil"
"log"
"os"
)
type Config struct {
CachePoolCap int
PoolSize int
TransactionCap int
TransactionTimeout float64
MaxResultSize int
QueryCacheSize int
SchemaReloadTime float64
QueryTimeout float64
IdleTimeout float64
}
var config Config = Config {
1000,
16,
20,
30,
10000,
5000,
30 * 60,
0,
30 * 60,
}
var dbconfig map[string]interface{} = map[string]interface{}{
"host": "localhost",
"port": 0,
"unix_socket": "",
"uname": "vt_app",
"pass": "",
"dbname": "",
"charset": "utf8",
}
var configFile = flag.String("config", "", "config file name")
var dbConfigFile = flag.String("dbconfig", "", "db config file name")
var queryLog = flag.String("querylog", "", "for testing: log all queries to this file")
// Init should be called after the main function calls flag.Parse.
func Init() (Config, map[string]interface{}) {
if *queryLog != "" {
if f, err := os.OpenFile(*queryLog, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644); err == nil {
QueryLogger = relog.New(f, "", log.Ldate|log.Lmicroseconds, relog.DEBUG)
} else {
relog.Fatal("Error opening file %v: %v", *queryLog, err)
}
}
unmarshalFile(*configFile, &config)
unmarshalFile(*dbConfigFile, &dbconfig)
// work-around for jsonism
if v, ok := dbconfig["port"].(float64); ok {
dbconfig["port"] = int(v)
}
return config, dbconfig
}
func unmarshalFile(name string, val interface{}) {
if name != "" {
data, err := ioutil.ReadFile(name)
if err != nil {
relog.Fatal("could not read %v: %v", val, err)
}
if err = json.Unmarshal(data, val); err != nil {
relog.Fatal("could not read %s: %v", val, err)
}
}
data, _ := json.MarshalIndent(val, "", " ")
relog.Info("config: %s\n", data)
}

Просмотреть файл

@ -11,12 +11,12 @@ import (
var SqlQueryRpcService *SqlQuery
func StartQueryService(cachePoolCap, poolSize, transactionCap int, transactionTimeout float64, maxResultSize, queryCacheSize int, schemaReloadTime, queryTimeout, idleTimeout float64) {
func StartQueryService(config Config) {
if SqlQueryRpcService != nil {
relog.Warning("RPC service already up %v", SqlQueryRpcService)
return
}
SqlQueryRpcService = NewSqlQuery(cachePoolCap, poolSize, transactionCap, transactionTimeout, maxResultSize, queryCacheSize, schemaReloadTime, queryTimeout, idleTimeout)
SqlQueryRpcService = NewSqlQuery(config)
rpc.Register(SqlQueryRpcService)
}

Просмотреть файл

@ -63,17 +63,17 @@ type CacheInvalidator interface {
Delete(key string) bool
}
func NewSqlQuery(cachePoolCap, poolSize, transactionCap int, transactionTimeout float64, maxResultSize, queryCacheSize int, schemaReloadTime, queryTimeout, idleTimeout float64) *SqlQuery {
func NewSqlQuery(config Config) *SqlQuery {
self := &SqlQuery{}
self.cachePool = NewCachePool(cachePoolCap, time.Duration(queryTimeout*1e9), time.Duration(idleTimeout*1e9))
self.schemaInfo = NewSchemaInfo(queryCacheSize, time.Duration(schemaReloadTime*1e9))
self.connPool = NewConnectionPool(poolSize, time.Duration(idleTimeout*1e9))
self.cachePool = NewCachePool(config.CachePoolCap, time.Duration(config.QueryTimeout*1e9), time.Duration(config.IdleTimeout*1e9))
self.schemaInfo = NewSchemaInfo(config.QueryCacheSize, time.Duration(config.SchemaReloadTime*1e9))
self.connPool = NewConnectionPool(config.PoolSize, time.Duration(config.IdleTimeout*1e9))
self.reservedPool = NewReservedPool()
self.txPool = NewConnectionPool(transactionCap, time.Duration(idleTimeout*1e9)) // connections in pool has to be > transactionCap
self.activeTxPool = NewActiveTxPool(time.Duration(transactionTimeout * 1e9))
self.activePool = NewActivePool(time.Duration(queryTimeout*1e9), time.Duration(idleTimeout*1e9))
self.txPool = NewConnectionPool(config.TransactionCap, time.Duration(config.IdleTimeout*1e9)) // connections in pool has to be > transactionCap
self.activeTxPool = NewActiveTxPool(time.Duration(config.TransactionTimeout * 1e9))
self.activePool = NewActivePool(time.Duration(config.QueryTimeout*1e9), time.Duration(config.IdleTimeout*1e9))
self.consolidator = NewConsolidator()
self.maxResultSize = int32(maxResultSize)
self.maxResultSize = int32(config.MaxResultSize)
expvar.Publish("Voltron", stats.StrFunc(func() string { return self.statsJSON() }))
queryStats = stats.NewTimings("Queries")
stats.NewRates("QPS", queryStats, 15, 60e9)
@ -150,6 +150,14 @@ func (self *SqlQuery) checkState(sessionId int64, allowShutdown bool) {
}
}
func (self *SqlQuery) GetSessionId(dbName *string, sessionId *int64) error {
if *dbName != self.dbName {
return NewTabletError(FATAL, "db name mismatch, expecting %v, received %v", self.dbName, *dbName)
}
*sessionId = self.sessionId
return nil
}
func (self *SqlQuery) Begin(session *Session, transactionId *int64) (err error) {
defer handleError(&err)
self.checkState(session.SessionId, false)

Просмотреть файл

@ -1,3 +0,0 @@
{
"Port": 9461
}

Просмотреть файл

@ -66,7 +66,7 @@ class TestEnv(object):
self.memcached = subprocess.Popen(["memcached", "-s", self.cfg["memcache"]])
occ_args = [
vttop+"/go/cmd/vtocc/vtocc",
"-config", "occ.json",
"-port", "9461",
"-dbconfig", options.dbconfig,
"-logfile", LOGFILE,
"-querylog", QUERYLOGFILE,