diff --git a/go/vt/proto/throttlerdata/throttlerdata.pb.go b/go/vt/proto/throttlerdata/throttlerdata.pb.go index c3d05d832c..9fd6ade490 100644 --- a/go/vt/proto/throttlerdata/throttlerdata.pb.go +++ b/go/vt/proto/throttlerdata/throttlerdata.pb.go @@ -100,8 +100,8 @@ type Configuration struct { // max_replication_lag_sec is meant as a last resort. // By default, the module tries to find out the system maximum capacity while // trying to keep the replication lag around "target_replication_lag_sec". - // Usually, we'll wait min_duration_between_changes_sec to see the effect of a - // throttler rate change on the replication lag. + // Usually, we'll wait min_duration_between_(increases|decreases)_sec to see + // the effect of a throttler rate change on the replication lag. // But if the lag goes above this field's value we will go into an "emergency" // state and throttle more aggressively (see "emergency_decrease" below). // This is the only way to ensure that the system will recover. @@ -121,18 +121,32 @@ type Configuration struct { // if the observed replication lag is above "max_replication_lag_sec". // E.g. 0.50 decreases the current rate by 50%. EmergencyDecrease float64 `protobuf:"fixed64,5,opt,name=emergency_decrease,json=emergencyDecrease" json:"emergency_decrease,omitempty"` - // min_duration_between_changes_sec specifies how long we'll wait for the last - // rate increase or decrease to have an effect on the system. - MinDurationBetweenChangesSec int64 `protobuf:"varint,6,opt,name=min_duration_between_changes_sec,json=minDurationBetweenChangesSec" json:"min_duration_between_changes_sec,omitempty"` + // min_duration_between_increases_sec specifies how long we'll wait at least + // for the last rate increase to have an effect on the system. + MinDurationBetweenIncreasesSec int64 `protobuf:"varint,6,opt,name=min_duration_between_increases_sec,json=minDurationBetweenIncreasesSec" json:"min_duration_between_increases_sec,omitempty"` // max_duration_between_increases_sec specifies how long we'll wait at most // for the last rate increase to have an effect on the system. MaxDurationBetweenIncreasesSec int64 `protobuf:"varint,7,opt,name=max_duration_between_increases_sec,json=maxDurationBetweenIncreasesSec" json:"max_duration_between_increases_sec,omitempty"` + // min_duration_between_decreases_sec specifies how long we'll wait at least + // for the last rate decrease to have an effect on the system. + MinDurationBetweenDecreasesSec int64 `protobuf:"varint,8,opt,name=min_duration_between_decreases_sec,json=minDurationBetweenDecreasesSec" json:"min_duration_between_decreases_sec,omitempty"` + // spread_backlog_across_sec is used when we set the throttler rate after + // we guessed the rate of a slave and determined its backlog. + // For example, at a guessed rate of 100 QPS and a lag of 10s, the replica has + // a backlog of 1000 queries. + // When we set the new, decreased throttler rate, we factor in how long it + // will take the slave to go through the backlog (in addition to new + // requests). This field specifies over which timespan we plan to spread this. + // For example, for a backlog of 1000 queries spread over 5s means that we + // have to further reduce the rate by 200 QPS or the backlog will not be + // processed within the 5 seconds. 
+ SpreadBacklogAcrossSec int64 `protobuf:"varint,9,opt,name=spread_backlog_across_sec,json=spreadBacklogAcrossSec" json:"spread_backlog_across_sec,omitempty"` // ignore_n_slowest_replicas will ignore replication lag updates from the // N slowest replicas. Under certain circumstances, replicas are still // considered e.g. a) if the lag is at most max_replication_lag_sec, b) there // are less than N+1 replicas or c) the lag increased on each replica such // that all replicas were ignored in a row. - IgnoreNSlowestReplicas int32 `protobuf:"varint,8,opt,name=ignore_n_slowest_replicas,json=ignoreNSlowestReplicas" json:"ignore_n_slowest_replicas,omitempty"` + IgnoreNSlowestReplicas int32 `protobuf:"varint,10,opt,name=ignore_n_slowest_replicas,json=ignoreNSlowestReplicas" json:"ignore_n_slowest_replicas,omitempty"` // age_bad_rate_after_sec is the duration after which an unchanged bad rate // will "age out" and increase by "bad_rate_increase". // Bad rates are tracked by the code in memory.go and serve as an upper bound @@ -140,10 +154,10 @@ type Configuration struct { // try known too high (bad) rates over and over again. // To avoid that temporary degradations permanently reduce the maximum rate, // a stable bad rate "ages out" after "age_bad_rate_after_sec". - AgeBadRateAfterSec int64 `protobuf:"varint,9,opt,name=age_bad_rate_after_sec,json=ageBadRateAfterSec" json:"age_bad_rate_after_sec,omitempty"` + AgeBadRateAfterSec int64 `protobuf:"varint,11,opt,name=age_bad_rate_after_sec,json=ageBadRateAfterSec" json:"age_bad_rate_after_sec,omitempty"` // bad_rate_increase defines the percentage by which a bad rate will be // increased when it's aging out. - BadRateIncrease float64 `protobuf:"fixed64,10,opt,name=bad_rate_increase,json=badRateIncrease" json:"bad_rate_increase,omitempty"` + BadRateIncrease float64 `protobuf:"fixed64,12,opt,name=bad_rate_increase,json=badRateIncrease" json:"bad_rate_increase,omitempty"` } func (m *Configuration) Reset() { *m = Configuration{} } @@ -257,45 +271,47 @@ func init() { func init() { proto.RegisterFile("throttlerdata.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 629 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x54, 0x5f, 0x4f, 0x13, 0x4f, - 0x14, 0xcd, 0x52, 0xca, 0x0f, 0x6e, 0x7f, 0xfc, 0xe9, 0x40, 0xa0, 0x54, 0x62, 0xea, 0x26, 0xc6, - 0x86, 0xc4, 0x3e, 0x94, 0x98, 0xa0, 0xbc, 0x60, 0x41, 0x8d, 0x46, 0x79, 0x58, 0xa2, 0x0f, 0xbe, - 0x4c, 0x6e, 0xb7, 0x97, 0x65, 0x63, 0x77, 0x76, 0x9d, 0x19, 0xa4, 0xf5, 0x43, 0xf8, 0x35, 0x7c, - 0xf6, 0x1b, 0xf9, 0x51, 0xcc, 0xcc, 0x4e, 0xff, 0x6c, 0x5b, 0x89, 0x09, 0x6f, 0x9d, 0x73, 0xcf, - 0x9c, 0x7b, 0xe6, 0xf6, 0x9e, 0x85, 0x6d, 0x7d, 0x2d, 0x53, 0xad, 0xfb, 0x24, 0x7b, 0xa8, 0xb1, - 0x95, 0xc9, 0x54, 0xa7, 0x6c, 0xbd, 0x00, 0xfa, 0x55, 0xd8, 0xfc, 0x80, 0x83, 0x00, 0x35, 0xa9, - 0x80, 0xbe, 0xde, 0x90, 0xd2, 0xfe, 0x0f, 0x0f, 0xb6, 0x26, 0x98, 0xca, 0x52, 0xa1, 0x88, 0x9d, - 0x42, 0x59, 0x1a, 0xa0, 0xe6, 0x35, 0x4a, 0xcd, 0x4a, 0xfb, 0xb0, 0x55, 0xd4, 0x9e, 0xe5, 0xb7, - 0xec, 0xe9, 0x95, 0xd0, 0x72, 0x18, 0xe4, 0x17, 0xeb, 0xc7, 0x00, 0x13, 0x90, 0x6d, 0x41, 0xe9, - 0x0b, 0x0d, 0x6b, 0x5e, 0xc3, 0x6b, 0xae, 0x05, 0xe6, 0x27, 0xdb, 0x81, 0xf2, 0x37, 0xec, 0xdf, - 0x50, 0x6d, 0xa9, 0xe1, 0x35, 0x4b, 0x41, 0x7e, 0x78, 0xb1, 0x74, 0xec, 0xf9, 0x4f, 0xa0, 0x7a, - 0x49, 0xda, 0xb5, 0x70, 0x2e, 0x19, 0x83, 0x65, 0xa3, 0x6b, 0x15, 0x4a, 0x81, 0xfd, 0xed, 0x1f, - 0x02, 0x9b, 0x26, 0x3a, 0xeb, 0x3b, 0x50, 0x16, 0x98, 0x38, 0xeb, 0x6b, 0x41, 0x7e, 0xf0, 
0x7f, - 0x2e, 0xc3, 0xfa, 0x59, 0x2a, 0xae, 0xe2, 0xe8, 0x46, 0xa2, 0x8e, 0x53, 0xc1, 0x4e, 0xa0, 0xae, - 0x51, 0x46, 0xa4, 0xb9, 0xa4, 0xac, 0x1f, 0x87, 0x16, 0xe5, 0x7d, 0x8c, 0xb8, 0xa2, 0xd0, 0xf5, - 0xd9, 0xcb, 0x19, 0xc1, 0x84, 0xf0, 0x1e, 0xa3, 0x4b, 0x0a, 0xd9, 0x33, 0xd8, 0x4b, 0x70, 0xb0, - 0xf0, 0x66, 0xfe, 0x9e, 0x9d, 0x04, 0x07, 0xf3, 0xd7, 0x1e, 0xc1, 0xff, 0xb1, 0x88, 0x75, 0x8c, - 0x7d, 0x6e, 0x5f, 0x53, 0xb2, 0xdc, 0x8a, 0xc3, 0xcc, 0x33, 0x0c, 0xc5, 0x28, 0xc7, 0x22, 0x94, - 0x84, 0x8a, 0x6a, 0xcb, 0x0d, 0xaf, 0xe9, 0x05, 0x95, 0x04, 0x07, 0x6f, 0x1d, 0xc4, 0x9e, 0x02, - 0xa3, 0x84, 0x64, 0x44, 0x22, 0x1c, 0xf2, 0x1e, 0x39, 0x62, 0xd9, 0x12, 0xab, 0xe3, 0xca, 0xb9, - 0x2b, 0xb0, 0xd7, 0xd0, 0x48, 0x62, 0xc1, 0x7b, 0xee, 0xe1, 0xbc, 0x4b, 0xfa, 0x96, 0x48, 0xf0, - 0xf0, 0x1a, 0x45, 0x44, 0xca, 0x9a, 0x5e, 0xb1, 0x46, 0x0e, 0x92, 0x58, 0x9c, 0x3b, 0x5a, 0x27, - 0x67, 0x9d, 0xe5, 0x24, 0x63, 0xfe, 0x1d, 0xf8, 0xc6, 0xd9, 0x9c, 0xce, 0xc8, 0x6a, 0xae, 0xf4, - 0x9f, 0x55, 0x7a, 0x98, 0xe0, 0x60, 0x46, 0x69, 0x64, 0xdf, 0x6a, 0x3d, 0x87, 0xfd, 0x38, 0x12, - 0xa9, 0x24, 0x2e, 0xb8, 0xea, 0xa7, 0xb7, 0xa4, 0xc6, 0x7f, 0x83, 0xaa, 0xad, 0x36, 0xbc, 0x66, - 0x39, 0xd8, 0xcd, 0x09, 0x17, 0x97, 0x79, 0xd9, 0x0d, 0x53, 0xb1, 0x36, 0xec, 0x62, 0x44, 0xbc, - 0x8b, 0x3d, 0x3b, 0x43, 0x8e, 0x57, 0x9a, 0xa4, 0x6d, 0xbd, 0x66, 0x5b, 0x33, 0x8c, 0xa8, 0x83, - 0x3d, 0x33, 0xcc, 0x97, 0xa6, 0x64, 0xda, 0x1d, 0x42, 0x75, 0xcc, 0x1f, 0x4f, 0x16, 0xec, 0xc0, - 0x36, 0xbb, 0x39, 0x77, 0x64, 0xcf, 0x3f, 0x85, 0xbd, 0x37, 0xa4, 0x0b, 0xbb, 0x32, 0x5a, 0xc2, - 0xc7, 0xb0, 0x31, 0xce, 0x01, 0x37, 0x7b, 0xe5, 0x16, 0x7a, 0x12, 0xb2, 0x0b, 0x4c, 0xc8, 0xff, - 0xed, 0x41, 0x6d, 0x5e, 0xc2, 0xad, 0x67, 0x08, 0x1b, 0xe1, 0x74, 0x61, 0x14, 0xb1, 0x93, 0x99, - 0x88, 0xfd, 0x4d, 0xa0, 0x55, 0x40, 0x5d, 0xe6, 0x66, 0x24, 0xeb, 0x1c, 0xb6, 0x17, 0xd0, 0x16, - 0xa4, 0xb0, 0x3d, 0x9d, 0xc2, 0x4a, 0xfb, 0x60, 0xc6, 0x44, 0xd1, 0xc1, 0x54, 0x46, 0x7f, 0x79, - 0x50, 0xff, 0x98, 0xf5, 0x50, 0xd3, 0x3d, 0x06, 0xc5, 0x3a, 0xb0, 0x5e, 0x30, 0xfe, 0x4f, 0x2e, - 0x8a, 0x57, 0x58, 0x13, 0xb6, 0xc2, 0x34, 0x1b, 0xf2, 0xef, 0x24, 0x53, 0x6e, 0x0d, 0x2a, 0x1b, - 0xab, 0x55, 0x33, 0x94, 0x6c, 0xf8, 0x99, 0x64, 0xfa, 0xc9, 0xa2, 0xfe, 0x11, 0x3c, 0x58, 0x68, - 0xf9, 0xce, 0xef, 0x46, 0x07, 0xf6, 0x03, 0x52, 0xf7, 0xdb, 0x87, 0x36, 0xd4, 0x17, 0x69, 0xdc, - 0xd5, 0xb7, 0xbb, 0x62, 0x3f, 0xdf, 0x47, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xcc, 0x34, 0x98, - 0x5c, 0xd5, 0x05, 0x00, 0x00, + // 658 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x55, 0x5d, 0x4f, 0x13, 0x4d, + 0x14, 0xce, 0x52, 0xca, 0x0b, 0xa7, 0x7c, 0x75, 0x20, 0x50, 0xfa, 0x1a, 0x53, 0x37, 0x31, 0x36, + 0x24, 0xf6, 0xa2, 0xc4, 0x04, 0xe5, 0x06, 0x2a, 0xc6, 0x68, 0x94, 0x8b, 0x25, 0x7a, 0xe1, 0xcd, + 0xe4, 0x74, 0xf7, 0xb0, 0x6e, 0xd8, 0x2f, 0x67, 0x06, 0x69, 0xfd, 0x11, 0xfe, 0x17, 0xfd, 0x45, + 0xfe, 0x14, 0xb3, 0x33, 0xd3, 0x8f, 0x2d, 0x05, 0x4c, 0xb8, 0xdb, 0x39, 0xe7, 0x99, 0xe7, 0x3c, + 0x67, 0xce, 0x3c, 0xb3, 0xb0, 0xa5, 0xbe, 0x8a, 0x4c, 0xa9, 0x98, 0x44, 0x80, 0x0a, 0x3b, 0xb9, + 0xc8, 0x54, 0xc6, 0xd6, 0x4a, 0x41, 0xb7, 0x0e, 0x1b, 0x1f, 0x71, 0xe0, 0xa1, 0x22, 0xe9, 0xd1, + 0xb7, 0x2b, 0x92, 0xca, 0xfd, 0xe9, 0xc0, 0xe6, 0x24, 0x26, 0xf3, 0x2c, 0x95, 0xc4, 0x8e, 0xa1, + 0x2a, 0x8a, 0x40, 0xc3, 0x69, 0x55, 0xda, 0xb5, 0xee, 0x7e, 0xa7, 0xcc, 0x3d, 0x8b, 0xef, 0xe8, + 0xd5, 0x9b, 0x54, 0x89, 0xa1, 0x67, 0x36, 0x36, 0x0f, 0x01, 0x26, 0x41, 0xb6, 0x09, 0x95, 0x4b, + 0x1a, 0x36, 0x9c, 0x96, 0xd3, 0x5e, 
0xf1, 0x8a, 0x4f, 0xb6, 0x0d, 0xd5, 0xef, 0x18, 0x5f, 0x51, + 0x63, 0xa1, 0xe5, 0xb4, 0x2b, 0x9e, 0x59, 0xbc, 0x5a, 0x38, 0x74, 0xdc, 0x67, 0x50, 0x3f, 0x27, + 0x65, 0x4b, 0x58, 0x95, 0x8c, 0xc1, 0x62, 0xc1, 0xab, 0x19, 0x2a, 0x9e, 0xfe, 0x76, 0xf7, 0x81, + 0x4d, 0x03, 0xad, 0xf4, 0x6d, 0xa8, 0xa6, 0x98, 0x58, 0xe9, 0x2b, 0x9e, 0x59, 0xb8, 0xbf, 0xab, + 0xb0, 0xf6, 0x3a, 0x4b, 0x2f, 0xa2, 0xf0, 0x4a, 0xa0, 0x8a, 0xb2, 0x94, 0x1d, 0x41, 0x53, 0xa1, + 0x08, 0x49, 0x71, 0x41, 0x79, 0x1c, 0xf9, 0x3a, 0xca, 0x63, 0x0c, 0xb9, 0x24, 0xdf, 0xd6, 0xd9, + 0x35, 0x08, 0x6f, 0x02, 0xf8, 0x80, 0xe1, 0x39, 0xf9, 0xec, 0x05, 0xec, 0x26, 0x38, 0x98, 0xbb, + 0xd3, 0xf4, 0xb3, 0x9d, 0xe0, 0xe0, 0xe6, 0xb6, 0x27, 0xb0, 0x1a, 0xa5, 0x91, 0x8a, 0x30, 0xe6, + 0xba, 0x9b, 0x8a, 0xc6, 0xd6, 0x6c, 0xac, 0x68, 0xa3, 0x80, 0x14, 0xcc, 0x51, 0xea, 0x0b, 0x42, + 0x49, 0x8d, 0xc5, 0x96, 0xd3, 0x76, 0xbc, 0x5a, 0x82, 0x83, 0x77, 0x36, 0xc4, 0x9e, 0x03, 0xa3, + 0x84, 0x44, 0x48, 0xa9, 0x3f, 0xe4, 0x01, 0x59, 0x60, 0x55, 0x03, 0xeb, 0xe3, 0xcc, 0xa9, 0x4d, + 0xb0, 0xf7, 0xe0, 0x26, 0x51, 0xca, 0x03, 0xdb, 0x38, 0xef, 0x93, 0xba, 0x26, 0x4a, 0xc7, 0x25, + 0xa4, 0x96, 0xbd, 0xa4, 0xa5, 0x3c, 0x4e, 0xa2, 0xf4, 0xd4, 0x02, 0x7b, 0x06, 0x37, 0x2a, 0x2b, + 0x8b, 0x06, 0x0a, 0x2e, 0x1c, 0xdc, 0xc7, 0xf5, 0x9f, 0xe5, 0xc2, 0xc1, 0x7d, 0x5c, 0xf3, 0x74, + 0x8d, 0x3a, 0x32, 0x5c, 0xcb, 0xb7, 0xe9, 0x1a, 0xf5, 0xa7, 0xb9, 0x5e, 0xc2, 0x9e, 0xcc, 0x05, + 0x61, 0xc0, 0xfb, 0xe8, 0x5f, 0xc6, 0x59, 0xc8, 0xd1, 0x17, 0x99, 0x34, 0x14, 0x2b, 0x9a, 0x62, + 0xc7, 0x00, 0x7a, 0x26, 0x7f, 0xa2, 0xd3, 0x76, 0x6b, 0x14, 0xa6, 0x99, 0x20, 0x9e, 0x72, 0x19, + 0x67, 0xd7, 0x24, 0xc7, 0x37, 0x42, 0x36, 0xa0, 0xe5, 0xb4, 0xab, 0xde, 0x8e, 0x01, 0x9c, 0x9d, + 0x9b, 0xb4, 0x9d, 0xab, 0x64, 0x5d, 0xd8, 0xc1, 0x90, 0x78, 0x1f, 0x03, 0x3d, 0x4e, 0x8e, 0x17, + 0x8a, 0x84, 0x2e, 0x59, 0xd3, 0x25, 0x19, 0x86, 0xd4, 0xc3, 0xa0, 0x98, 0xeb, 0x49, 0x91, 0x2a, + 0xca, 0xed, 0x43, 0x7d, 0x8c, 0x1f, 0x0f, 0x79, 0x55, 0xcf, 0x6e, 0xa3, 0x6f, 0xb0, 0xa3, 0x53, + 0x72, 0x8f, 0x61, 0xf7, 0x2d, 0xa9, 0xd2, 0xb5, 0x1d, 0xf9, 0xe1, 0x29, 0xac, 0x8f, 0x2d, 0xc9, + 0x8b, 0x2b, 0x6e, 0xbd, 0x35, 0xf1, 0xfb, 0x19, 0x26, 0xe4, 0xfe, 0x71, 0xa0, 0x71, 0x93, 0xc2, + 0x3a, 0xc5, 0x87, 0x75, 0x7f, 0x3a, 0x31, 0x72, 0xfb, 0xd1, 0x8c, 0xdb, 0x6f, 0x23, 0xe8, 0x94, + 0xa2, 0xd6, 0xfe, 0x33, 0x94, 0x4d, 0x0e, 0x5b, 0x73, 0x60, 0x73, 0x1e, 0x84, 0xee, 0xf4, 0x83, + 0x50, 0xeb, 0x3e, 0x9a, 0x11, 0x51, 0x56, 0x30, 0xf5, 0x5c, 0xfc, 0x72, 0xa0, 0xf9, 0x29, 0x0f, + 0x50, 0xd1, 0x03, 0x0e, 0x8a, 0xf5, 0x60, 0xad, 0x24, 0xfc, 0x9f, 0x54, 0x94, 0xb7, 0xb0, 0x36, + 0x6c, 0xfa, 0x59, 0x3e, 0xe4, 0x3f, 0x48, 0x64, 0x5c, 0x0b, 0x94, 0xda, 0xe1, 0xcb, 0xc5, 0xa1, + 0xe4, 0xc3, 0x2f, 0x24, 0xb2, 0xcf, 0x3a, 0xea, 0x1e, 0xc0, 0xff, 0x73, 0x25, 0xdf, 0xf9, 0x84, + 0xf5, 0x60, 0xcf, 0x23, 0xf9, 0xb0, 0xfb, 0xd0, 0x85, 0xe6, 0x3c, 0x8e, 0xbb, 0xea, 0xf6, 0x97, + 0xf4, 0x9f, 0xe4, 0xe0, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x5a, 0x18, 0x19, 0x7c, 0x60, 0x06, + 0x00, 0x00, } diff --git a/go/vt/throttler/max_replication_lag_module.go b/go/vt/throttler/max_replication_lag_module.go index aab29a57ef..f0cab6e59a 100644 --- a/go/vt/throttler/max_replication_lag_module.go +++ b/go/vt/throttler/max_replication_lag_module.go @@ -374,7 +374,7 @@ func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagReco } func (m *MaxReplicationLagModule) updateNextAllowedIncrease(now time.Time, increase float64, key string) { - minDuration := m.config.MinDurationBetweenChanges() + minDuration := 
m.config.MinDurationBetweenIncreases() // We may have to wait longer than the configured minimum duration // until we see an effect of the increase. // Example: If the increase was fully over the capacity, it will take @@ -471,7 +471,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, // Guess the slave capacity based on the replication lag change. rate, reason := m.guessSlaveRate(r, avgMasterRate, lagBefore, lagNow, lagDifference, d) - m.nextAllowedDecrease = now.Add(m.config.MinDurationBetweenChanges() + 2*time.Second) + m.nextAllowedDecrease = now.Add(m.config.MinDurationBetweenDecreases()) m.updateRate(r, stateDecreaseAndGuessRate, rate, reason, now, lagRecordNow) } @@ -508,16 +508,16 @@ func (m *MaxReplicationLagModule) guessSlaveRate(r *result, avgMasterRate float6 newRate := avgSlaveRate // Reduce the new rate such that it has time to catch up the requests it's // behind within the next interval. - futureRequests := newRate * m.config.MinDurationBetweenChanges().Seconds() + futureRequests := newRate * m.config.SpreadBacklogAcross().Seconds() newRate *= (futureRequests - requestsBehind) / futureRequests var reason string if newRate < 1 { // Backlog is too high. Reduce rate to 1 request/second. // TODO(mberlin): Make this a constant. newRate = 1 - reason = fmt.Sprintf("based on the guessed slave rate of: %v the slave won't be able to process the guessed backlog of %d requests within the next %.f seconds", int64(avgSlaveRate), int64(requestsBehind), m.config.MinDurationBetweenChanges().Seconds()) + reason = fmt.Sprintf("based on the guessed slave rate of: %v the slave won't be able to process the guessed backlog of %d requests within the next %.f seconds", int64(avgSlaveRate), int64(requestsBehind), m.config.SpreadBacklogAcross().Seconds()) } else { - reason = fmt.Sprintf("new rate is %d lower than the guessed slave rate to account for a guessed backlog of %d requests over %.f seconds", int64(avgSlaveRate-newRate), int64(requestsBehind), m.config.MinDurationBetweenChanges().Seconds()) + reason = fmt.Sprintf("new rate is %d lower than the guessed slave rate to account for a guessed backlog of %d requests over %.f seconds", int64(avgSlaveRate-newRate), int64(requestsBehind), m.config.SpreadBacklogAcross().Seconds()) } return int64(newRate), reason diff --git a/go/vt/throttler/max_replication_lag_module_config.go b/go/vt/throttler/max_replication_lag_module_config.go index d80b3a07f0..88d41e4e2c 100644 --- a/go/vt/throttler/max_replication_lag_module_config.go +++ b/go/vt/throttler/max_replication_lag_module_config.go @@ -14,6 +14,10 @@ type MaxReplicationLagModuleConfig struct { throttlerdata.Configuration } +// Most of the values are based on the assumption that vttablet is started +// with the flag --health_check_interval=20s. +const healthCheckInterval = 20 + var defaultMaxReplicationLagModuleConfig = MaxReplicationLagModuleConfig{ throttlerdata.Configuration{ TargetReplicationLagSec: 2, @@ -24,11 +28,15 @@ var defaultMaxReplicationLagModuleConfig = MaxReplicationLagModuleConfig{ MaxIncrease: 1, EmergencyDecrease: 0.5, - MinDurationBetweenChangesSec: 10, + // Wait for two health broadcast rounds. Otherwise, the "decrease" mode + // has less than 2 lag records available to calculate the actual slave rate. + MinDurationBetweenIncreasesSec: 2 * healthCheckInterval, // MaxDurationBetweenIncreasesSec defaults to 60+2 seconds because this - // corresponds to three 3 broadcasts (assuming --health_check_interval=20s). + // corresponds to three 3 broadcasts. 
// The 2 extra seconds give us headroom to account for delay in the process. - MaxDurationBetweenIncreasesSec: 60 + 2, + MaxDurationBetweenIncreasesSec: 3*healthCheckInterval + 2, + MinDurationBetweenDecreasesSec: healthCheckInterval, + SpreadBacklogAcrossSec: healthCheckInterval, AgeBadRateAfterSec: 3 * 60, BadRateIncrease: 0.10, @@ -67,12 +75,18 @@ func (c MaxReplicationLagModuleConfig) Verify() error { if c.EmergencyDecrease <= 0 { return fmt.Errorf("emergency_decrease must be > 0") } - if c.MinDurationBetweenChangesSec < 1 { - return fmt.Errorf("min_duration_between_changes_sec must be >= 1") + if c.MinDurationBetweenIncreasesSec < 1 { + return fmt.Errorf("min_duration_between_increases_sec must be >= 1") } if c.MaxDurationBetweenIncreasesSec < 1 { return fmt.Errorf("max_duration_between_increases_sec must be >= 1") } + if c.MinDurationBetweenDecreasesSec < 1 { + return fmt.Errorf("min_duration_between_decreases_sec must be >= 1") + } + if c.SpreadBacklogAcrossSec < 1 { + return fmt.Errorf("spread_backlog_across_sec must be >= 1") + } if c.IgnoreNSlowestReplicas < 0 { return fmt.Errorf("ignore_n_slowest_replicas must be >= 0") } @@ -82,10 +96,10 @@ func (c MaxReplicationLagModuleConfig) Verify() error { return nil } -// MinDurationBetweenChanges is a helper function which returns the respective +// MinDurationBetweenIncreases is a helper function which returns the respective // protobuf field as native Go type. -func (c MaxReplicationLagModuleConfig) MinDurationBetweenChanges() time.Duration { - return time.Duration(c.MinDurationBetweenChangesSec) * time.Second +func (c MaxReplicationLagModuleConfig) MinDurationBetweenIncreases() time.Duration { + return time.Duration(c.MinDurationBetweenIncreasesSec) * time.Second } // MaxDurationBetweenIncreases is a helper function which returns the respective @@ -94,6 +108,18 @@ func (c MaxReplicationLagModuleConfig) MaxDurationBetweenIncreases() time.Durati return time.Duration(c.MaxDurationBetweenIncreasesSec) * time.Second } +// MinDurationBetweenDecreases is a helper function which returns the respective +// protobuf field as native Go type. +func (c MaxReplicationLagModuleConfig) MinDurationBetweenDecreases() time.Duration { + return time.Duration(c.MinDurationBetweenDecreasesSec) * time.Second +} + +// SpreadBacklogAcross is a helper function which returns the respective +// protobuf field as native Go type. +func (c MaxReplicationLagModuleConfig) SpreadBacklogAcross() time.Duration { + return time.Duration(c.SpreadBacklogAcrossSec) * time.Second +} + // AgeBadRateAfter is a helper function which returns the respective // protobuf field as native Go type. func (c MaxReplicationLagModuleConfig) AgeBadRateAfter() time.Duration { diff --git a/go/vt/throttler/max_replication_lag_module_test.go b/go/vt/throttler/max_replication_lag_module_test.go index efecaf310f..c8f8f143cb 100644 --- a/go/vt/throttler/max_replication_lag_module_test.go +++ b/go/vt/throttler/max_replication_lag_module_test.go @@ -131,12 +131,13 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) { tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 200 based on actual rate of 100 within [0s, 69s]. + // r2 becomes the "replica under test". 
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { t.Fatal(err) } - // We have to wait at least config.MinDurationBetweenChangesSec (10s) before + // We have to wait at least config.MinDurationBetweenIncreasesSec (40s) before // the next increase. - if got, want := tf.m.nextAllowedIncrease, sinceZero(70*time.Second).Add(tf.m.config.MinDurationBetweenChanges()); got != want { + if got, want := tf.m.nextAllowedIncrease, sinceZero(70*time.Second).Add(tf.m.config.MinDurationBetweenIncreases()); got != want { t.Fatalf("got = %v, want = %v", got, want) } // r2 @ 75s, 0s lag @@ -157,11 +158,29 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) { t.Fatal(err) } + // No increase is possible for the next 20 seconds. + // r2 @ 90s, 0s lag tf.ratesHistory.add(sinceZero(80*time.Second), 200) tf.ratesHistory.add(sinceZero(89*time.Second), 200) tf.process(lagRecord(sinceZero(90*time.Second), r2, 0)) - if err := tf.checkState(stateIncreaseRate, 400, sinceZero(90*time.Second)); err != nil { + if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { + t.Fatal(err) + } + + // r1 @ 100s, 0s lag + tf.ratesHistory.add(sinceZero(99*time.Second), 200) + tf.process(lagRecord(sinceZero(100*time.Second), r1, 0)) + if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { + t.Fatal(err) + } + + // Next rate increase is possible after testing the rate for 40s. + + // r2 @ 110s, 0s lag + tf.ratesHistory.add(sinceZero(109*time.Second), 200) + tf.process(lagRecord(sinceZero(110*time.Second), r2, 0)) + if err := tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)); err != nil { t.Fatal(err) } } @@ -180,6 +199,7 @@ func TestMaxReplicationLagModule_Increase_LastErrorOrNotUp(t *testing.T) { tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 200 based on actual rate of 100 within [0s, 69s]. + // r2 becomes the "replica under test". if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { t.Fatal(err) } @@ -189,31 +209,32 @@ func TestMaxReplicationLagModule_Increase_LastErrorOrNotUp(t *testing.T) { rError.LastError = errors.New("HealthCheck reporting broken") tf.m.lagCache.add(rError) - // r1 @ 80s, 0s lag + // r1 @ 110s, 0s lag tf.ratesHistory.add(sinceZero(70*time.Second), 100) - tf.ratesHistory.add(sinceZero(79*time.Second), 200) - tf.process(lagRecord(sinceZero(80*time.Second), r1, 0)) - // The r1 lag update triggered an increase and did not wait for r2 - // because r2 has LastError set. - if err := tf.checkState(stateIncreaseRate, 400, sinceZero(80*time.Second)); err != nil { + tf.ratesHistory.add(sinceZero(109*time.Second), 200) + tf.process(lagRecord(sinceZero(110*time.Second), r1, 0)) + // We ignore r2 as "replica under test" because it has LastError set. + // Instead, we act on r1. + // r1 becomes the "replica under test". + if err := tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)); err != nil { t.Fatal(err) } - // Now the increase triggered by r1 is under test and we have to wait for it. - // However, we'll simulate a shutdown of r1 i.e. we're no longer tracking it. - // r1 @ 85s, 0s lag, !Up - tf.ratesHistory.add(sinceZero(80*time.Second), 200) - tf.ratesHistory.add(sinceZero(84*time.Second), 400) - rNotUp := lagRecord(sinceZero(85*time.Second), r1, 0) + // We'll simulate a shutdown of r1 i.e. we're no longer tracking it. 
+ // r1 @ 115s, 0s lag, !Up + tf.ratesHistory.add(sinceZero(110*time.Second), 200) + tf.ratesHistory.add(sinceZero(114*time.Second), 400) + rNotUp := lagRecord(sinceZero(115*time.Second), r1, 0) rNotUp.Up = false tf.m.lagCache.add(rNotUp) - // r2 @ 90s, 0s lag (lastError no longer set) - tf.ratesHistory.add(sinceZero(89*time.Second), 400) - tf.process(lagRecord(sinceZero(90*time.Second), r2, 0)) - // The r1 lag update triggered an increase and did not wait for r2 - // because r2 has !Up set. - if err := tf.checkState(stateIncreaseRate, 800, sinceZero(90*time.Second)); err != nil { + // r2 @ 150s, 0s lag (lastError no longer set) + tf.ratesHistory.add(sinceZero(149*time.Second), 400) + tf.process(lagRecord(sinceZero(150*time.Second), r2, 0)) + // We ignore r1 as "replica under test" because it has !Up set. + // Instead, we act on r2. + // r2 becomes the "replica under test". + if err := tf.checkState(stateIncreaseRate, 800, sinceZero(150*time.Second)); err != nil { t.Fatal(err) } } @@ -231,6 +252,7 @@ func TestMaxReplicationLagModule_Reset_ReplicaUnderIncreaseTest(t *testing.T) { tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 200 based on actual rate of 100 within [0s, 69s]. + // r2 becomes the "replica under test". if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { t.Fatal(err) } @@ -246,12 +268,12 @@ func TestMaxReplicationLagModule_Reset_ReplicaUnderIncreaseTest(t *testing.T) { } // Now everything goes back to normal and the minimum time between increases - // (10s) has passed as well. r2 or r1 can start an increase now. - // r1 @ 80s, 0s lag (triggers the increase state) + // (40s) has passed as well. r2 or r1 can start an increase now. + // r1 @ 110s, 0s lag (triggers the increase state) tf.ratesHistory.add(sinceZero(75*time.Second), 200) - tf.ratesHistory.add(sinceZero(79*time.Second), 100) - tf.process(lagRecord(sinceZero(79*time.Second), r1, 0)) - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(79*time.Second)); err != nil { + tf.ratesHistory.add(sinceZero(109*time.Second), 100) + tf.process(lagRecord(sinceZero(110*time.Second), r1, 0)) + if err := tf.checkState(stateIncreaseRate, 200, sinceZero(110*time.Second)); err != nil { t.Fatal(err) } } @@ -343,10 +365,10 @@ func TestMaxReplicationLagModule_Decrease(t *testing.T) { // 17s / 20s * 200 QPS actual rate => 170 QPS replica rate // // This results in a backlog of 3s * 200 QPS = 600 queries. - // Since this backlog is spread across MinDurationBetweenChangesSec (10s), - // the guessed rate gets further reduced by 60 QPS (600 queries / 10s). - // Hence, the rate is set to 110 QPS (170 - 60). - if err := tf.checkState(stateDecreaseAndGuessRate, 110, sinceZero(90*time.Second)); err != nil { + // Since this backlog is spread across SpreadBacklogAcrossSec (20s), + // the guessed rate gets further reduced by 30 QPS (600 queries / 20s). + // Hence, the rate is set to 140 QPS (170 - 30). + if err := tf.checkState(stateDecreaseAndGuessRate, 140, sinceZero(90*time.Second)); err != nil { t.Fatal(err) } } @@ -389,10 +411,10 @@ func TestMaxReplicationLagModule_Decrease_NoReplicaHistory(t *testing.T) { // 14s / 20s * 200 QPS actual rate => 140 QPS replica rate // // This results in a backlog of 6s * 200 QPS = 1200 queries. - // Since this backlog is spread across MinDurationBetweenChangesSec (10s), - // the guessed rate gets further reduced by 120 QPS (1200 queries / 10s). - // Hence, the rate is set to 20 QPS. 
- if err := tf.checkState(stateDecreaseAndGuessRate, 20, sinceZero(90*time.Second)); err != nil { + // Since this backlog is spread across SpreadBacklogAcrossSec (20s), + // the guessed rate gets further reduced by 60 QPS (1200 queries / 20s). + // Hence, the rate is set to 80 QPS (140 - 60). + if err := tf.checkState(stateDecreaseAndGuessRate, 80, sinceZero(90*time.Second)); err != nil { t.Fatal(err) } } @@ -487,12 +509,12 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IsIgnoredDuringIncrease( t.Fatal(err) } - // r1 @ 100s, 0s lag - tf.ratesHistory.add(sinceZero(99*time.Second), 200) - tf.process(lagRecord(sinceZero(100*time.Second), r1, 0)) + // r1 @ 110s, 0s lag + tf.ratesHistory.add(sinceZero(109*time.Second), 200) + tf.process(lagRecord(sinceZero(110*time.Second), r1, 0)) // Meanwhile, r1 is doing fine and will trigger the next increase because // we're no longer waiting for the ignored r2. - if err := tf.checkState(stateIncreaseRate, 400, sinceZero(100*time.Second)); err != nil { + if err := tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)); err != nil { t.Fatal(err) } } @@ -559,11 +581,11 @@ func TestApplyLatestConfig(t *testing.T) { // Let's assume that the current rate of 300 was actually fine. // After the reset, we'll increase it only by 100% to 600. - // r2 @ 90s, 0s lag + // r2 @ 110s, 0s lag tf.ratesHistory.add(sinceZero(80*time.Second), 300) - tf.ratesHistory.add(sinceZero(89*time.Second), 300) - tf.process(lagRecord(sinceZero(90*time.Second), r2, 0)) - if err := tf.checkState(stateIncreaseRate, 600, sinceZero(90*time.Second)); err != nil { + tf.ratesHistory.add(sinceZero(109*time.Second), 300) + tf.process(lagRecord(sinceZero(110*time.Second), r2, 0)) + if err := tf.checkState(stateIncreaseRate, 600, sinceZero(110*time.Second)); err != nil { t.Fatal(err) } } diff --git a/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go b/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go index d6a5ce2c35..28e819ce45 100644 --- a/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go +++ b/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go @@ -122,11 +122,13 @@ func (tf *testFixture) configuration(t *testing.T, client throttlerclient.Client InitialRate: 3, MaxIncrease: 0.4, EmergencyDecrease: 0.5, - MinDurationBetweenChangesSec: 6, + MinDurationBetweenIncreasesSec: 6, MaxDurationBetweenIncreasesSec: 7, - IgnoreNSlowestReplicas: 8, - AgeBadRateAfterSec: 9, - BadRateIncrease: 0.10, + MinDurationBetweenDecreasesSec: 8, + SpreadBacklogAcrossSec: 9, + IgnoreNSlowestReplicas: 10, + AgeBadRateAfterSec: 11, + BadRateIncrease: 0.12, } names, err := client.UpdateConfiguration(context.Background(), "t2", config /* false */, true /* copyZeroValues */) if err != nil { diff --git a/proto/throttlerdata.proto b/proto/throttlerdata.proto index 2458e249fa..16a470b1b9 100644 --- a/proto/throttlerdata.proto +++ b/proto/throttlerdata.proto @@ -40,8 +40,8 @@ message Configuration { // max_replication_lag_sec is meant as a last resort. // By default, the module tries to find out the system maximum capacity while // trying to keep the replication lag around "target_replication_lag_sec". - // Usually, we'll wait min_duration_between_changes_sec to see the effect of a - // throttler rate change on the replication lag. + // Usually, we'll wait min_duration_between_(increases|decreases)_sec to see + // the effect of a throttler rate change on the replication lag. 
// But if the lag goes above this field's value we will go into an "emergency" // state and throttle more aggressively (see "emergency_decrease" below). // This is the only way to ensure that the system will recover. @@ -65,20 +65,36 @@ message Configuration { // E.g. 0.50 decreases the current rate by 50%. double emergency_decrease = 5; - // min_duration_between_changes_sec specifies how long we'll wait for the last - // rate increase or decrease to have an effect on the system. - int64 min_duration_between_changes_sec = 6; + // min_duration_between_increases_sec specifies how long we'll wait at least + // for the last rate increase to have an effect on the system. + int64 min_duration_between_increases_sec = 6; // max_duration_between_increases_sec specifies how long we'll wait at most // for the last rate increase to have an effect on the system. int64 max_duration_between_increases_sec = 7; + // min_duration_between_decreases_sec specifies how long we'll wait at least + // for the last rate decrease to have an effect on the system. + int64 min_duration_between_decreases_sec = 8; + + // spread_backlog_across_sec is used when we set the throttler rate after + // we guessed the rate of a slave and determined its backlog. + // For example, at a guessed rate of 100 QPS and a lag of 10s, the replica has + // a backlog of 1000 queries. + // When we set the new, decreased throttler rate, we factor in how long it + // will take the slave to go through the backlog (in addition to new + // requests). This field specifies over which timespan we plan to spread this. + // For example, for a backlog of 1000 queries spread over 5s means that we + // have to further reduce the rate by 200 QPS or the backlog will not be + // processed within the 5 seconds. + int64 spread_backlog_across_sec = 9; + // ignore_n_slowest_replicas will ignore replication lag updates from the // N slowest replicas. Under certain circumstances, replicas are still // considered e.g. a) if the lag is at most max_replication_lag_sec, b) there // are less than N+1 replicas or c) the lag increased on each replica such // that all replicas were ignored in a row. - int32 ignore_n_slowest_replicas = 8; + int32 ignore_n_slowest_replicas = 10; // age_bad_rate_after_sec is the duration after which an unchanged bad rate // will "age out" and increase by "bad_rate_increase". @@ -87,11 +103,11 @@ message Configuration { // try known too high (bad) rates over and over again. // To avoid that temporary degradations permanently reduce the maximum rate, // a stable bad rate "ages out" after "age_bad_rate_after_sec". - int64 age_bad_rate_after_sec = 9; + int64 age_bad_rate_after_sec = 11; // bad_rate_increase defines the percentage by which a bad rate will be // increased when it's aging out. - double bad_rate_increase = 10; + double bad_rate_increase = 12; } // GetConfigurationRequest is the payload for the GetConfiguration RPC. 
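Note (not part of the patch): the backlog-spreading arithmetic that the new spread_backlog_across_sec field controls can be reproduced in isolation. The sketch below is illustrative only; guessNewRate is a hypothetical stand-in that mirrors just the formula from guessSlaveRate() above (futureRequests := newRate * SpreadBacklogAcross().Seconds(); newRate *= (futureRequests - requestsBehind) / futureRequests) together with its fallback to 1 QPS. The two calls reproduce the numbers expected by the updated TestMaxReplicationLagModule_Decrease and TestMaxReplicationLagModule_Decrease_NoReplicaHistory.

```go
package main

import (
	"fmt"
	"time"
)

// guessNewRate mirrors the backlog-spreading formula from guessSlaveRate():
// the guessed replica rate is reduced so that the backlog (requestsBehind)
// can be worked off within "spread" (spread_backlog_across_sec) on top of
// new requests, falling back to 1 QPS if the backlog is too high.
func guessNewRate(guessedSlaveRate, requestsBehind float64, spread time.Duration) int64 {
	newRate := guessedSlaveRate
	futureRequests := newRate * spread.Seconds()
	// Equivalent to: newRate -= requestsBehind / spread.Seconds()
	newRate *= (futureRequests - requestsBehind) / futureRequests
	if newRate < 1 {
		// Backlog is too high. Reduce rate to 1 request/second.
		newRate = 1
	}
	return int64(newRate)
}

func main() {
	spread := 20 * time.Second // default SpreadBacklogAcrossSec = healthCheckInterval (20s)

	// TestMaxReplicationLagModule_Decrease: guessed replica rate 170 QPS,
	// backlog 3s * 200 QPS = 600 queries => 170 - 600/20 = 140 QPS.
	fmt.Println(guessNewRate(170, 600, spread)) // 140

	// TestMaxReplicationLagModule_Decrease_NoReplicaHistory: guessed replica
	// rate 140 QPS, backlog 6s * 200 QPS = 1200 queries => 140 - 1200/20 = 80 QPS.
	fmt.Println(guessNewRate(140, 1200, spread)) // 80
}
```

Because the extra reduction equals backlog divided by the spread window, replacing the old 10s min_duration_between_changes_sec with the 20s spread_backlog_across_sec default halves that penalty, which is why the expected rates in the decrease tests move from 110 and 20 QPS to 140 and 80 QPS.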
diff --git a/py/vtproto/throttlerdata_pb2.py b/py/vtproto/throttlerdata_pb2.py index accb9111dc..a40a767bb3 100644 --- a/py/vtproto/throttlerdata_pb2.py +++ b/py/vtproto/throttlerdata_pb2.py @@ -19,7 +19,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='throttlerdata.proto', package='throttlerdata', syntax='proto3', - serialized_pb=_b('\n\x13throttlerdata.proto\x12\rthrottlerdata\"\x11\n\x0fMaxRatesRequest\"{\n\x10MaxRatesResponse\x12\x39\n\x05rates\x18\x01 \x03(\x0b\x32*.throttlerdata.MaxRatesResponse.RatesEntry\x1a,\n\nRatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"!\n\x11SetMaxRateRequest\x12\x0c\n\x04rate\x18\x01 \x01(\x03\"#\n\x12SetMaxRateResponse\x12\r\n\x05names\x18\x01 \x03(\t\"\xd0\x02\n\rConfiguration\x12\"\n\x1atarget_replication_lag_sec\x18\x01 \x01(\x03\x12\x1f\n\x17max_replication_lag_sec\x18\x02 \x01(\x03\x12\x14\n\x0cinitial_rate\x18\x03 \x01(\x03\x12\x14\n\x0cmax_increase\x18\x04 \x01(\x01\x12\x1a\n\x12\x65mergency_decrease\x18\x05 \x01(\x01\x12(\n min_duration_between_changes_sec\x18\x06 \x01(\x03\x12*\n\"max_duration_between_increases_sec\x18\x07 \x01(\x03\x12!\n\x19ignore_n_slowest_replicas\x18\x08 \x01(\x05\x12\x1e\n\x16\x61ge_bad_rate_after_sec\x18\t \x01(\x03\x12\x19\n\x11\x62\x61\x64_rate_increase\x18\n \x01(\x01\"1\n\x17GetConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\"\xc4\x01\n\x18GetConfigurationResponse\x12S\n\x0e\x63onfigurations\x18\x01 \x03(\x0b\x32;.throttlerdata.GetConfigurationResponse.ConfigurationsEntry\x1aS\n\x13\x43onfigurationsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1c.throttlerdata.Configuration:\x02\x38\x01\"\x83\x01\n\x1aUpdateConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\x12\x33\n\rconfiguration\x18\x02 \x01(\x0b\x32\x1c.throttlerdata.Configuration\x12\x18\n\x10\x63opy_zero_values\x18\x03 \x01(\x08\",\n\x1bUpdateConfigurationResponse\x12\r\n\x05names\x18\x01 \x03(\t\"3\n\x19ResetConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\"+\n\x1aResetConfigurationResponse\x12\r\n\x05names\x18\x01 \x03(\tb\x06proto3') + serialized_pb=_b('\n\x13throttlerdata.proto\x12\rthrottlerdata\"\x11\n\x0fMaxRatesRequest\"{\n\x10MaxRatesResponse\x12\x39\n\x05rates\x18\x01 \x03(\x0b\x32*.throttlerdata.MaxRatesResponse.RatesEntry\x1a,\n\nRatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"!\n\x11SetMaxRateRequest\x12\x0c\n\x04rate\x18\x01 \x01(\x03\"#\n\x12SetMaxRateResponse\x12\r\n\x05names\x18\x01 \x03(\t\"\xa1\x03\n\rConfiguration\x12\"\n\x1atarget_replication_lag_sec\x18\x01 \x01(\x03\x12\x1f\n\x17max_replication_lag_sec\x18\x02 \x01(\x03\x12\x14\n\x0cinitial_rate\x18\x03 \x01(\x03\x12\x14\n\x0cmax_increase\x18\x04 \x01(\x01\x12\x1a\n\x12\x65mergency_decrease\x18\x05 \x01(\x01\x12*\n\"min_duration_between_increases_sec\x18\x06 \x01(\x03\x12*\n\"max_duration_between_increases_sec\x18\x07 \x01(\x03\x12*\n\"min_duration_between_decreases_sec\x18\x08 \x01(\x03\x12!\n\x19spread_backlog_across_sec\x18\t \x01(\x03\x12!\n\x19ignore_n_slowest_replicas\x18\n \x01(\x05\x12\x1e\n\x16\x61ge_bad_rate_after_sec\x18\x0b \x01(\x03\x12\x19\n\x11\x62\x61\x64_rate_increase\x18\x0c \x01(\x01\"1\n\x17GetConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\"\xc4\x01\n\x18GetConfigurationResponse\x12S\n\x0e\x63onfigurations\x18\x01 \x03(\x0b\x32;.throttlerdata.GetConfigurationResponse.ConfigurationsEntry\x1aS\n\x13\x43onfigurationsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12+\n\x05value\x18\x02 
\x01(\x0b\x32\x1c.throttlerdata.Configuration:\x02\x38\x01\"\x83\x01\n\x1aUpdateConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\x12\x33\n\rconfiguration\x18\x02 \x01(\x0b\x32\x1c.throttlerdata.Configuration\x12\x18\n\x10\x63opy_zero_values\x18\x03 \x01(\x08\",\n\x1bUpdateConfigurationResponse\x12\r\n\x05names\x18\x01 \x03(\t\"3\n\x19ResetConfigurationRequest\x12\x16\n\x0ethrottler_name\x18\x01 \x01(\t\"+\n\x1aResetConfigurationResponse\x12\r\n\x05names\x18\x01 \x03(\tb\x06proto3') ) _sym_db.RegisterFileDescriptor(DESCRIPTOR) @@ -223,7 +223,7 @@ _CONFIGURATION = _descriptor.Descriptor( is_extension=False, extension_scope=None, options=None), _descriptor.FieldDescriptor( - name='min_duration_between_changes_sec', full_name='throttlerdata.Configuration.min_duration_between_changes_sec', index=5, + name='min_duration_between_increases_sec', full_name='throttlerdata.Configuration.min_duration_between_increases_sec', index=5, number=6, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, @@ -237,22 +237,36 @@ _CONFIGURATION = _descriptor.Descriptor( is_extension=False, extension_scope=None, options=None), _descriptor.FieldDescriptor( - name='ignore_n_slowest_replicas', full_name='throttlerdata.Configuration.ignore_n_slowest_replicas', index=7, - number=8, type=5, cpp_type=1, label=1, + name='min_duration_between_decreases_sec', full_name='throttlerdata.Configuration.min_duration_between_decreases_sec', index=7, + number=8, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), _descriptor.FieldDescriptor( - name='age_bad_rate_after_sec', full_name='throttlerdata.Configuration.age_bad_rate_after_sec', index=8, + name='spread_backlog_across_sec', full_name='throttlerdata.Configuration.spread_backlog_across_sec', index=8, number=9, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), _descriptor.FieldDescriptor( - name='bad_rate_increase', full_name='throttlerdata.Configuration.bad_rate_increase', index=9, - number=10, type=1, cpp_type=5, label=1, + name='ignore_n_slowest_replicas', full_name='throttlerdata.Configuration.ignore_n_slowest_replicas', index=9, + number=10, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='age_bad_rate_after_sec', full_name='throttlerdata.Configuration.age_bad_rate_after_sec', index=10, + number=11, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='bad_rate_increase', full_name='throttlerdata.Configuration.bad_rate_increase', index=11, + number=12, type=1, cpp_type=5, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -270,7 +284,7 @@ _CONFIGURATION = _descriptor.Descriptor( oneofs=[ ], serialized_start=255, - serialized_end=591, + serialized_end=672, ) @@ -300,8 +314,8 @@ _GETCONFIGURATIONREQUEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - 
serialized_start=593, - serialized_end=642, + serialized_start=674, + serialized_end=723, ) @@ -338,8 +352,8 @@ _GETCONFIGURATIONRESPONSE_CONFIGURATIONSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=758, - serialized_end=841, + serialized_start=839, + serialized_end=922, ) _GETCONFIGURATIONRESPONSE = _descriptor.Descriptor( @@ -368,8 +382,8 @@ _GETCONFIGURATIONRESPONSE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=645, - serialized_end=841, + serialized_start=726, + serialized_end=922, ) @@ -413,8 +427,8 @@ _UPDATECONFIGURATIONREQUEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=844, - serialized_end=975, + serialized_start=925, + serialized_end=1056, ) @@ -444,8 +458,8 @@ _UPDATECONFIGURATIONRESPONSE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=977, - serialized_end=1021, + serialized_start=1058, + serialized_end=1102, ) @@ -475,8 +489,8 @@ _RESETCONFIGURATIONREQUEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1023, - serialized_end=1074, + serialized_start=1104, + serialized_end=1155, ) @@ -506,8 +520,8 @@ _RESETCONFIGURATIONRESPONSE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1076, - serialized_end=1119, + serialized_start=1157, + serialized_end=1200, ) _MAXRATESRESPONSE_RATESENTRY.containing_type = _MAXRATESRESPONSE diff --git a/test/base_sharding.py b/test/base_sharding.py index c510ef74ca..2a8ac1ca4c 100644 --- a/test/base_sharding.py +++ b/test/base_sharding.py @@ -313,11 +313,13 @@ class BaseShardingTest(object): 'initial_rate:3 ' 'max_increase:0.4 ' 'emergency_decrease:0.5 ' - 'min_duration_between_changes_sec:6 ' + 'min_duration_between_increases_sec:6 ' 'max_duration_between_increases_sec:7 ' + 'min_duration_between_decreases_sec:8 ' + 'spread_backlog_across_sec:9 ' 'ignore_n_slowest_replicas:0 ' - 'age_bad_rate_after_sec:9 ' - 'bad_rate_increase:0.10 '], + 'age_bad_rate_after_sec:11 ' + 'bad_rate_increase:0.12 '], auto_log=True, trap_output=True) self.assertIn('%d active throttler(s)' % len(names), stdout) # Check the updated configuration.
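Note (not part of the patch): the new defaults wired up in max_replication_lag_module_config.go reduce to simple arithmetic on the assumed 20s health check broadcast interval. The sketch below is illustrative only; the config struct and minDurationBetweenIncreases helper are simplified stand-ins for MaxReplicationLagModuleConfig and its new accessors, not the actual types.

```go
package main

import (
	"fmt"
	"time"
)

// The defaults assume vttablet runs with --health_check_interval=20s.
const healthCheckInterval = 20

// config is a simplified stand-in for MaxReplicationLagModuleConfig.
type config struct {
	MinDurationBetweenIncreasesSec int64
	MaxDurationBetweenIncreasesSec int64
	MinDurationBetweenDecreasesSec int64
	SpreadBacklogAcrossSec         int64
}

// minDurationBetweenIncreases converts the protobuf seconds field into a
// native time.Duration, like the new MinDurationBetweenIncreases() helper.
func (c config) minDurationBetweenIncreases() time.Duration {
	return time.Duration(c.MinDurationBetweenIncreasesSec) * time.Second
}

func main() {
	c := config{
		// Two broadcast rounds, so the "decrease" mode has at least two lag
		// records available to calculate the actual slave rate.
		MinDurationBetweenIncreasesSec: 2 * healthCheckInterval, // 40
		// Three broadcast rounds plus 2s headroom for processing delay.
		MaxDurationBetweenIncreasesSec: 3*healthCheckInterval + 2, // 62
		MinDurationBetweenDecreasesSec: healthCheckInterval,       // 20
		SpreadBacklogAcrossSec:         healthCheckInterval,       // 20
	}

	// With the 40s minimum, a rate increase applied at t=70s (as in
	// TestMaxReplicationLagModule_Increase) blocks the next increase
	// until t=110s.
	lastIncrease := time.Time{}.Add(70 * time.Second)
	nextAllowed := lastIncrease.Add(c.minDurationBetweenIncreases())
	fmt.Println(nextAllowed.Sub(time.Time{}))     // 1m50s, i.e. t=110s
	fmt.Println(c.MaxDurationBetweenIncreasesSec) // 62
}
```

This longer minimum between increases is also why the expected timestamps in the increase tests move from the 80s-100s range to 110s and later.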