Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Sugu Sougoumarane 2020-04-14 19:29:45 -07:00
Родитель 7adcba505c
Коммит ea22e4e67b
11 изменённых файлов: 87 добавлений и 62 удалений

Просмотреть файл

@ -98,17 +98,18 @@ messagePostponeParallelism: 4 # queryserver-config-message-postpone-c
cacheResultFields: true # enable-query-plan-field-caching
enforce_strict_trans_tables
queryserver-config-strict-table-acl
queryserver-config-enable-table-acl-dry-run
queryserver-config-acl-exempt-acl
enable-tx-throttler
tx-throttler-config
tx-throttler-healthcheck-cells
enable_transaction_limit
enable_transaction_limit_dry_run
transaction_limit_per_user
transaction_limit_by_username
transaction_limit_by_principal
transaction_limit_by_component
transaction_limit_by_subcomponent
# The following flags are currently not supported.
# enforce_strict_trans_tables
# queryserver-config-strict-table-acl
# queryserver-config-enable-table-acl-dry-run
# queryserver-config-acl-exempt-acl
# enable-tx-throttler
# tx-throttler-config
# tx-throttler-healthcheck-cells
# enable_transaction_limit
# enable_transaction_limit_dry_run
# transaction_limit_per_user
# transaction_limit_by_username
# transaction_limit_by_principal
# transaction_limit_by_component
# transaction_limit_by_subcomponent

Просмотреть файл

@ -72,13 +72,13 @@ func TestConfigVars(t *testing.T) {
val: currentConfig.Oltp.WarnRows,
}, {
tag: "QueryCacheCapacity",
val: currentConfig.QueryPlanCacheSize,
val: currentConfig.QueryCacheSize,
}, {
tag: "QueryTimeout",
val: int(currentConfig.Oltp.QueryTimeoutSeconds * 1e9),
}, {
tag: "SchemaReloadTime",
val: int(currentConfig.SchemaReloadTime * 1e9),
val: int(currentConfig.SchemaReloadIntervalSeconds * 1e9),
}, {
tag: "StreamBufferSize",
val: currentConfig.StreamBufferSize,

Просмотреть файл

@ -66,7 +66,7 @@ func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer) *Engine {
tsv: tsv,
se: se,
vs: vs,
postponeSema: sync2.NewSemaphore(tsv.Config().MessagePostponeCap, 0),
postponeSema: sync2.NewSemaphore(tsv.Config().MessagePostponeParallelism, 0),
managers: make(map[string]*messageManager),
}
}

Просмотреть файл

@ -169,14 +169,14 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine {
env: env,
se: se,
tables: make(map[string]*schema.Table),
plans: cache.NewLRUCache(int64(config.QueryPlanCacheSize)),
plans: cache.NewLRUCache(int64(config.QueryCacheSize)),
queryRuleSources: rules.NewMap(),
}
qe.conns = connpool.New(env, "ConnPool", config.OltpReadPool)
qe.streamConns = connpool.New(env, "StreamConnPool", config.OlapReadPool)
qe.consolidatorMode = config.Consolidator
qe.enableQueryPlanFieldCaching = config.EnableQueryPlanFieldCaching
qe.enableQueryPlanFieldCaching = config.CacheResultFields
qe.consolidator = sync2.NewConsolidator()
qe.txSerializer = txserializer.New(env)
qe.streamQList = NewQueryList()
@ -203,7 +203,7 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine {
qe.warnResultSize = sync2.NewAtomicInt64(int64(config.Oltp.WarnRows))
qe.streamBufferSize = sync2.NewAtomicInt64(int64(config.StreamBufferSize))
planbuilder.PassthroughDMLs = config.PassthroughDMLs
planbuilder.PassthroughDMLs = config.PassthroughDML
qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second)

Просмотреть файл

