Mirror of https://github.com/github/vitess-gh.git
throttler: Split the throttler configuration into more fine-grained fields.
min_duration_between_changes_sec was split into separate "increases" and "decreases" fields. This is required because an increase test needs to wait longer than a decrease test. Additionally, there is a new field "spread_backlog_across_sec". Before this change, the value of "min_duration_between_changes_sec" was used for this.
This commit is contained in:
Parent: fad57aa25a
Commit: 7e53114818
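For orientation, here is a minimal, standalone Go sketch of the resulting knobs (illustrative only, not part of the commit; the local configuration struct and the sample values merely mirror the new defaults introduced further down in the diff):

package main

import (
	"fmt"
	"time"
)

// configuration mirrors, locally and only for illustration, the new
// throttlerdata.Configuration fields touched by this commit.
type configuration struct {
	MinDurationBetweenIncreasesSec int64 // replaces MinDurationBetweenChangesSec for increases
	MaxDurationBetweenIncreasesSec int64
	MinDurationBetweenDecreasesSec int64 // decreases may react faster than increases
	SpreadBacklogAcrossSec         int64 // window over which a guessed backlog is worked off
}

func main() {
	// Hypothetical values matching the new defaults (healthCheckInterval = 20s).
	c := configuration{
		MinDurationBetweenIncreasesSec: 40,
		MaxDurationBetweenIncreasesSec: 62,
		MinDurationBetweenDecreasesSec: 20,
		SpreadBacklogAcrossSec:         20,
	}
	fmt.Println("wait after increase:", time.Duration(c.MinDurationBetweenIncreasesSec)*time.Second)
	fmt.Println("wait after decrease:", time.Duration(c.MinDurationBetweenDecreasesSec)*time.Second)
	fmt.Println("spread backlog across:", time.Duration(c.SpreadBacklogAcrossSec)*time.Second)
}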
@@ -100,8 +100,8 @@ type Configuration struct {
  // max_replication_lag_sec is meant as a last resort.
  // By default, the module tries to find out the system maximum capacity while
  // trying to keep the replication lag around "target_replication_lag_sec".
- // Usually, we'll wait min_duration_between_changes_sec to see the effect of a
- // throttler rate change on the replication lag.
+ // Usually, we'll wait min_duration_between_(increases|decreases)_sec to see
+ // the effect of a throttler rate change on the replication lag.
  // But if the lag goes above this field's value we will go into an "emergency"
  // state and throttle more aggressively (see "emergency_decrease" below).
  // This is the only way to ensure that the system will recover.
@@ -121,18 +121,32 @@ type Configuration struct {
  // if the observed replication lag is above "max_replication_lag_sec".
  // E.g. 0.50 decreases the current rate by 50%.
  EmergencyDecrease float64 `protobuf:"fixed64,5,opt,name=emergency_decrease,json=emergencyDecrease" json:"emergency_decrease,omitempty"`
- // min_duration_between_changes_sec specifies how long we'll wait for the last
- // rate increase or decrease to have an effect on the system.
- MinDurationBetweenChangesSec int64 `protobuf:"varint,6,opt,name=min_duration_between_changes_sec,json=minDurationBetweenChangesSec" json:"min_duration_between_changes_sec,omitempty"`
+ // min_duration_between_increases_sec specifies how long we'll wait at least
+ // for the last rate increase to have an effect on the system.
+ MinDurationBetweenIncreasesSec int64 `protobuf:"varint,6,opt,name=min_duration_between_increases_sec,json=minDurationBetweenIncreasesSec" json:"min_duration_between_increases_sec,omitempty"`
  // max_duration_between_increases_sec specifies how long we'll wait at most
  // for the last rate increase to have an effect on the system.
  MaxDurationBetweenIncreasesSec int64 `protobuf:"varint,7,opt,name=max_duration_between_increases_sec,json=maxDurationBetweenIncreasesSec" json:"max_duration_between_increases_sec,omitempty"`
+ // min_duration_between_decreases_sec specifies how long we'll wait at least
+ // for the last rate decrease to have an effect on the system.
+ MinDurationBetweenDecreasesSec int64 `protobuf:"varint,8,opt,name=min_duration_between_decreases_sec,json=minDurationBetweenDecreasesSec" json:"min_duration_between_decreases_sec,omitempty"`
+ // spread_backlog_across_sec is used when we set the throttler rate after
+ // we guessed the rate of a slave and determined its backlog.
+ // For example, at a guessed rate of 100 QPS and a lag of 10s, the replica has
+ // a backlog of 1000 queries.
+ // When we set the new, decreased throttler rate, we factor in how long it
+ // will take the slave to go through the backlog (in addition to new
+ // requests). This field specifies over which timespan we plan to spread this.
+ // For example, for a backlog of 1000 queries spread over 5s means that we
+ // have to further reduce the rate by 200 QPS or the backlog will not be
+ // processed within the 5 seconds.
+ SpreadBacklogAcrossSec int64 `protobuf:"varint,9,opt,name=spread_backlog_across_sec,json=spreadBacklogAcrossSec" json:"spread_backlog_across_sec,omitempty"`
  // ignore_n_slowest_replicas will ignore replication lag updates from the
  // N slowest replicas. Under certain circumstances, replicas are still
  // considered e.g. a) if the lag is at most max_replication_lag_sec, b) there
  // are less than N+1 replicas or c) the lag increased on each replica such
  // that all replicas were ignored in a row.
- IgnoreNSlowestReplicas int32 `protobuf:"varint,8,opt,name=ignore_n_slowest_replicas,json=ignoreNSlowestReplicas" json:"ignore_n_slowest_replicas,omitempty"`
+ IgnoreNSlowestReplicas int32 `protobuf:"varint,10,opt,name=ignore_n_slowest_replicas,json=ignoreNSlowestReplicas" json:"ignore_n_slowest_replicas,omitempty"`
  // age_bad_rate_after_sec is the duration after which an unchanged bad rate
  // will "age out" and increase by "bad_rate_increase".
  // Bad rates are tracked by the code in memory.go and serve as an upper bound
@@ -140,10 +154,10 @@ type Configuration struct {
  // try known too high (bad) rates over and over again.
  // To avoid that temporary degradations permanently reduce the maximum rate,
  // a stable bad rate "ages out" after "age_bad_rate_after_sec".
- AgeBadRateAfterSec int64 `protobuf:"varint,9,opt,name=age_bad_rate_after_sec,json=ageBadRateAfterSec" json:"age_bad_rate_after_sec,omitempty"`
+ AgeBadRateAfterSec int64 `protobuf:"varint,11,opt,name=age_bad_rate_after_sec,json=ageBadRateAfterSec" json:"age_bad_rate_after_sec,omitempty"`
  // bad_rate_increase defines the percentage by which a bad rate will be
  // increased when it's aging out.
- BadRateIncrease float64 `protobuf:"fixed64,10,opt,name=bad_rate_increase,json=badRateIncrease" json:"bad_rate_increase,omitempty"`
+ BadRateIncrease float64 `protobuf:"fixed64,12,opt,name=bad_rate_increase,json=badRateIncrease" json:"bad_rate_increase,omitempty"`
  }

  func (m *Configuration) Reset() { *m = Configuration{} }
@@ -257,45 +271,47 @@ func init() {
  func init() { proto.RegisterFile("throttlerdata.proto", fileDescriptor0) }

  var fileDescriptor0 = []byte{
- // 629 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x54, 0x5f, 0x4f, 0x13, 0x4f,
- 0x14, 0xcd, 0x52, 0xca, 0x0f, 0x6e, 0x7f, 0xfc, 0xe9, 0x40, 0xa0, 0x54, 0x62, 0xea, 0x26, 0xc6,
- 0x86, 0xc4, 0x3e, 0x94, 0x98, 0xa0, 0xbc, 0x60, 0x41, 0x8d, 0x46, 0x79, 0x58, 0xa2, 0x0f, 0xbe,
- 0x4c, 0x6e, 0xb7, 0x97, 0x65, 0x63, 0x77, 0x76, 0x9d, 0x19, 0xa4, 0xf5, 0x43, 0xf8, 0x35, 0x7c,
- 0xf6, 0x1b, 0xf9, 0x51, 0xcc, 0xcc, 0x4e, 0xff, 0x6c, 0x5b, 0x89, 0x09, 0x6f, 0x9d, 0x73, 0xcf,
- 0x9c, 0x7b, 0xe6, 0xf6, 0x9e, 0x85, 0x6d, 0x7d, 0x2d, 0x53, 0xad, 0xfb, 0x24, 0x7b, 0xa8, 0xb1,
- 0x95, 0xc9, 0x54, 0xa7, 0x6c, 0xbd, 0x00, 0xfa, 0x55, 0xd8, 0xfc, 0x80, 0x83, 0x00, 0x35, 0xa9,
- 0x80, 0xbe, 0xde, 0x90, 0xd2, 0xfe, 0x0f, 0x0f, 0xb6, 0x26, 0x98, 0xca, 0x52, 0xa1, 0x88, 0x9d,
- 0x42, 0x59, 0x1a, 0xa0, 0xe6, 0x35, 0x4a, 0xcd, 0x4a, 0xfb, 0xb0, 0x55, 0xd4, 0x9e, 0xe5, 0xb7,
- 0xec, 0xe9, 0x95, 0xd0, 0x72, 0x18, 0xe4, 0x17, 0xeb, 0xc7, 0x00, 0x13, 0x90, 0x6d, 0x41, 0xe9,
- 0x0b, 0x0d, 0x6b, 0x5e, 0xc3, 0x6b, 0xae, 0x05, 0xe6, 0x27, 0xdb, 0x81, 0xf2, 0x37, 0xec, 0xdf,
- 0x50, 0x6d, 0xa9, 0xe1, 0x35, 0x4b, 0x41, 0x7e, 0x78, 0xb1, 0x74, 0xec, 0xf9, 0x4f, 0xa0, 0x7a,
- 0x49, 0xda, 0xb5, 0x70, 0x2e, 0x19, 0x83, 0x65, 0xa3, 0x6b, 0x15, 0x4a, 0x81, 0xfd, 0xed, 0x1f,
- 0x02, 0x9b, 0x26, 0x3a, 0xeb, 0x3b, 0x50, 0x16, 0x98, 0x38, 0xeb, 0x6b, 0x41, 0x7e, 0xf0, 0x7f,
- 0x2e, 0xc3, 0xfa, 0x59, 0x2a, 0xae, 0xe2, 0xe8, 0x46, 0xa2, 0x8e, 0x53, 0xc1, 0x4e, 0xa0, 0xae,
- 0x51, 0x46, 0xa4, 0xb9, 0xa4, 0xac, 0x1f, 0x87, 0x16, 0xe5, 0x7d, 0x8c, 0xb8, 0xa2, 0xd0, 0xf5,
- 0xd9, 0xcb, 0x19, 0xc1, 0x84, 0xf0, 0x1e, 0xa3, 0x4b, 0x0a, 0xd9, 0x33, 0xd8, 0x4b, 0x70, 0xb0,
- 0xf0, 0x66, 0xfe, 0x9e, 0x9d, 0x04, 0x07, 0xf3, 0xd7, 0x1e, 0xc1, 0xff, 0xb1, 0x88, 0x75, 0x8c,
- 0x7d, 0x6e, 0x5f, 0x53, 0xb2, 0xdc, 0x8a, 0xc3, 0xcc, 0x33, 0x0c, 0xc5, 0x28, 0xc7, 0x22, 0x94,
- 0x84, 0x8a, 0x6a, 0xcb, 0x0d, 0xaf, 0xe9, 0x05, 0x95, 0x04, 0x07, 0x6f, 0x1d, 0xc4, 0x9e, 0x02,
- 0xa3, 0x84, 0x64, 0x44, 0x22, 0x1c, 0xf2, 0x1e, 0x39, 0x62, 0xd9, 0x12, 0xab, 0xe3, 0xca, 0xb9,
- 0x2b, 0xb0, 0xd7, 0xd0, 0x48, 0x62, 0xc1, 0x7b, 0xee, 0xe1, 0xbc, 0x4b, 0xfa, 0x96, 0x48, 0xf0,
- 0xf0, 0x1a, 0x45, 0x44, 0xca, 0x9a, 0x5e, 0xb1, 0x46, 0x0e, 0x92, 0x58, 0x9c, 0x3b, 0x5a, 0x27,
- 0x67, 0x9d, 0xe5, 0x24, 0x63, 0xfe, 0x1d, 0xf8, 0xc6, 0xd9, 0x9c, 0xce, 0xc8, 0x6a, 0xae, 0xf4,
- 0x9f, 0x55, 0x7a, 0x98, 0xe0, 0x60, 0x46, 0x69, 0x64, 0xdf, 0x6a, 0x3d, 0x87, 0xfd, 0x38, 0x12,
- 0xa9, 0x24, 0x2e, 0xb8, 0xea, 0xa7, 0xb7, 0xa4, 0xc6, 0x7f, 0x83, 0xaa, 0xad, 0x36, 0xbc, 0x66,
- 0x39, 0xd8, 0xcd, 0x09, 0x17, 0x97, 0x79, 0xd9, 0x0d, 0x53, 0xb1, 0x36, 0xec, 0x62, 0x44, 0xbc,
- 0x8b, 0x3d, 0x3b, 0x43, 0x8e, 0x57, 0x9a, 0xa4, 0x6d, 0xbd, 0x66, 0x5b, 0x33, 0x8c, 0xa8, 0x83,
- 0x3d, 0x33, 0xcc, 0x97, 0xa6, 0x64, 0xda, 0x1d, 0x42, 0x75, 0xcc, 0x1f, 0x4f, 0x16, 0xec, 0xc0,
- 0x36, 0xbb, 0x39, 0x77, 0x64, 0xcf, 0x3f, 0x85, 0xbd, 0x37, 0xa4, 0x0b, 0xbb, 0x32, 0x5a, 0xc2,
- 0xc7, 0xb0, 0x31, 0xce, 0x01, 0x37, 0x7b, 0xe5, 0x16, 0x7a, 0x12, 0xb2, 0x0b, 0x4c, 0xc8, 0xff,
- 0xed, 0x41, 0x6d, 0x5e, 0xc2, 0xad, 0x67, 0x08, 0x1b, 0xe1, 0x74, 0x61, 0x14, 0xb1, 0x93, 0x99,
- 0x88, 0xfd, 0x4d, 0xa0, 0x55, 0x40, 0x5d, 0xe6, 0x66, 0x24, 0xeb, 0x1c, 0xb6, 0x17, 0xd0, 0x16,
- 0xa4, 0xb0, 0x3d, 0x9d, 0xc2, 0x4a, 0xfb, 0x60, 0xc6, 0x44, 0xd1, 0xc1, 0x54, 0x46, 0x7f, 0x79,
- 0x50, 0xff, 0x98, 0xf5, 0x50, 0xd3, 0x3d, 0x06, 0xc5, 0x3a, 0xb0, 0x5e, 0x30, 0xfe, 0x4f, 0x2e,
- 0x8a, 0x57, 0x58, 0x13, 0xb6, 0xc2, 0x34, 0x1b, 0xf2, 0xef, 0x24, 0x53, 0x6e, 0x0d, 0x2a, 0x1b,
- 0xab, 0x55, 0x33, 0x94, 0x6c, 0xf8, 0x99, 0x64, 0xfa, 0xc9, 0xa2, 0xfe, 0x11, 0x3c, 0x58, 0x68,
- 0xf9, 0xce, 0xef, 0x46, 0x07, 0xf6, 0x03, 0x52, 0xf7, 0xdb, 0x87, 0x36, 0xd4, 0x17, 0x69, 0xdc,
- 0xd5, 0xb7, 0xbb, 0x62, 0x3f, 0xdf, 0x47, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xcc, 0x34, 0x98,
- 0x5c, 0xd5, 0x05, 0x00, 0x00,
+ // 658 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x55, 0x5d, 0x4f, 0x13, 0x4d,
+ 0x14, 0xce, 0x52, 0xca, 0x0b, 0xa7, 0x7c, 0x75, 0x20, 0x50, 0xfa, 0x1a, 0x53, 0x37, 0x31, 0x36,
+ 0x24, 0xf6, 0xa2, 0xc4, 0x04, 0xe5, 0x06, 0x2a, 0xc6, 0x68, 0x94, 0x8b, 0x25, 0x7a, 0xe1, 0xcd,
+ 0xe4, 0x74, 0xf7, 0xb0, 0x6e, 0xd8, 0x2f, 0x67, 0x06, 0x69, 0xfd, 0x11, 0xfe, 0x17, 0xfd, 0x45,
+ 0xfe, 0x14, 0xb3, 0x33, 0xd3, 0x8f, 0x2d, 0x05, 0x4c, 0xb8, 0xdb, 0x39, 0xe7, 0x99, 0xe7, 0x3c,
+ 0x67, 0xce, 0x3c, 0xb3, 0xb0, 0xa5, 0xbe, 0x8a, 0x4c, 0xa9, 0x98, 0x44, 0x80, 0x0a, 0x3b, 0xb9,
+ 0xc8, 0x54, 0xc6, 0xd6, 0x4a, 0x41, 0xb7, 0x0e, 0x1b, 0x1f, 0x71, 0xe0, 0xa1, 0x22, 0xe9, 0xd1,
+ 0xb7, 0x2b, 0x92, 0xca, 0xfd, 0xe9, 0xc0, 0xe6, 0x24, 0x26, 0xf3, 0x2c, 0x95, 0xc4, 0x8e, 0xa1,
+ 0x2a, 0x8a, 0x40, 0xc3, 0x69, 0x55, 0xda, 0xb5, 0xee, 0x7e, 0xa7, 0xcc, 0x3d, 0x8b, 0xef, 0xe8,
+ 0xd5, 0x9b, 0x54, 0x89, 0xa1, 0x67, 0x36, 0x36, 0x0f, 0x01, 0x26, 0x41, 0xb6, 0x09, 0x95, 0x4b,
+ 0x1a, 0x36, 0x9c, 0x96, 0xd3, 0x5e, 0xf1, 0x8a, 0x4f, 0xb6, 0x0d, 0xd5, 0xef, 0x18, 0x5f, 0x51,
+ 0x63, 0xa1, 0xe5, 0xb4, 0x2b, 0x9e, 0x59, 0xbc, 0x5a, 0x38, 0x74, 0xdc, 0x67, 0x50, 0x3f, 0x27,
+ 0x65, 0x4b, 0x58, 0x95, 0x8c, 0xc1, 0x62, 0xc1, 0xab, 0x19, 0x2a, 0x9e, 0xfe, 0x76, 0xf7, 0x81,
+ 0x4d, 0x03, 0xad, 0xf4, 0x6d, 0xa8, 0xa6, 0x98, 0x58, 0xe9, 0x2b, 0x9e, 0x59, 0xb8, 0xbf, 0xab,
+ 0xb0, 0xf6, 0x3a, 0x4b, 0x2f, 0xa2, 0xf0, 0x4a, 0xa0, 0x8a, 0xb2, 0x94, 0x1d, 0x41, 0x53, 0xa1,
+ 0x08, 0x49, 0x71, 0x41, 0x79, 0x1c, 0xf9, 0x3a, 0xca, 0x63, 0x0c, 0xb9, 0x24, 0xdf, 0xd6, 0xd9,
+ 0x35, 0x08, 0x6f, 0x02, 0xf8, 0x80, 0xe1, 0x39, 0xf9, 0xec, 0x05, 0xec, 0x26, 0x38, 0x98, 0xbb,
+ 0xd3, 0xf4, 0xb3, 0x9d, 0xe0, 0xe0, 0xe6, 0xb6, 0x27, 0xb0, 0x1a, 0xa5, 0x91, 0x8a, 0x30, 0xe6,
+ 0xba, 0x9b, 0x8a, 0xc6, 0xd6, 0x6c, 0xac, 0x68, 0xa3, 0x80, 0x14, 0xcc, 0x51, 0xea, 0x0b, 0x42,
+ 0x49, 0x8d, 0xc5, 0x96, 0xd3, 0x76, 0xbc, 0x5a, 0x82, 0x83, 0x77, 0x36, 0xc4, 0x9e, 0x03, 0xa3,
+ 0x84, 0x44, 0x48, 0xa9, 0x3f, 0xe4, 0x01, 0x59, 0x60, 0x55, 0x03, 0xeb, 0xe3, 0xcc, 0xa9, 0x4d,
+ 0xb0, 0xf7, 0xe0, 0x26, 0x51, 0xca, 0x03, 0xdb, 0x38, 0xef, 0x93, 0xba, 0x26, 0x4a, 0xc7, 0x25,
+ 0xa4, 0x96, 0xbd, 0xa4, 0xa5, 0x3c, 0x4e, 0xa2, 0xf4, 0xd4, 0x02, 0x7b, 0x06, 0x37, 0x2a, 0x2b,
+ 0x8b, 0x06, 0x0a, 0x2e, 0x1c, 0xdc, 0xc7, 0xf5, 0x9f, 0xe5, 0xc2, 0xc1, 0x7d, 0x5c, 0xf3, 0x74,
+ 0x8d, 0x3a, 0x32, 0x5c, 0xcb, 0xb7, 0xe9, 0x1a, 0xf5, 0xa7, 0xb9, 0x5e, 0xc2, 0x9e, 0xcc, 0x05,
+ 0x61, 0xc0, 0xfb, 0xe8, 0x5f, 0xc6, 0x59, 0xc8, 0xd1, 0x17, 0x99, 0x34, 0x14, 0x2b, 0x9a, 0x62,
+ 0xc7, 0x00, 0x7a, 0x26, 0x7f, 0xa2, 0xd3, 0x76, 0x6b, 0x14, 0xa6, 0x99, 0x20, 0x9e, 0x72, 0x19,
+ 0x67, 0xd7, 0x24, 0xc7, 0x37, 0x42, 0x36, 0xa0, 0xe5, 0xb4, 0xab, 0xde, 0x8e, 0x01, 0x9c, 0x9d,
+ 0x9b, 0xb4, 0x9d, 0xab, 0x64, 0x5d, 0xd8, 0xc1, 0x90, 0x78, 0x1f, 0x03, 0x3d, 0x4e, 0x8e, 0x17,
+ 0x8a, 0x84, 0x2e, 0x59, 0xd3, 0x25, 0x19, 0x86, 0xd4, 0xc3, 0xa0, 0x98, 0xeb, 0x49, 0x91, 0x2a,
+ 0xca, 0xed, 0x43, 0x7d, 0x8c, 0x1f, 0x0f, 0x79, 0x55, 0xcf, 0x6e, 0xa3, 0x6f, 0xb0, 0xa3, 0x53,
+ 0x72, 0x8f, 0x61, 0xf7, 0x2d, 0xa9, 0xd2, 0xb5, 0x1d, 0xf9, 0xe1, 0x29, 0xac, 0x8f, 0x2d, 0xc9,
+ 0x8b, 0x2b, 0x6e, 0xbd, 0x35, 0xf1, 0xfb, 0x19, 0x26, 0xe4, 0xfe, 0x71, 0xa0, 0x71, 0x93, 0xc2,
+ 0x3a, 0xc5, 0x87, 0x75, 0x7f, 0x3a, 0x31, 0x72, 0xfb, 0xd1, 0x8c, 0xdb, 0x6f, 0x23, 0xe8, 0x94,
+ 0xa2, 0xd6, 0xfe, 0x33, 0x94, 0x4d, 0x0e, 0x5b, 0x73, 0x60, 0x73, 0x1e, 0x84, 0xee, 0xf4, 0x83,
+ 0x50, 0xeb, 0x3e, 0x9a, 0x11, 0x51, 0x56, 0x30, 0xf5, 0x5c, 0xfc, 0x72, 0xa0, 0xf9, 0x29, 0x0f,
+ 0x50, 0xd1, 0x03, 0x0e, 0x8a, 0xf5, 0x60, 0xad, 0x24, 0xfc, 0x9f, 0x54, 0x94, 0xb7, 0xb0, 0x36,
+ 0x6c, 0xfa, 0x59, 0x3e, 0xe4, 0x3f, 0x48, 0x64, 0x5c, 0x0b, 0x94, 0xda, 0xe1, 0xcb, 0xc5, 0xa1,
+ 0xe4, 0xc3, 0x2f, 0x24, 0xb2, 0xcf, 0x3a, 0xea, 0x1e, 0xc0, 0xff, 0x73, 0x25, 0xdf, 0xf9, 0x84,
+ 0xf5, 0x60, 0xcf, 0x23, 0xf9, 0xb0, 0xfb, 0xd0, 0x85, 0xe6, 0x3c, 0x8e, 0xbb, 0xea, 0xf6, 0x97,
+ 0xf4, 0x9f, 0xe4, 0xe0, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x5a, 0x18, 0x19, 0x7c, 0x60, 0x06,
+ 0x00, 0x00,
  }
@@ -374,7 +374,7 @@ func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagReco
  }

  func (m *MaxReplicationLagModule) updateNextAllowedIncrease(now time.Time, increase float64, key string) {
- minDuration := m.config.MinDurationBetweenChanges()
+ minDuration := m.config.MinDurationBetweenIncreases()
  // We may have to wait longer than the configured minimum duration
  // until we see an effect of the increase.
  // Example: If the increase was fully over the capacity, it will take
@@ -471,7 +471,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time,
  // Guess the slave capacity based on the replication lag change.
  rate, reason := m.guessSlaveRate(r, avgMasterRate, lagBefore, lagNow, lagDifference, d)

- m.nextAllowedDecrease = now.Add(m.config.MinDurationBetweenChanges() + 2*time.Second)
+ m.nextAllowedDecrease = now.Add(m.config.MinDurationBetweenDecreases())
  m.updateRate(r, stateDecreaseAndGuessRate, rate, reason, now, lagRecordNow)
  }
@@ -508,16 +508,16 @@ func (m *MaxReplicationLagModule) guessSlaveRate(r *result, avgMasterRate float6
  newRate := avgSlaveRate
  // Reduce the new rate such that it has time to catch up the requests it's
  // behind within the next interval.
- futureRequests := newRate * m.config.MinDurationBetweenChanges().Seconds()
+ futureRequests := newRate * m.config.SpreadBacklogAcross().Seconds()
  newRate *= (futureRequests - requestsBehind) / futureRequests
  var reason string
  if newRate < 1 {
  // Backlog is too high. Reduce rate to 1 request/second.
  // TODO(mberlin): Make this a constant.
  newRate = 1
- reason = fmt.Sprintf("based on the guessed slave rate of: %v the slave won't be able to process the guessed backlog of %d requests within the next %.f seconds", int64(avgSlaveRate), int64(requestsBehind), m.config.MinDurationBetweenChanges().Seconds())
+ reason = fmt.Sprintf("based on the guessed slave rate of: %v the slave won't be able to process the guessed backlog of %d requests within the next %.f seconds", int64(avgSlaveRate), int64(requestsBehind), m.config.SpreadBacklogAcross().Seconds())
  } else {
- reason = fmt.Sprintf("new rate is %d lower than the guessed slave rate to account for a guessed backlog of %d requests over %.f seconds", int64(avgSlaveRate-newRate), int64(requestsBehind), m.config.MinDurationBetweenChanges().Seconds())
+ reason = fmt.Sprintf("new rate is %d lower than the guessed slave rate to account for a guessed backlog of %d requests over %.f seconds", int64(avgSlaveRate-newRate), int64(requestsBehind), m.config.SpreadBacklogAcross().Seconds())
  }

  return int64(newRate), reason
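The reduction that guessSlaveRate applies is unchanged; it now just reads its window from SpreadBacklogAcross() instead of MinDurationBetweenChanges(). A self-contained sketch of that arithmetic (local stand-in names, not the module's actual API), using the example from the proto comment where a 1000-query backlog spread over 5s costs an extra 200 QPS:

package main

import (
	"fmt"
	"time"
)

// reduceForBacklog mirrors the reduction guessSlaveRate performs: the new rate
// must leave room to work off requestsBehind within the spread window.
func reduceForBacklog(guessedRate, requestsBehind float64, spread time.Duration) float64 {
	futureRequests := guessedRate * spread.Seconds()
	return guessedRate * (futureRequests - requestsBehind) / futureRequests
}

func main() {
	guessed := 1000.0 // hypothetical guessed replica rate in QPS
	reduced := reduceForBacklog(guessed, 1000 /* backlog */, 5*time.Second)
	// Prints: guessed 1000 QPS -> set 800 QPS (reduced by 200 QPS)
	fmt.Printf("guessed %.0f QPS -> set %.0f QPS (reduced by %.0f QPS)\n", guessed, reduced, guessed-reduced)
}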
@@ -14,6 +14,10 @@ type MaxReplicationLagModuleConfig struct {
  throttlerdata.Configuration
  }

+ // Most of the values are based on the assumption that vttablet is started
+ // with the flag --health_check_interval=20s.
+ const healthCheckInterval = 20
+
  var defaultMaxReplicationLagModuleConfig = MaxReplicationLagModuleConfig{
  throttlerdata.Configuration{
  TargetReplicationLagSec: 2,
@@ -24,11 +28,15 @@ var defaultMaxReplicationLagModuleConfig = MaxReplicationLagModuleConfig{
  MaxIncrease: 1,
  EmergencyDecrease: 0.5,

- MinDurationBetweenChangesSec: 10,
+ // Wait for two health broadcast rounds. Otherwise, the "decrease" mode
+ // has less than 2 lag records available to calculate the actual slave rate.
+ MinDurationBetweenIncreasesSec: 2 * healthCheckInterval,
  // MaxDurationBetweenIncreasesSec defaults to 60+2 seconds because this
- // corresponds to three 3 broadcasts (assuming --health_check_interval=20s).
+ // corresponds to three 3 broadcasts.
  // The 2 extra seconds give us headroom to account for delay in the process.
- MaxDurationBetweenIncreasesSec: 60 + 2,
+ MaxDurationBetweenIncreasesSec: 3*healthCheckInterval + 2,
+ MinDurationBetweenDecreasesSec: healthCheckInterval,
+ SpreadBacklogAcrossSec: healthCheckInterval,

  AgeBadRateAfterSec: 3 * 60,
  BadRateIncrease: 0.10,
@@ -67,12 +75,18 @@ func (c MaxReplicationLagModuleConfig) Verify() error {
  if c.EmergencyDecrease <= 0 {
  return fmt.Errorf("emergency_decrease must be > 0")
  }
- if c.MinDurationBetweenChangesSec < 1 {
- return fmt.Errorf("min_duration_between_changes_sec must be >= 1")
+ if c.MinDurationBetweenIncreasesSec < 1 {
+ return fmt.Errorf("min_duration_between_increases_sec must be >= 1")
  }
  if c.MaxDurationBetweenIncreasesSec < 1 {
  return fmt.Errorf("max_duration_between_increases_sec must be >= 1")
  }
+ if c.MinDurationBetweenDecreasesSec < 1 {
+ return fmt.Errorf("min_duration_between_decreases_sec must be >= 1")
+ }
+ if c.SpreadBacklogAcrossSec < 1 {
+ return fmt.Errorf("spread_backlog_across_sec must be >= 1")
+ }
  if c.IgnoreNSlowestReplicas < 0 {
  return fmt.Errorf("ignore_n_slowest_replicas must be >= 0")
  }
@@ -82,10 +96,10 @@ func (c MaxReplicationLagModuleConfig) Verify() error {
  return nil
  }

- // MinDurationBetweenChanges is a helper function which returns the respective
+ // MinDurationBetweenIncreases is a helper function which returns the respective
  // protobuf field as native Go type.
- func (c MaxReplicationLagModuleConfig) MinDurationBetweenChanges() time.Duration {
- return time.Duration(c.MinDurationBetweenChangesSec) * time.Second
+ func (c MaxReplicationLagModuleConfig) MinDurationBetweenIncreases() time.Duration {
+ return time.Duration(c.MinDurationBetweenIncreasesSec) * time.Second
  }

  // MaxDurationBetweenIncreases is a helper function which returns the respective
@@ -94,6 +108,18 @@ func (c MaxReplicationLagModuleConfig) MaxDurationBetweenIncreases() time.Durati
  return time.Duration(c.MaxDurationBetweenIncreasesSec) * time.Second
  }

+ // MinDurationBetweenDecreases is a helper function which returns the respective
+ // protobuf field as native Go type.
+ func (c MaxReplicationLagModuleConfig) MinDurationBetweenDecreases() time.Duration {
+ return time.Duration(c.MinDurationBetweenDecreasesSec) * time.Second
+ }
+
+ // SpreadBacklogAcross is a helper function which returns the respective
+ // protobuf field as native Go type.
+ func (c MaxReplicationLagModuleConfig) SpreadBacklogAcross() time.Duration {
+ return time.Duration(c.SpreadBacklogAcrossSec) * time.Second
+ }
+
  // AgeBadRateAfter is a helper function which returns the respective
  // protobuf field as native Go type.
  func (c MaxReplicationLagModuleConfig) AgeBadRateAfter() time.Duration {
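Each *_sec field is a plain int64, so every new field gets a small helper that converts it to a time.Duration, and the defaults are expressed in terms of the assumed 20s health-check interval. A standalone sketch of that pattern (the function and variable names are local stand-ins; only the arithmetic comes from the diff above):

package main

import (
	"fmt"
	"time"
)

const healthCheckInterval = 20 // seconds; assumes vttablet runs with --health_check_interval=20s

// secondsToDuration mirrors the helper pattern used by the config
// (e.g. MinDurationBetweenIncreases, SpreadBacklogAcross).
func secondsToDuration(sec int64) time.Duration {
	return time.Duration(sec) * time.Second
}

func main() {
	minBetweenIncreases := secondsToDuration(2 * healthCheckInterval)   // wait two broadcast rounds
	maxBetweenIncreases := secondsToDuration(3*healthCheckInterval + 2) // three broadcasts plus 2s headroom
	minBetweenDecreases := secondsToDuration(healthCheckInterval)
	spreadBacklogAcross := secondsToDuration(healthCheckInterval)
	// Prints: 40s 1m2s 20s 20s
	fmt.Println(minBetweenIncreases, maxBetweenIncreases, minBetweenDecreases, spreadBacklogAcross)
}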
@@ -131,12 +131,13 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) {
  tf.ratesHistory.add(sinceZero(69*time.Second), 100)
  tf.process(lagRecord(sinceZero(70*time.Second), r2, 0))
  // Rate was increased to 200 based on actual rate of 100 within [0s, 69s].
+ // r2 becomes the "replica under test".
  if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
  t.Fatal(err)
  }
- // We have to wait at least config.MinDurationBetweenChangesSec (10s) before
+ // We have to wait at least config.MinDurationBetweenIncreasesSec (40s) before
  // the next increase.
- if got, want := tf.m.nextAllowedIncrease, sinceZero(70*time.Second).Add(tf.m.config.MinDurationBetweenChanges()); got != want {
+ if got, want := tf.m.nextAllowedIncrease, sinceZero(70*time.Second).Add(tf.m.config.MinDurationBetweenIncreases()); got != want {
  t.Fatalf("got = %v, want = %v", got, want)
  }
  // r2 @ 75s, 0s lag
@@ -157,11 +158,29 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) {
  t.Fatal(err)
  }

+ // No increase is possible for the next 20 seconds.
+
  // r2 @ 90s, 0s lag
  tf.ratesHistory.add(sinceZero(80*time.Second), 200)
  tf.ratesHistory.add(sinceZero(89*time.Second), 200)
  tf.process(lagRecord(sinceZero(90*time.Second), r2, 0))
- if err := tf.checkState(stateIncreaseRate, 400, sinceZero(90*time.Second)); err != nil {
+ if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
  t.Fatal(err)
  }
+
+ // r1 @ 100s, 0s lag
+ tf.ratesHistory.add(sinceZero(99*time.Second), 200)
+ tf.process(lagRecord(sinceZero(100*time.Second), r1, 0))
+ if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
+ t.Fatal(err)
+ }
+
+ // Next rate increase is possible after testing the rate for 40s.
+
+ // r2 @ 110s, 0s lag
+ tf.ratesHistory.add(sinceZero(109*time.Second), 200)
+ tf.process(lagRecord(sinceZero(110*time.Second), r2, 0))
+ if err := tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)); err != nil {
+ t.Fatal(err)
+ }
  }
@@ -180,6 +199,7 @@ func TestMaxReplicationLagModule_Increase_LastErrorOrNotUp(t *testing.T) {
  tf.ratesHistory.add(sinceZero(69*time.Second), 100)
  tf.process(lagRecord(sinceZero(70*time.Second), r2, 0))
  // Rate was increased to 200 based on actual rate of 100 within [0s, 69s].
+ // r2 becomes the "replica under test".
  if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
  t.Fatal(err)
  }
@@ -189,31 +209,32 @@ func TestMaxReplicationLagModule_Increase_LastErrorOrNotUp(t *testing.T) {
  rError.LastError = errors.New("HealthCheck reporting broken")
  tf.m.lagCache.add(rError)

- // r1 @ 80s, 0s lag
+ // r1 @ 110s, 0s lag
  tf.ratesHistory.add(sinceZero(70*time.Second), 100)
- tf.ratesHistory.add(sinceZero(79*time.Second), 200)
- tf.process(lagRecord(sinceZero(80*time.Second), r1, 0))
- // The r1 lag update triggered an increase and did not wait for r2
- // because r2 has LastError set.
- if err := tf.checkState(stateIncreaseRate, 400, sinceZero(80*time.Second)); err != nil {
+ tf.ratesHistory.add(sinceZero(109*time.Second), 200)
+ tf.process(lagRecord(sinceZero(110*time.Second), r1, 0))
+ // We ignore r2 as "replica under test" because it has LastError set.
+ // Instead, we act on r1.
+ // r1 becomes the "replica under test".
+ if err := tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)); err != nil {
  t.Fatal(err)
  }

- // Now the increase triggered by r1 is under test and we have to wait for it.
- // However, we'll simulate a shutdown of r1 i.e. we're no longer tracking it.
- // r1 @ 85s, 0s lag, !Up
- tf.ratesHistory.add(sinceZero(80*time.Second), 200)
- tf.ratesHistory.add(sinceZero(84*time.Second), 400)
- rNotUp := lagRecord(sinceZero(85*time.Second), r1, 0)
+ // We'll simulate a shutdown of r1 i.e. we're no longer tracking it.
+ // r1 @ 115s, 0s lag, !Up
+ tf.ratesHistory.add(sinceZero(110*time.Second), 200)
+ tf.ratesHistory.add(sinceZero(114*time.Second), 400)
+ rNotUp := lagRecord(sinceZero(115*time.Second), r1, 0)
  rNotUp.Up = false
  tf.m.lagCache.add(rNotUp)

- // r2 @ 90s, 0s lag (lastError no longer set)
- tf.ratesHistory.add(sinceZero(89*time.Second), 400)
- tf.process(lagRecord(sinceZero(90*time.Second), r2, 0))
- // The r1 lag update triggered an increase and did not wait for r2
- // because r2 has !Up set.
- if err := tf.checkState(stateIncreaseRate, 800, sinceZero(90*time.Second)); err != nil {
+ // r2 @ 150s, 0s lag (lastError no longer set)
+ tf.ratesHistory.add(sinceZero(149*time.Second), 400)
+ tf.process(lagRecord(sinceZero(150*time.Second), r2, 0))
+ // We ignore r1 as "replica under test" because it has !Up set.
+ // Instead, we act on r2.
+ // r2 becomes the "replica under test".
+ if err := tf.checkState(stateIncreaseRate, 800, sinceZero(150*time.Second)); err != nil {
  t.Fatal(err)
  }
  }
@@ -231,6 +252,7 @@ func TestMaxReplicationLagModule_Reset_ReplicaUnderIncreaseTest(t *testing.T) {
  tf.ratesHistory.add(sinceZero(69*time.Second), 100)
  tf.process(lagRecord(sinceZero(70*time.Second), r2, 0))
  // Rate was increased to 200 based on actual rate of 100 within [0s, 69s].
+ // r2 becomes the "replica under test".
  if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
  t.Fatal(err)
  }
@@ -246,12 +268,12 @@ func TestMaxReplicationLagModule_Reset_ReplicaUnderIncreaseTest(t *testing.T) {
  }

  // Now everything goes back to normal and the minimum time between increases
- // (10s) has passed as well. r2 or r1 can start an increase now.
- // r1 @ 80s, 0s lag (triggers the increase state)
+ // (40s) has passed as well. r2 or r1 can start an increase now.
+ // r1 @ 110s, 0s lag (triggers the increase state)
  tf.ratesHistory.add(sinceZero(75*time.Second), 200)
- tf.ratesHistory.add(sinceZero(79*time.Second), 100)
- tf.process(lagRecord(sinceZero(79*time.Second), r1, 0))
- if err := tf.checkState(stateIncreaseRate, 200, sinceZero(79*time.Second)); err != nil {
+ tf.ratesHistory.add(sinceZero(109*time.Second), 100)
+ tf.process(lagRecord(sinceZero(110*time.Second), r1, 0))
+ if err := tf.checkState(stateIncreaseRate, 200, sinceZero(110*time.Second)); err != nil {
  t.Fatal(err)
  }
  }
@@ -343,10 +365,10 @@ func TestMaxReplicationLagModule_Decrease(t *testing.T) {
  // 17s / 20s * 200 QPS actual rate => 170 QPS replica rate
  //
  // This results in a backlog of 3s * 200 QPS = 600 queries.
- // Since this backlog is spread across MinDurationBetweenChangesSec (10s),
- // the guessed rate gets further reduced by 60 QPS (600 queries / 10s).
- // Hence, the rate is set to 110 QPS (170 - 60).
- if err := tf.checkState(stateDecreaseAndGuessRate, 110, sinceZero(90*time.Second)); err != nil {
+ // Since this backlog is spread across SpreadBacklogAcrossSec (20s),
+ // the guessed rate gets further reduced by 30 QPS (600 queries / 20s).
+ // Hence, the rate is set to 140 QPS (170 - 30).
+ if err := tf.checkState(stateDecreaseAndGuessRate, 140, sinceZero(90*time.Second)); err != nil {
  t.Fatal(err)
  }
  }
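The new expectation of 140 QPS follows directly from the larger spread window: the extra reduction halves from 60 QPS (600 queries / 10s) to 30 QPS (600 queries / 20s). A quick standalone check of that arithmetic (a sketch, not test code from the repository):

package main

import "fmt"

func main() {
	guessedRate := 17.0 / 20.0 * 200.0 // 17s of replication progress over 20s at 200 QPS => 170 QPS
	backlog := 3.0 * 200.0             // 3s of remaining lag at 200 QPS => 600 queries
	spreadSec := 20.0                  // SpreadBacklogAcrossSec default
	extraReduction := backlog / spreadSec
	fmt.Println(guessedRate - extraReduction) // 140, the value the test now expects
}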
@@ -389,10 +411,10 @@ func TestMaxReplicationLagModule_Decrease_NoReplicaHistory(t *testing.T) {
  // 14s / 20s * 200 QPS actual rate => 140 QPS replica rate
  //
  // This results in a backlog of 6s * 200 QPS = 1200 queries.
- // Since this backlog is spread across MinDurationBetweenChangesSec (10s),
- // the guessed rate gets further reduced by 120 QPS (1200 queries / 10s).
- // Hence, the rate is set to 20 QPS.
- if err := tf.checkState(stateDecreaseAndGuessRate, 20, sinceZero(90*time.Second)); err != nil {
+ // Since this backlog is spread across SpreadBacklogAcrossSec (20s),
+ // the guessed rate gets further reduced by 60 QPS (1200 queries / 20s).
+ // Hence, the rate is set to 80 QPS (140 - 60).
+ if err := tf.checkState(stateDecreaseAndGuessRate, 80, sinceZero(90*time.Second)); err != nil {
  t.Fatal(err)
  }
  }
@@ -487,12 +509,12 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IsIgnoredDuringIncrease(
  t.Fatal(err)
  }

- // r1 @ 100s, 0s lag
- tf.ratesHistory.add(sinceZero(99*time.Second), 200)
- tf.process(lagRecord(sinceZero(100*time.Second), r1, 0))
+ // r1 @ 110s, 0s lag
+ tf.ratesHistory.add(sinceZero(109*time.Second), 200)
+ tf.process(lagRecord(sinceZero(110*time.Second), r1, 0))
  // Meanwhile, r1 is doing fine and will trigger the next increase because
  // we're no longer waiting for the ignored r2.
- if err := tf.checkState(stateIncreaseRate, 400, sinceZero(100*time.Second)); err != nil {
+ if err := tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)); err != nil {
  t.Fatal(err)
  }
  }
@@ -559,11 +581,11 @@ func TestApplyLatestConfig(t *testing.T) {
  // Let's assume that the current rate of 300 was actually fine.
  // After the reset, we'll increase it only by 100% to 600.

- // r2 @ 90s, 0s lag
+ // r2 @ 110s, 0s lag
  tf.ratesHistory.add(sinceZero(80*time.Second), 300)
- tf.ratesHistory.add(sinceZero(89*time.Second), 300)
- tf.process(lagRecord(sinceZero(90*time.Second), r2, 0))
- if err := tf.checkState(stateIncreaseRate, 600, sinceZero(90*time.Second)); err != nil {
+ tf.ratesHistory.add(sinceZero(109*time.Second), 300)
+ tf.process(lagRecord(sinceZero(110*time.Second), r2, 0))
+ if err := tf.checkState(stateIncreaseRate, 600, sinceZero(110*time.Second)); err != nil {
  t.Fatal(err)
  }
  }
@@ -122,11 +122,13 @@ func (tf *testFixture) configuration(t *testing.T, client throttlerclient.Client
  InitialRate: 3,
  MaxIncrease: 0.4,
  EmergencyDecrease: 0.5,
- MinDurationBetweenChangesSec: 6,
+ MinDurationBetweenIncreasesSec: 6,
  MaxDurationBetweenIncreasesSec: 7,
- IgnoreNSlowestReplicas: 8,
- AgeBadRateAfterSec: 9,
- BadRateIncrease: 0.10,
+ MinDurationBetweenDecreasesSec: 8,
+ SpreadBacklogAcrossSec: 9,
+ IgnoreNSlowestReplicas: 10,
+ AgeBadRateAfterSec: 11,
+ BadRateIncrease: 0.12,
  }
  names, err := client.UpdateConfiguration(context.Background(), "t2", config /* false */, true /* copyZeroValues */)
  if err != nil {
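On the client side the new fields are set like any other configuration field and sent with UpdateConfiguration. A hedged, self-contained sketch in the spirit of the test above (the local configuration struct, the client interface and the fake implementation are stand-ins, and the real call additionally takes a context; only the field names, the throttler name "t2" and the copyZeroValues flag come from the diff):

package main

import "fmt"

// configuration mirrors, locally, a subset of the throttlerdata fields the test sends.
type configuration struct {
	MinDurationBetweenIncreasesSec int64
	MaxDurationBetweenIncreasesSec int64
	MinDurationBetweenDecreasesSec int64
	SpreadBacklogAcrossSec         int64
	IgnoreNSlowestReplicas         int32
	AgeBadRateAfterSec             int64
	BadRateIncrease                float64
}

// client is a stand-in for the throttler client's UpdateConfiguration method.
type client interface {
	UpdateConfiguration(throttlerName string, config configuration, copyZeroValues bool) ([]string, error)
}

// fakeClient just echoes the throttler name so the sketch runs on its own.
type fakeClient struct{}

func (fakeClient) UpdateConfiguration(name string, config configuration, copyZeroValues bool) ([]string, error) {
	return []string{name}, nil
}

func main() {
	config := configuration{
		MinDurationBetweenIncreasesSec: 6,
		MaxDurationBetweenIncreasesSec: 7,
		MinDurationBetweenDecreasesSec: 8,
		SpreadBacklogAcrossSec:         9,
		IgnoreNSlowestReplicas:         10,
		AgeBadRateAfterSec:             11,
		BadRateIncrease:                0.12,
	}
	var c client = fakeClient{}
	names, err := c.UpdateConfiguration("t2", config, true /* copyZeroValues */)
	if err != nil {
		fmt.Println("update failed:", err)
		return
	}
	fmt.Println("updated throttlers:", names)
}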
@@ -40,8 +40,8 @@ message Configuration {
  // max_replication_lag_sec is meant as a last resort.
  // By default, the module tries to find out the system maximum capacity while
  // trying to keep the replication lag around "target_replication_lag_sec".
- // Usually, we'll wait min_duration_between_changes_sec to see the effect of a
- // throttler rate change on the replication lag.
+ // Usually, we'll wait min_duration_between_(increases|decreases)_sec to see
+ // the effect of a throttler rate change on the replication lag.
  // But if the lag goes above this field's value we will go into an "emergency"
  // state and throttle more aggressively (see "emergency_decrease" below).
  // This is the only way to ensure that the system will recover.
@@ -65,20 +65,36 @@ message Configuration {
  // E.g. 0.50 decreases the current rate by 50%.
  double emergency_decrease = 5;

- // min_duration_between_changes_sec specifies how long we'll wait for the last
- // rate increase or decrease to have an effect on the system.
- int64 min_duration_between_changes_sec = 6;
+ // min_duration_between_increases_sec specifies how long we'll wait at least
+ // for the last rate increase to have an effect on the system.
+ int64 min_duration_between_increases_sec = 6;

  // max_duration_between_increases_sec specifies how long we'll wait at most
  // for the last rate increase to have an effect on the system.
  int64 max_duration_between_increases_sec = 7;

+ // min_duration_between_decreases_sec specifies how long we'll wait at least
+ // for the last rate decrease to have an effect on the system.
+ int64 min_duration_between_decreases_sec = 8;
+
+ // spread_backlog_across_sec is used when we set the throttler rate after
+ // we guessed the rate of a slave and determined its backlog.
+ // For example, at a guessed rate of 100 QPS and a lag of 10s, the replica has
+ // a backlog of 1000 queries.
+ // When we set the new, decreased throttler rate, we factor in how long it
+ // will take the slave to go through the backlog (in addition to new
+ // requests). This field specifies over which timespan we plan to spread this.
+ // For example, for a backlog of 1000 queries spread over 5s means that we
+ // have to further reduce the rate by 200 QPS or the backlog will not be
+ // processed within the 5 seconds.
+ int64 spread_backlog_across_sec = 9;
+
  // ignore_n_slowest_replicas will ignore replication lag updates from the
  // N slowest replicas. Under certain circumstances, replicas are still
  // considered e.g. a) if the lag is at most max_replication_lag_sec, b) there
  // are less than N+1 replicas or c) the lag increased on each replica such
  // that all replicas were ignored in a row.
- int32 ignore_n_slowest_replicas = 8;
+ int32 ignore_n_slowest_replicas = 10;

  // age_bad_rate_after_sec is the duration after which an unchanged bad rate
  // will "age out" and increase by "bad_rate_increase".
|
|||
// try known too high (bad) rates over and over again.
|
||||
// To avoid that temporary degradations permanently reduce the maximum rate,
|
||||
// a stable bad rate "ages out" after "age_bad_rate_after_sec".
|
||||
int64 age_bad_rate_after_sec = 9;
|
||||
int64 age_bad_rate_after_sec = 11;
|
||||
|
||||
// bad_rate_increase defines the percentage by which a bad rate will be
|
||||
// increased when it's aging out.
|
||||
double bad_rate_increase = 10;
|
||||
double bad_rate_increase = 12;
|
||||
}
|
||||
|
||||
// GetConfigurationRequest is the payload for the GetConfiguration RPC.
|
||||
|
|
|
@@ -19,7 +19,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
  name='throttlerdata.proto',
  package='throttlerdata',
  syntax='proto3',
- serialized_pb=_b('\n\x13throttlerdata.proto\x12\rthrottlerdata\"\x11\n\x0fMaxRatesRequest\"{\n\x10MaxRatesResponse\x12\x39\n\x05rates\x18\x01 \x03(\x0b\x32*.throttlerdata.MaxRatesResponse.RatesEntry\x1a,\n\nRatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"!\n\x11SetMaxRateRequest\x12\x0c\n\x04rate\x18\x01 \x01(\x03\"#\n\x12SetMaxRateResponse\x12\r\n\x05names\x18\x01 \x03(\t\"\xd0\x02\n\rConfiguration\x12\"\n\x1atarget_replication_lag_sec\x18\x01 \x01(\x03\x12\x1f\n\x17max_replication_lag_sec\x18\x02 \x01(\x03\x12\x14\n\x0cinitial_rate\x18\x03 \x01(\x03\x12\x14\n\x0cmax_increase\x18\x04 \x01(\x01\x12\x1a\n\x12\x65mergency_decrease\x18\x05 \x01(\x01\x12(\n min_duration_between_changes_sec\x18\x06 \x01(\x03\x12*\n\"max_duration_between_increases_sec\x18\x07 \x01(\x03\x12!\n\x19ignore_n_slowest_replicas\x18\x08 \x01(\x05\x12\x1e\n\x16\x61ge_bad_rate_after_sec\x18\t \x01(\x03\x12\x19\n\x11\x62\x61\x64_rate_increase\x18\n \x01(\x01\"1\n\x17GetConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\"\xc4\x01\n\x18GetConfigurationResponse\x12S\n\x0e\x63onfigurations\x18\x01 \x03(\x0b\x32;.throttlerdata.GetConfigurationResponse.ConfigurationsEntry\x1aS\n\x13\x43onfigurationsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1c.throttlerdata.Configuration:\x02\x38\x01\"\x83\x01\n\x1aUpdateConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\x12\x33\n\rconfiguration\x18\x02 \x01(\x0b\x32\x1c.throttlerdata.Configuration\x12\x18\n\x10\x63opy_zero_values\x18\x03 \x01(\x08\",\n\x1bUpdateConfigurationResponse\x12\r\n\x05names\x18\x01 \x03(\t\"3\n\x19ResetConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\"+\n\x1aResetConfigurationResponse\x12\r\n\x05names\x18\x01 \x03(\tb\x06proto3')
+ serialized_pb=_b('\n\x13throttlerdata.proto\x12\rthrottlerdata\"\x11\n\x0fMaxRatesRequest\"{\n\x10MaxRatesResponse\x12\x39\n\x05rates\x18\x01 \x03(\x0b\x32*.throttlerdata.MaxRatesResponse.RatesEntry\x1a,\n\nRatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"!\n\x11SetMaxRateRequest\x12\x0c\n\x04rate\x18\x01 \x01(\x03\"#\n\x12SetMaxRateResponse\x12\r\n\x05names\x18\x01 \x03(\t\"\xa1\x03\n\rConfiguration\x12\"\n\x1atarget_replication_lag_sec\x18\x01 \x01(\x03\x12\x1f\n\x17max_replication_lag_sec\x18\x02 \x01(\x03\x12\x14\n\x0cinitial_rate\x18\x03 \x01(\x03\x12\x14\n\x0cmax_increase\x18\x04 \x01(\x01\x12\x1a\n\x12\x65mergency_decrease\x18\x05 \x01(\x01\x12*\n\"min_duration_between_increases_sec\x18\x06 \x01(\x03\x12*\n\"max_duration_between_increases_sec\x18\x07 \x01(\x03\x12*\n\"min_duration_between_decreases_sec\x18\x08 \x01(\x03\x12!\n\x19spread_backlog_across_sec\x18\t \x01(\x03\x12!\n\x19ignore_n_slowest_replicas\x18\n \x01(\x05\x12\x1e\n\x16\x61ge_bad_rate_after_sec\x18\x0b \x01(\x03\x12\x19\n\x11\x62\x61\x64_rate_increase\x18\x0c \x01(\x01\"1\n\x17GetConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\"\xc4\x01\n\x18GetConfigurationResponse\x12S\n\x0e\x63onfigurations\x18\x01 \x03(\x0b\x32;.throttlerdata.GetConfigurationResponse.ConfigurationsEntry\x1aS\n\x13\x43onfigurationsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1c.throttlerdata.Configuration:\x02\x38\x01\"\x83\x01\n\x1aUpdateConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\x12\x33\n\rconfiguration\x18\x02 \x01(\x0b\x32\x1c.throttlerdata.Configuration\x12\x18\n\x10\x63opy_zero_values\x18\x03 \x01(\x08\",\n\x1bUpdateConfigurationResponse\x12\r\n\x05names\x18\x01 \x03(\t\"3\n\x19ResetConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\"+\n\x1aResetConfigurationResponse\x12\r\n\x05names\x18\x01 \x03(\tb\x06proto3')
  )
  _sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -223,7 +223,7 @@ _CONFIGURATION = _descriptor.Descriptor(
  is_extension=False, extension_scope=None,
  options=None),
  _descriptor.FieldDescriptor(
- name='min_duration_between_changes_sec', full_name='throttlerdata.Configuration.min_duration_between_changes_sec', index=5,
+ name='min_duration_between_increases_sec', full_name='throttlerdata.Configuration.min_duration_between_increases_sec', index=5,
  number=6, type=3, cpp_type=2, label=1,
  has_default_value=False, default_value=0,
  message_type=None, enum_type=None, containing_type=None,
@@ -237,22 +237,36 @@ _CONFIGURATION = _descriptor.Descriptor(
  is_extension=False, extension_scope=None,
  options=None),
  _descriptor.FieldDescriptor(
- name='ignore_n_slowest_replicas', full_name='throttlerdata.Configuration.ignore_n_slowest_replicas', index=7,
- number=8, type=5, cpp_type=1, label=1,
+ name='min_duration_between_decreases_sec', full_name='throttlerdata.Configuration.min_duration_between_decreases_sec', index=7,
+ number=8, type=3, cpp_type=2, label=1,
  has_default_value=False, default_value=0,
  message_type=None, enum_type=None, containing_type=None,
  is_extension=False, extension_scope=None,
  options=None),
  _descriptor.FieldDescriptor(
- name='age_bad_rate_after_sec', full_name='throttlerdata.Configuration.age_bad_rate_after_sec', index=8,
+ name='spread_backlog_across_sec', full_name='throttlerdata.Configuration.spread_backlog_across_sec', index=8,
  number=9, type=3, cpp_type=2, label=1,
  has_default_value=False, default_value=0,
  message_type=None, enum_type=None, containing_type=None,
  is_extension=False, extension_scope=None,
  options=None),
  _descriptor.FieldDescriptor(
- name='bad_rate_increase', full_name='throttlerdata.Configuration.bad_rate_increase', index=9,
- number=10, type=1, cpp_type=5, label=1,
+ name='ignore_n_slowest_replicas', full_name='throttlerdata.Configuration.ignore_n_slowest_replicas', index=9,
+ number=10, type=5, cpp_type=1, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='age_bad_rate_after_sec', full_name='throttlerdata.Configuration.age_bad_rate_after_sec', index=10,
+ number=11, type=3, cpp_type=2, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ _descriptor.FieldDescriptor(
+ name='bad_rate_increase', full_name='throttlerdata.Configuration.bad_rate_increase', index=11,
+ number=12, type=1, cpp_type=5, label=1,
  has_default_value=False, default_value=0,
  message_type=None, enum_type=None, containing_type=None,
  is_extension=False, extension_scope=None,
@@ -270,7 +284,7 @@ _CONFIGURATION = _descriptor.Descriptor(
  oneofs=[
  ],
  serialized_start=255,
- serialized_end=591,
+ serialized_end=672,
  )
@@ -300,8 +314,8 @@ _GETCONFIGURATIONREQUEST = _descriptor.Descriptor(
  extension_ranges=[],
  oneofs=[
  ],
- serialized_start=593,
- serialized_end=642,
+ serialized_start=674,
+ serialized_end=723,
  )
@@ -338,8 +352,8 @@ _GETCONFIGURATIONRESPONSE_CONFIGURATIONSENTRY = _descriptor.Descriptor(
  extension_ranges=[],
  oneofs=[
  ],
- serialized_start=758,
- serialized_end=841,
+ serialized_start=839,
+ serialized_end=922,
  )

  _GETCONFIGURATIONRESPONSE = _descriptor.Descriptor(
@@ -368,8 +382,8 @@ _GETCONFIGURATIONRESPONSE = _descriptor.Descriptor(
  extension_ranges=[],
  oneofs=[
  ],
- serialized_start=645,
- serialized_end=841,
+ serialized_start=726,
+ serialized_end=922,
  )
@@ -413,8 +427,8 @@ _UPDATECONFIGURATIONREQUEST = _descriptor.Descriptor(
  extension_ranges=[],
  oneofs=[
  ],
- serialized_start=844,
- serialized_end=975,
+ serialized_start=925,
+ serialized_end=1056,
  )
@@ -444,8 +458,8 @@ _UPDATECONFIGURATIONRESPONSE = _descriptor.Descriptor(
  extension_ranges=[],
  oneofs=[
  ],
- serialized_start=977,
- serialized_end=1021,
+ serialized_start=1058,
+ serialized_end=1102,
  )
@@ -475,8 +489,8 @@ _RESETCONFIGURATIONREQUEST = _descriptor.Descriptor(
  extension_ranges=[],
  oneofs=[
  ],
- serialized_start=1023,
- serialized_end=1074,
+ serialized_start=1104,
+ serialized_end=1155,
  )
@@ -506,8 +520,8 @@ _RESETCONFIGURATIONRESPONSE = _descriptor.Descriptor(
  extension_ranges=[],
  oneofs=[
  ],
- serialized_start=1076,
- serialized_end=1119,
+ serialized_start=1157,
+ serialized_end=1200,
  )

  _MAXRATESRESPONSE_RATESENTRY.containing_type = _MAXRATESRESPONSE
@@ -313,11 +313,13 @@ class BaseShardingTest(object):
  'initial_rate:3 '
  'max_increase:0.4 '
  'emergency_decrease:0.5 '
- 'min_duration_between_changes_sec:6 '
+ 'min_duration_between_increases_sec:6 '
  'max_duration_between_increases_sec:7 '
+ 'min_duration_between_decreases_sec:8 '
+ 'spread_backlog_across_sec:9 '
  'ignore_n_slowest_replicas:0 '
- 'age_bad_rate_after_sec:9 '
- 'bad_rate_increase:0.10 '],
+ 'age_bad_rate_after_sec:11 '
+ 'bad_rate_increase:0.12 '],
  auto_log=True, trap_output=True)
  self.assertIn('%d active throttler(s)' % len(names), stdout)
  # Check the updated configuration.