зеркало из https://github.com/github/vitess-gh.git
yaml: start transitioning: PoolSize
Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Родитель
5c9eaf12ca
Коммит
9e29feb390
20
Makefile
20
Makefile
|
@ -144,28 +144,12 @@ java_test:
|
|||
install_protoc-gen-go:
|
||||
go install github.com/golang/protobuf/protoc-gen-go
|
||||
|
||||
# Find protoc compiler.
|
||||
# NOTE: We are *not* using the "protoc" binary (as suggested by the grpc Go
|
||||
# quickstart for example). Instead, we run "protoc" via the Python
|
||||
# wrapper script which is provided by the "grpcio-tools" PyPi package.
|
||||
# (The package includes the compiler as library, but not as binary.
|
||||
# Therefore, we have to use the wrapper script they provide.)
|
||||
ifneq ($(wildcard $(VTROOT)/dist/grpc/usr/local/lib/python2.7/site-packages/grpc_tools/protoc.py),)
|
||||
# IMPORTANT: The next line must not be indented.
|
||||
PROTOC_COMMAND := python -m grpc_tools.protoc
|
||||
endif
|
||||
|
||||
PROTO_SRCS = $(wildcard proto/*.proto)
|
||||
PROTO_SRC_NAMES = $(basename $(notdir $(PROTO_SRCS)))
|
||||
PROTO_GO_OUTS = $(foreach name, $(PROTO_SRC_NAMES), go/vt/proto/$(name)/$(name).pb.go)
|
||||
|
||||
# This rule rebuilds all the go and python files from the proto definitions for gRPC.
|
||||
proto: proto_banner $(PROTO_GO_OUTS)
|
||||
|
||||
proto_banner:
|
||||
ifeq (,$(PROTOC_COMMAND))
|
||||
$(error "Cannot find protoc compiler. Did bootstrap.sh succeed, and did you execute 'source dev.env'?")
|
||||
endif
|
||||
# This rule rebuilds all the go files from the proto definitions for gRPC.
|
||||
proto: $(PROTO_GO_OUTS)
|
||||
|
||||
ifndef NOBANNER
|
||||
echo $$(date): Compiling proto definitions
|
||||
|
|
|
@ -4,7 +4,6 @@ init:
|
|||
dbName: # init_db_name_override
|
||||
keyspace: # init_keyspace
|
||||
shard: # init_shard
|
||||
tags: {} # init_tags
|
||||
tabletType: # init_tablet_type
|
||||
timeoutSeconds: 60 # init_timeout
|
||||
|
||||
|
@ -12,6 +11,7 @@ db:
|
|||
socket: # db_socket
|
||||
host: # db_host
|
||||
port: 0 # db_port
|
||||
charSet: # db_charset
|
||||
flags: 0 # db_flags
|
||||
flavor: # db_flavor
|
||||
sslCa: # db_ssl_ca
|
||||
|
|
2
go.mod
2
go.mod
|
@ -157,11 +157,13 @@ require (
|
|||
gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 // indirect
|
||||
gopkg.in/ory-am/dockertest.v3 v3.3.4 // indirect
|
||||
gopkg.in/square/go-jose.v2 v2.3.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.2.8
|
||||
honnef.co/go/tools v0.0.1-2019.2.3
|
||||
k8s.io/apiextensions-apiserver v0.17.3
|
||||
k8s.io/apimachinery v0.17.3
|
||||
k8s.io/client-go v0.17.3
|
||||
mvdan.cc/unparam v0.0.0-20191111180625-960b1ec0f2c2 // indirect
|
||||
sigs.k8s.io/yaml v1.1.0
|
||||
sourcegraph.com/sqs/pbtypes v1.0.0 // indirect
|
||||
vitess.io/vitess/examples/are-you-alive v0.0.0-20200302220708-6b7695375ce9 // indirect
|
||||
)
|
||||
|
|
|
@ -50,67 +50,67 @@ func TestConfigVars(t *testing.T) {
|
|||
vars := framework.DebugVars()
|
||||
cases := []struct {
|
||||
tag string
|
||||
val int
|
||||
val int64
|
||||
}{{
|
||||
tag: "ConnPoolAvailable",
|
||||
val: tabletenv.Config.PoolSize,
|
||||
val: tabletenv.Config.OltpReadPool.Size,
|
||||
}, {
|
||||
tag: "ConnPoolCapacity",
|
||||
val: tabletenv.Config.PoolSize,
|
||||
val: tabletenv.Config.OltpReadPool.Size,
|
||||
}, {
|
||||
tag: "ConnPoolIdleTimeout",
|
||||
val: int(tabletenv.Config.IdleTimeout * 1e9),
|
||||
val: int64(tabletenv.Config.IdleTimeout * 1e9),
|
||||
}, {
|
||||
tag: "ConnPoolMaxCap",
|
||||
val: tabletenv.Config.PoolSize,
|
||||
val: tabletenv.Config.OltpReadPool.Size,
|
||||
}, {
|
||||
tag: "MaxResultSize",
|
||||
val: tabletenv.Config.MaxResultSize,
|
||||
val: int64(tabletenv.Config.MaxResultSize),
|
||||
}, {
|
||||
tag: "WarnResultSize",
|
||||
val: tabletenv.Config.WarnResultSize,
|
||||
val: int64(tabletenv.Config.WarnResultSize),
|
||||
}, {
|
||||
tag: "QueryCacheCapacity",
|
||||
val: tabletenv.Config.QueryPlanCacheSize,
|
||||
val: int64(tabletenv.Config.QueryPlanCacheSize),
|
||||
}, {
|
||||
tag: "QueryTimeout",
|
||||
val: int(tabletenv.Config.QueryTimeout * 1e9),
|
||||
val: int64(tabletenv.Config.QueryTimeout * 1e9),
|
||||
}, {
|
||||
tag: "SchemaReloadTime",
|
||||
val: int(tabletenv.Config.SchemaReloadTime * 1e9),
|
||||
val: int64(tabletenv.Config.SchemaReloadTime * 1e9),
|
||||
}, {
|
||||
tag: "StreamBufferSize",
|
||||
val: tabletenv.Config.StreamBufferSize,
|
||||
val: int64(tabletenv.Config.StreamBufferSize),
|
||||
}, {
|
||||
tag: "StreamConnPoolAvailable",
|
||||
val: tabletenv.Config.StreamPoolSize,
|
||||
val: int64(tabletenv.Config.StreamPoolSize),
|
||||
}, {
|
||||
tag: "StreamConnPoolCapacity",
|
||||
val: tabletenv.Config.StreamPoolSize,
|
||||
val: int64(tabletenv.Config.StreamPoolSize),
|
||||
}, {
|
||||
tag: "StreamConnPoolIdleTimeout",
|
||||
val: int(tabletenv.Config.IdleTimeout * 1e9),
|
||||
val: int64(tabletenv.Config.IdleTimeout * 1e9),
|
||||
}, {
|
||||
tag: "StreamConnPoolMaxCap",
|
||||
val: tabletenv.Config.StreamPoolSize,
|
||||
val: int64(tabletenv.Config.StreamPoolSize),
|
||||
}, {
|
||||
tag: "TransactionPoolAvailable",
|
||||
val: tabletenv.Config.TransactionCap,
|
||||
val: int64(tabletenv.Config.TransactionCap),
|
||||
}, {
|
||||
tag: "TransactionPoolCapacity",
|
||||
val: tabletenv.Config.TransactionCap,
|
||||
val: int64(tabletenv.Config.TransactionCap),
|
||||
}, {
|
||||
tag: "TransactionPoolIdleTimeout",
|
||||
val: int(tabletenv.Config.IdleTimeout * 1e9),
|
||||
val: int64(tabletenv.Config.IdleTimeout * 1e9),
|
||||
}, {
|
||||
tag: "TransactionPoolMaxCap",
|
||||
val: tabletenv.Config.TransactionCap,
|
||||
val: int64(tabletenv.Config.TransactionCap),
|
||||
}, {
|
||||
tag: "TransactionTimeout",
|
||||
val: int(tabletenv.Config.TransactionTimeout * 1e9),
|
||||
val: int64(tabletenv.Config.TransactionTimeout * 1e9),
|
||||
}}
|
||||
for _, tcase := range cases {
|
||||
if err := verifyIntValue(vars, tcase.tag, tcase.val); err != nil {
|
||||
if err := verifyIntValue(vars, tcase.tag, int(tcase.val)); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -179,8 +179,7 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine {
|
|||
queryPoolWaiterCap: sync2.NewAtomicInt64(int64(config.QueryPoolWaiterCap)),
|
||||
}
|
||||
|
||||
qe.conns = connpool.New(env, "ConnPool", config.PoolSize, config.PoolPrefillParallelism, time.Duration(config.QueryPoolTimeout*1e9), time.Duration(config.IdleTimeout*1e9))
|
||||
|
||||
qe.conns = connpool.New(env, "ConnPool", int(config.OltpReadPool.Size), config.PoolPrefillParallelism, time.Duration(config.QueryPoolTimeout*1e9), time.Duration(config.IdleTimeout*1e9))
|
||||
qe.streamConns = connpool.New(env, "StreamConnPool", config.StreamPoolSize, config.StreamPoolPrefillParallelism, time.Duration(config.QueryPoolTimeout*1e9), time.Duration(config.IdleTimeout*1e9))
|
||||
qe.enableConsolidator = config.EnableConsolidator
|
||||
qe.enableConsolidatorReplicas = config.EnableConsolidatorReplicas
|
||||
|
|
|
@ -1041,7 +1041,7 @@ const (
|
|||
// newTestQueryExecutor uses a package level variable testTabletServer defined in tabletserver_test.go
|
||||
func newTestTabletServer(ctx context.Context, flags executorFlags, db *fakesqldb.DB) *TabletServer {
|
||||
config := tabletenv.DefaultQsConfig
|
||||
config.PoolSize = 100
|
||||
config.OltpReadPool.Size = 100
|
||||
if flags&smallTxPool > 0 {
|
||||
config.TransactionCap = 3
|
||||
} else {
|
||||
|
|
|
@ -43,7 +43,7 @@ var (
|
|||
StatsLogger = streamlog.New("TabletServer", 50)
|
||||
|
||||
// Placeholder for deprecated variable.
|
||||
// TODO(sougou): deprecate the flag after release 7.0.
|
||||
// TODO(sougou): deprecate the flags after release 7.0.
|
||||
deprecatedMessagePoolPrefillParallelism int
|
||||
deprecatedAutocommit bool
|
||||
deprecateAllowUnsafeDMLs bool
|
||||
|
@ -53,7 +53,7 @@ var (
|
|||
)
|
||||
|
||||
func init() {
|
||||
flag.IntVar(&Config.PoolSize, "queryserver-config-pool-size", DefaultQsConfig.PoolSize, "query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction)")
|
||||
flag.Int64Var(&Config.OltpReadPool.Size, "queryserver-config-pool-size", DefaultQsConfig.OltpReadPool.Size, "query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction)")
|
||||
flag.IntVar(&Config.PoolPrefillParallelism, "queryserver-config-pool-prefill-parallelism", DefaultQsConfig.PoolPrefillParallelism, "query server read pool prefill parallelism, a non-zero value will prefill the pool using the specified parallism.")
|
||||
flag.IntVar(&Config.StreamPoolSize, "queryserver-config-stream-pool-size", DefaultQsConfig.StreamPoolSize, "query server stream connection pool size, stream pool is used by stream queries: queries that return results to client in a streaming fashion")
|
||||
flag.IntVar(&Config.StreamPoolPrefillParallelism, "queryserver-config-stream-pool-prefill-parallelism", DefaultQsConfig.StreamPoolPrefillParallelism, "query server stream pool prefill parallelism, a non-zero value will prefill the pool using the specified parallelism")
|
||||
|
@ -138,56 +138,66 @@ func Init() {
|
|||
|
||||
// TabletConfig contains all the configuration for query service
|
||||
type TabletConfig struct {
|
||||
PoolSize int
|
||||
PoolPrefillParallelism int
|
||||
StreamPoolSize int
|
||||
StreamPoolPrefillParallelism int
|
||||
TransactionCap int
|
||||
MessagePostponeCap int
|
||||
FoundRowsPoolSize int
|
||||
TxPoolPrefillParallelism int
|
||||
TransactionTimeout float64
|
||||
TxShutDownGracePeriod float64
|
||||
MaxResultSize int
|
||||
WarnResultSize int
|
||||
PassthroughDMLs bool
|
||||
StreamBufferSize int
|
||||
QueryPlanCacheSize int
|
||||
SchemaReloadTime float64
|
||||
QueryTimeout float64
|
||||
QueryPoolTimeout float64
|
||||
TxPoolTimeout float64
|
||||
IdleTimeout float64
|
||||
QueryPoolWaiterCap int
|
||||
TxPoolWaiterCap int
|
||||
StrictTableACL bool
|
||||
TerseErrors bool
|
||||
EnableTableACLDryRun bool
|
||||
TableACLExemptACL string
|
||||
WatchReplication bool
|
||||
TwoPCEnable bool
|
||||
TwoPCCoordinatorAddress string
|
||||
TwoPCAbandonAge float64
|
||||
OltpReadPool ConnPoolConfig `json:"oltpReadPool,omitempty"`
|
||||
PoolSize int `json:"-"`
|
||||
PoolPrefillParallelism int `json:"-"`
|
||||
StreamPoolSize int `json:"-"`
|
||||
StreamPoolPrefillParallelism int `json:"-"`
|
||||
TransactionCap int `json:"-"`
|
||||
MessagePostponeCap int `json:"-"`
|
||||
FoundRowsPoolSize int `json:"-"`
|
||||
TxPoolPrefillParallelism int `json:"-"`
|
||||
TransactionTimeout float64 `json:"-"`
|
||||
TxShutDownGracePeriod float64 `json:"-"`
|
||||
MaxResultSize int `json:"-"`
|
||||
WarnResultSize int `json:"-"`
|
||||
PassthroughDMLs bool `json:"-"`
|
||||
StreamBufferSize int `json:"-"`
|
||||
QueryPlanCacheSize int `json:"-"`
|
||||
SchemaReloadTime float64 `json:"-"`
|
||||
QueryTimeout float64 `json:"-"`
|
||||
QueryPoolTimeout float64 `json:"-"`
|
||||
TxPoolTimeout float64 `json:"-"`
|
||||
IdleTimeout float64 `json:"-"`
|
||||
QueryPoolWaiterCap int `json:"-"`
|
||||
TxPoolWaiterCap int `json:"-"`
|
||||
StrictTableACL bool `json:"-"`
|
||||
TerseErrors bool `json:"-"`
|
||||
EnableTableACLDryRun bool `json:"-"`
|
||||
TableACLExemptACL string `json:"-"`
|
||||
WatchReplication bool `json:"-"`
|
||||
TwoPCEnable bool `json:"-"`
|
||||
TwoPCCoordinatorAddress string `json:"-"`
|
||||
TwoPCAbandonAge float64 `json:"-"`
|
||||
|
||||
EnableTxThrottler bool
|
||||
TxThrottlerConfig string
|
||||
TxThrottlerHealthCheckCells []string
|
||||
EnableTxThrottler bool `json:"-"`
|
||||
TxThrottlerConfig string `json:"-"`
|
||||
TxThrottlerHealthCheckCells []string `json:"-"`
|
||||
|
||||
EnableHotRowProtection bool
|
||||
EnableHotRowProtectionDryRun bool
|
||||
HotRowProtectionMaxQueueSize int
|
||||
HotRowProtectionMaxGlobalQueueSize int
|
||||
HotRowProtectionConcurrentTransactions int
|
||||
EnableHotRowProtection bool `json:"-"`
|
||||
EnableHotRowProtectionDryRun bool `json:"-"`
|
||||
HotRowProtectionMaxQueueSize int `json:"-"`
|
||||
HotRowProtectionMaxGlobalQueueSize int `json:"-"`
|
||||
HotRowProtectionConcurrentTransactions int `json:"-"`
|
||||
|
||||
TransactionLimitConfig
|
||||
TransactionLimitConfig `json:"-"`
|
||||
|
||||
HeartbeatEnable bool
|
||||
HeartbeatInterval time.Duration
|
||||
HeartbeatEnable bool `json:"-"`
|
||||
HeartbeatInterval time.Duration `json:"-"`
|
||||
|
||||
EnforceStrictTransTables bool
|
||||
EnableConsolidator bool
|
||||
EnableConsolidatorReplicas bool
|
||||
EnableQueryPlanFieldCaching bool
|
||||
EnforceStrictTransTables bool `json:"-"`
|
||||
EnableConsolidator bool `json:"-"`
|
||||
EnableConsolidatorReplicas bool `json:"-"`
|
||||
EnableQueryPlanFieldCaching bool `json:"-"`
|
||||
}
|
||||
|
||||
// ConnPoolConfig contains the conig parameters for a conn pool.
|
||||
type ConnPoolConfig struct {
|
||||
Size int64 `json:"size,omitempty"`
|
||||
TimeoutSeconds int64 `json:"timeoutSeconds,omitempty"`
|
||||
IdleTimeoutSeconds int64 `json:"idleTimeoutSeconds,omitempty"`
|
||||
PrefillParallelism int64 `json:"prefillParallelism,omitempty"`
|
||||
MaxWaiters int64 `json:"maxWaiters,omitempty"`
|
||||
}
|
||||
|
||||
// TransactionLimitConfig captures configuration of transaction pool slots
|
||||
|
@ -210,7 +220,9 @@ type TransactionLimitConfig struct {
|
|||
// great (the overhead makes the final packets on the wire about twice
|
||||
// bigger than this).
|
||||
var DefaultQsConfig = TabletConfig{
|
||||
PoolSize: 16,
|
||||
OltpReadPool: ConnPoolConfig{
|
||||
Size: 16,
|
||||
},
|
||||
PoolPrefillParallelism: 0,
|
||||
StreamPoolSize: 200,
|
||||
StreamPoolPrefillParallelism: 0,
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
Copyright 2020 The Vitess Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package tabletenv
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"sigs.k8s.io/yaml"
|
||||
)
|
||||
|
||||
func TestConfigParse(t *testing.T) {
|
||||
cfg := TabletConfig{
|
||||
OltpReadPool: ConnPoolConfig{
|
||||
Size: 16,
|
||||
TimeoutSeconds: 10,
|
||||
IdleTimeoutSeconds: 20,
|
||||
PrefillParallelism: 30,
|
||||
MaxWaiters: 40,
|
||||
},
|
||||
}
|
||||
gotBytes, err := yaml.Marshal(&cfg)
|
||||
require.NoError(t, err)
|
||||
wantBytes := `oltpReadPool:
|
||||
idleTimeoutSeconds: 20
|
||||
maxWaiters: 40
|
||||
prefillParallelism: 30
|
||||
size: 16
|
||||
timeoutSeconds: 10
|
||||
`
|
||||
assert.Equal(t, wantBytes, string(gotBytes))
|
||||
|
||||
// Make sure TimeoutSeconds doesn't get overwritten.
|
||||
inBytes := []byte(`oltpReadPool:
|
||||
size: 16
|
||||
idleTimeoutSeconds: 20
|
||||
prefillParallelism: 30
|
||||
maxWaiters: 40
|
||||
`)
|
||||
gotCfg := cfg
|
||||
err = yaml.Unmarshal(inBytes, &gotCfg)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, cfg, gotCfg)
|
||||
}
|
Загрузка…
Ссылка в новой задаче