@ -272,9 +272,9 @@ func TestStatsURL(t *testing.T) {
qe.handleHTTPQueryRules(response, request)
}
func newTestQueryEngine(queryPlanCacheSize int, idleTimeout time.Duration, strict bool, dbcfgs *dbconfigs.DBConfigs) *QueryEngine {
func newTestQueryEngine(queryCacheSize int, idleTimeout time.Duration, strict bool, dbcfgs *dbconfigs.DBConfigs) *QueryEngine {
config := tabletenv.NewDefaultConfig()
config.QueryPlanCacheSize = queryPlanCacheSize
config.QueryCacheSize = queryCacheSize
config.OltpReadPool.IdleTimeoutSeconds = int(idleTimeout / 1e9)
config.OlapReadPool.IdleTimeoutSeconds = int(idleTimeout / 1e9)
config.TxPool.IdleTimeoutSeconds = int(idleTimeout / 1e9)

Просмотреть файл

@ -67,7 +67,7 @@ type Engine struct {
// NewEngine creates a new Engine.
func NewEngine(env tabletenv.Env) *Engine {
reloadTime := time.Duration(env.Config().SchemaReloadTime * 1e9)
reloadTime := time.Duration(env.Config().SchemaReloadIntervalSeconds * 1e9)
se := &Engine{
env: env,
// We need only one connection because the reloader is

Просмотреть файл

@ -298,10 +298,10 @@ func TestStatsURL(t *testing.T) {
se.handleDebugSchema(response, request)
}
func newEngine(queryPlanCacheSize int, reloadTime time.Duration, idleTimeout time.Duration, strict bool, db *fakesqldb.DB) *Engine {
func newEngine(queryCacheSize int, reloadTime time.Duration, idleTimeout time.Duration, strict bool, db *fakesqldb.DB) *Engine {
config := tabletenv.NewDefaultConfig()
config.QueryPlanCacheSize = queryPlanCacheSize
config.SchemaReloadTime = float64(reloadTime) / 1e9
config.QueryCacheSize = queryCacheSize
config.SchemaReloadIntervalSeconds = int(reloadTime) / 1e9
config.OltpReadPool.IdleTimeoutSeconds = int(idleTimeout / 1e9)
config.OlapReadPool.IdleTimeoutSeconds = int(idleTimeout / 1e9)
config.TxPool.IdleTimeoutSeconds = int(idleTimeout / 1e9)

Просмотреть файл

@ -80,19 +80,19 @@ func init() {
flag.IntVar(&deprecatedMessagePoolPrefillParallelism, "queryserver-config-message-conn-pool-prefill-parallelism", 0, "DEPRECATED: Unused.")
flag.IntVar(&currentConfig.TxPool.Size, "queryserver-config-transaction-cap", defaultConfig.TxPool.Size, "query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout)")
flag.IntVar(&currentConfig.TxPool.PrefillParallelism, "queryserver-config-transaction-prefill-parallelism", defaultConfig.TxPool.PrefillParallelism, "query server transaction prefill parallelism, a non-zero value will prefill the pool using the specified parallism.")
flag.IntVar(&currentConfig.MessagePostponeCap, "queryserver-config-message-postpone-cap", defaultConfig.MessagePostponeCap, "query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem.")
flag.IntVar(&currentConfig.MessagePostponeParallelism, "queryserver-config-message-postpone-cap", defaultConfig.MessagePostponeParallelism, "query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem.")
flag.IntVar(&deprecatedFoundRowsPoolSize, "client-found-rows-pool-size", 0, "DEPRECATED: queryserver-config-transaction-cap will be used instead.")
flag.IntVar(&currentConfig.Oltp.TxTimeoutSeconds, "queryserver-config-transaction-timeout", defaultConfig.Oltp.TxTimeoutSeconds, "query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value")
flag.Float64Var(&currentConfig.TxShutDownGracePeriod, "transaction_shutdown_grace_period", defaultConfig.TxShutDownGracePeriod, "how long to wait (in seconds) for transactions to complete during graceful shutdown.")
flag.IntVar(&currentConfig.ShutdownGracePeriodSeconds, "transaction_shutdown_grace_period", defaultConfig.ShutdownGracePeriodSeconds, "how long to wait (in seconds) for transactions to complete during graceful shutdown.")
flag.IntVar(&currentConfig.Oltp.MaxRows, "queryserver-config-max-result-size", defaultConfig.Oltp.MaxRows, "query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries.")
flag.IntVar(&currentConfig.Oltp.WarnRows, "queryserver-config-warn-result-size", defaultConfig.Oltp.WarnRows, "query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this")
flag.IntVar(&deprecatedMaxDMLRows, "queryserver-config-max-dml-rows", 0, "query server max dml rows per statement, maximum number of rows allowed to return at a time for an update or delete with either 1) an equality where clauses on primary keys, or 2) a subselect statement. For update and delete statements in above two categories, vttablet will split the original query into multiple small queries based on this configuration value. ")
flag.BoolVar(&currentConfig.PassthroughDMLs, "queryserver-config-passthrough-dmls", defaultConfig.PassthroughDMLs, "query server pass through all dml statements without rewriting")
flag.BoolVar(&currentConfig.PassthroughDML, "queryserver-config-passthrough-dmls", defaultConfig.PassthroughDML, "query server pass through all dml statements without rewriting")
flag.BoolVar(&deprecateAllowUnsafeDMLs, "queryserver-config-allowunsafe-dmls", false, "deprecated")
flag.IntVar(&currentConfig.StreamBufferSize, "queryserver-config-stream-buffer-size", defaultConfig.StreamBufferSize, "query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size.")
flag.IntVar(&currentConfig.QueryPlanCacheSize, "queryserver-config-query-cache-size", defaultConfig.QueryPlanCacheSize, "query server query cache size, maximum number of queries to be cached. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache.")
flag.Float64Var(&currentConfig.SchemaReloadTime, "queryserver-config-schema-reload-time", defaultConfig.SchemaReloadTime, "query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance in seconds. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.")
flag.IntVar(&currentConfig.QueryCacheSize, "queryserver-config-query-cache-size", defaultConfig.QueryCacheSize, "query server query cache size, maximum number of queries to be cached. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache.")
flag.IntVar(&currentConfig.SchemaReloadIntervalSeconds, "queryserver-config-schema-reload-time", defaultConfig.SchemaReloadIntervalSeconds, "query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance in seconds. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.")
flag.IntVar(&currentConfig.Oltp.QueryTimeoutSeconds, "queryserver-config-query-timeout", defaultConfig.Oltp.QueryTimeoutSeconds, "query server query timeout (in seconds), this is the query timeout in vttablet side. If a query takes more than this timeout, it will be killed.")
flag.IntVar(&currentConfig.OltpReadPool.TimeoutSeconds, "queryserver-config-query-pool-timeout", defaultConfig.OltpReadPool.TimeoutSeconds, "query server query pool timeout (in seconds), it is how long vttablet waits for a connection from the query pool. If set to 0 (default) then the overall query timeout is used instead.")
flag.IntVar(&currentConfig.TxPool.TimeoutSeconds, "queryserver-config-txpool-timeout", defaultConfig.TxPool.TimeoutSeconds, "query server transaction pool timeout, it is how long vttablet waits if tx pool is full")
@ -134,7 +134,7 @@ func init() {
flag.BoolVar(&currentConfig.EnforceStrictTransTables, "enforce_strict_trans_tables", defaultConfig.EnforceStrictTransTables, "If true, vttablet requires MySQL to run with STRICT_TRANS_TABLES or STRICT_ALL_TABLES on. It is recommended to not turn this flag off. Otherwise MySQL may alter your supplied values before saving them to the database.")
flag.BoolVar(&enableConsolidator, "enable-consolidator", true, "This option enables the query consolidator.")
flag.BoolVar(&enableConsolidatorReplicas, "enable-consolidator-replicas", false, "This option enables the query consolidator only on replicas.")
flag.BoolVar(&currentConfig.EnableQueryPlanFieldCaching, "enable-query-plan-field-caching", defaultConfig.EnableQueryPlanFieldCaching, "This option fetches & caches fields (columns) when storing query plans")
flag.BoolVar(&currentConfig.CacheResultFields, "enable-query-plan-field-caching", defaultConfig.CacheResultFields, "This option fetches & caches fields (columns) when storing query plans")
}
// Init must be called after flag.Parse, and before doing any other operations.
@ -191,18 +191,19 @@ type TabletConfig struct {
HotRowProtection HotRowProtectionConfig `json:"hotRowProtection,omitempty"`
Consolidator string `json:"consolidator,omitempty"`
HeartbeatIntervalMilliseconds int `json:"heartbeatIntervalMilliseconds,omitempty"`
ShutdownGracePeriodSeconds int `json:"shutdownGracePeriodSeconds,omitempty"`
PassthroughDML bool `json:"passthroughDML,omitempty"`
StreamBufferSize int `json:"streamBufferSize,omitempty"`
QueryCacheSize int `json:"queryCacheSize,omitempty"`
SchemaReloadIntervalSeconds int `json:"schemaReloadIntervalSeconds,omitempty"`
WatchReplication bool `json:"watchReplication,omitempty"`
TerseErrors bool `json:"terseErrors,omitempty"`
MessagePostponeParallelism int `json:"messagePostponeParallelism,omitempty"`
CacheResultFields bool `json:"cacheResultFields,omitempty"`
MessagePostponeCap int `json:"-"`
TxShutDownGracePeriod float64 `json:"-"`
PassthroughDMLs bool `json:"-"`
StreamBufferSize int `json:"-"`
QueryPlanCacheSize int `json:"-"`
SchemaReloadTime float64 `json:"-"`
StrictTableACL bool `json:"-"`
TerseErrors bool `json:"-"`
EnableTableACLDryRun bool `json:"-"`
TableACLExemptACL string `json:"-"`
WatchReplication bool `json:"-"`
TwoPCEnable bool `json:"-"`
TwoPCCoordinatorAddress string `json:"-"`
TwoPCAbandonAge float64 `json:"-"`
@ -213,8 +214,7 @@ type TabletConfig struct {
TransactionLimitConfig `json:"-"`
EnforceStrictTransTables bool `json:"-"`
EnableQueryPlanFieldCaching bool `json:"-"`
EnforceStrictTransTables bool `json:"-"`
}
// ConnPoolConfig contains the config for a conn pool.
@ -356,21 +356,12 @@ var defaultConfig = TabletConfig{
// of them ready in MySQL and profit from a pipelining effect.
MaxConcurrency: 5,
},
Consolidator: Enable,
MessagePostponeCap: 4,
TxShutDownGracePeriod: 0,
PassthroughDMLs: false,
QueryPlanCacheSize: 5000,
SchemaReloadTime: 30 * 60,
StreamBufferSize: 32 * 1024,
StrictTableACL: false,
TerseErrors: false,
EnableTableACLDryRun: false,
TableACLExemptACL: "",
WatchReplication: false,
TwoPCEnable: false,
TwoPCCoordinatorAddress: "",
TwoPCAbandonAge: 0,
Consolidator: Enable,
StreamBufferSize: 32 * 1024,
QueryCacheSize: 5000,
SchemaReloadIntervalSeconds: 30 * 60,
MessagePostponeParallelism: 4,
CacheResultFields: true,
EnableTxThrottler: false,
TxThrottlerConfig: defaultTxThrottlerConfig(),
@ -378,8 +369,7 @@ var defaultConfig = TabletConfig{
TransactionLimitConfig: defaultTransactionLimitConfig(),
EnforceStrictTransTables: true,
EnableQueryPlanFieldCaching: true,
EnforceStrictTransTables: true,
}
// defaultTxThrottlerConfig formats the default throttlerdata.Configuration

Просмотреть файл

@ -61,3 +61,37 @@ txPool: {}
require.NoError(t, err)
assert.Equal(t, cfg, gotCfg)
}
func TestDefaultConfig(t *testing.T) {
gotBytes, err := yaml.Marshal(NewDefaultConfig())
require.NoError(t, err)
want := `cacheResultFields: true
consolidator: enable
hotRowProtection:
maxConcurrency: 5
maxGlobalQueueSize: 1000
maxQueueSize: 20
mode: disable
messagePostponeParallelism: 4
olapReadPool:
idleTimeoutSeconds: 1800
size: 200
oltp:
maxRpws: 10000
queryTimeoutSeconds: 30
txTimeoutSeconds: 30
oltpReadPool:
idleTimeoutSeconds: 1800
maxWaiters: 5000
size: 16
queryCacheSize: 5000
schemaReloadIntervalSeconds: 1800
streamBufferSize: 32768
txPool:
idleTimeoutSeconds: 1800
maxWaiters: 5000
size: 20
timeoutSeconds: 1
`
assert.Equal(t, want, string(gotBytes))
}

Просмотреть файл

@ -100,7 +100,7 @@ func NewTxEngine(env tabletenv.Env) *TxEngine {
config := env.Config()
te := &TxEngine{
env: env,
shutdownGracePeriod: time.Duration(config.TxShutDownGracePeriod * 1e9),
shutdownGracePeriod: time.Duration(config.ShutdownGracePeriodSeconds * 1e9),
}
limiter := txlimiter.New(env)
te.txPool = NewTxPool(env, limiter)

Просмотреть файл

@ -40,7 +40,7 @@ func TestTxEngineClose(t *testing.T) {
config := tabletenv.NewDefaultConfig()
config.TxPool.Size = 10
config.Oltp.TxTimeoutSeconds = 1
config.TxShutDownGracePeriod = 0
config.ShutdownGracePeriodSeconds = 0
te := NewTxEngine(tabletenv.NewTestEnv(config, dbcfgs, "TabletServerTest"))
// Normal close.
@ -463,7 +463,7 @@ func setupTxEngine(db *fakesqldb.DB) *TxEngine {
config := tabletenv.NewDefaultConfig()
config.TxPool.Size = 10
config.Oltp.TxTimeoutSeconds = 1
config.TxShutDownGracePeriod = 0
config.ShutdownGracePeriodSeconds = 0
te := NewTxEngine(tabletenv.NewTestEnv(config, dbcfgs, "TabletServerTest"))
return te
}