Stickiness will be reimplemented as part of a balancer/resolver redesigning/extending.
This commit is contained in:
Menghan Li 2018-06-26 10:02:54 -07:00 коммит произвёл GitHub
Родитель 4f70de23e6
Коммит b39aa9e037
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 3 добавлений и 839 удалений

Просмотреть файл

@ -1,46 +0,0 @@
# Stickiness
With a load balancer, each RPC may pick a different backend based on the load
balancing policy. Stickiness policies try to preserve peers for the duration of
a session, so that RPCs with the same stickiness key will be directed to the
same server.
Note that there's only "soft" stickiness now, which means RPCs with the same
stickiness key could still be sent to different servers. If stickiness is
critical for the system, server side application level handling is still
necessary.
## Stickiness Key
A stickiness key works as the session id. RPCs with the same stickiness key will
be assigned to the same backend.
Stickiness key is set as part of the custom metadata.
## Enable stickiness
Stickiness can be enabled by setting `stickinessKey` field in [service
config](https://github.com/grpc/grpc/blob/master/doc/service_config.md).
```json
{
"stickinessKey": "sessionid"
}
```
The value `sessionid` will be used as the key of the metadata entry that
defines the stickiness key for each RPC.
## Send RPC with stickiness
To set the stickiness key for an RPC, set the corresponding metadata. The
following RPC will be sent with stickiness key `session1`.
```go
// "sessionid" is the metadata key specified by service config, "session1" is
// the stickiness key for this RPC.
md := metadata.Pairs("sessionid", "session1")
ctx := metadata.NewOutgoingContext(context.Background(), md)
resp, err := client.SomeRPC(ctx, req)
```

Просмотреть файл

@ -830,8 +830,6 @@ func (cc *ClientConn) switchBalancer(name string) {
if cc.balancerWrapper != nil {
cc.balancerWrapper.close()
}
// Clear all stickiness state.
cc.blockingpicker.clearStickinessState()
builder := balancer.Get(name)
if builder == nil {
@ -1046,18 +1044,6 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
}
}
if envConfigStickinessOn {
var newStickinessMDKey string
if sc.stickinessMetadataKey != nil && *sc.stickinessMetadataKey != "" {
newStickinessMDKey = *sc.stickinessMetadataKey
}
// newStickinessMDKey is "" if one of the following happens:
// - stickinessMetadataKey is set to ""
// - stickinessMetadataKey field doesn't exist in service config
cc.blockingpicker.updateStickinessMDKey(strings.ToLower(newStickinessMDKey))
}
cc.mu.Unlock()
return nil
}
@ -1551,13 +1537,6 @@ func (ac *addrConn) getState() connectivity.State {
return ac.state
}
// getCurAddr returns the address this addrConn is currently connected to,
// read under the addrConn's mutex.
func (ac *addrConn) getCurAddr() (ret resolver.Address) {
	ac.mu.Lock()
	defer ac.mu.Unlock()
	return ac.curAddr
}
func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
ac.mu.Lock()
addr := ac.curAddr.Addr

Просмотреть файл

@ -1,37 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"os"
"strings"
)
const (
	// envConfigPrefix is the common prefix shared by grpc-go environment
	// variables.
	envConfigPrefix = "GRPC_GO_"
	// envConfigStickinessStr names the environment variable that toggles
	// the experimental stickiness feature.
	envConfigStickinessStr = envConfigPrefix + "STICKINESS"
)

// envConfigStickinessOn is true when the stickiness environment variable is
// set to "on" (case-insensitively) at process start.
var envConfigStickinessOn = strings.EqualFold(os.Getenv(envConfigStickinessStr), "on")

Просмотреть файл

@ -21,15 +21,12 @@ package grpc
import (
"io"
"sync"
"sync/atomic"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@ -45,16 +42,10 @@ type pickerWrapper struct {
// The latest connection happened.
connErrMu sync.Mutex
connErr error
stickinessMDKey atomic.Value
stickiness *stickyStore
}
func newPickerWrapper() *pickerWrapper {
bp := &pickerWrapper{
blockingCh: make(chan struct{}),
stickiness: newStickyStore(),
}
bp := &pickerWrapper{blockingCh: make(chan struct{})}
return bp
}
@ -71,27 +62,6 @@ func (bp *pickerWrapper) connectionError() error {
return err
}
// updateStickinessMDKey records newKey as the stickiness metadata key and,
// only when the key actually changed, resets the sticky store so entries
// recorded under the old key are dropped.
func (bp *pickerWrapper) updateStickinessMDKey(newKey string) {
	// The type assertion yields "" when nothing was stored yet, so the
	// ok result can be ignored.
	oldKey, _ := bp.stickinessMDKey.Load().(string)
	if oldKey == newKey {
		return
	}
	bp.stickinessMDKey.Store(newKey)
	bp.stickiness.reset(newKey)
}
// getStickinessMDKey returns the configured stickiness metadata key, or ""
// when none has been set.
func (bp *pickerWrapper) getStickinessMDKey() string {
	// A failed type assertion yields "", which is the correct "unset" value.
	key, _ := bp.stickinessMDKey.Load().(string)
	return key
}
// clearStickinessState drops every entry in the sticky store without
// changing the current metadata key.
func (bp *pickerWrapper) clearStickinessState() {
	key := bp.getStickinessMDKey()
	if key == "" {
		// The store is never populated while the key is "", so there is
		// nothing to reset.
		return
	}
	bp.stickiness.reset(key)
}
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
bp.mu.Lock()
@ -131,27 +101,6 @@ func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) f
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
mdKey := bp.getStickinessMDKey()
stickyKey, isSticky := stickyKeyFromContext(ctx, mdKey)
// Potential race here: if stickinessMDKey is updated after the above two
// lines, and this pick is a sticky pick, the following put could add an
// entry to sticky store with an outdated sticky key.
//
// The solution: keep the current md key in sticky store, and at the
// beginning of each get/put, check the mdkey against store.curMDKey.
// - Cons: one more string comparing for each get/put.
// - Pros: the string matching happens inside get/put, so the overhead for
// non-sticky RPCs will be minimal.
if isSticky {
if t, ok := bp.stickiness.get(mdKey, stickyKey); ok {
// Done function returned is always nil.
return t, nil, nil
}
}
var (
p balancer.Picker
ch chan struct{}
@ -207,9 +156,6 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
continue
}
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
if isSticky {
bp.stickiness.put(mdKey, stickyKey, acw)
}
if channelz.IsOn() {
return t, doneChannelzWrapper(acw, done), nil
}
@ -232,105 +178,3 @@ func (bp *pickerWrapper) close() {
bp.done = true
close(bp.blockingCh)
}
// stickinessKeyCountLimit caps the number of entries kept in the sticky
// store; put() evicts the least-recently-used entry once the limit is
// exceeded.
const stickinessKeyCountLimit = 1000

// stickyStoreEntry pairs a picked subconn wrapper with the address it had
// at pick time, so get() can later detect that the subconn moved.
type stickyStoreEntry struct {
	acw  *acBalancerWrapper
	addr resolver.Address
}

// stickyStore maps stickiness keys to previously picked subconns.
type stickyStore struct {
	mu sync.Mutex
	// curMDKey is checked before every get/put to avoid races. The operation
	// aborts immediately when the given mdKey differs from curMDKey.
	curMDKey string
	store    *linkedMap
}
// newStickyStore returns an empty stickyStore ready for use.
func newStickyStore() *stickyStore {
	ss := new(stickyStore)
	ss.store = newLinkedMap()
	return ss
}
// reset clears every stored entry and records newMDKey as the current
// stickiness metadata key.
func (ss *stickyStore) reset(newMDKey string) {
	ss.mu.Lock()
	defer ss.mu.Unlock()
	ss.curMDKey = newMDKey
	ss.store.clear()
}
// put records (stickyKey -> acw) in the store. mdKey is checked against
// curMDKey first so a put racing with a metadata-key change is silently
// dropped instead of inserting under a stale key.
func (ss *stickyStore) put(mdKey, stickyKey string, acw *acBalancerWrapper) {
	ss.mu.Lock()
	defer ss.mu.Unlock()
	if mdKey != ss.curMDKey {
		return
	}
	// The entry records the subconn's current address so get() can later
	// detect that the subconn reconnected to a different backend.
	ss.store.put(stickyKey, &stickyStoreEntry{
		acw:  acw,
		addr: acw.getAddrConn().getCurAddr(),
	})
	// Bound the store size by evicting the least-recently-used entry.
	if ss.store.len() > stickinessKeyCountLimit {
		ss.store.removeOldest()
	}
}
// get returns a ready transport previously stored under stickyKey, if any.
// mdKey is checked against curMDKey to avoid races with metadata-key
// changes. Entries whose subconn has since changed address, or whose
// transport is no longer ready, are treated as stale and removed.
func (ss *stickyStore) get(mdKey, stickyKey string) (transport.ClientTransport, bool) {
	ss.mu.Lock()
	defer ss.mu.Unlock()
	if mdKey != ss.curMDKey {
		return nil, false
	}
	entry, ok := ss.store.get(stickyKey)
	if !ok {
		return nil, false
	}
	ac := entry.acw.getAddrConn()
	if ac.getCurAddr() != entry.addr {
		// The subconn reconnected to a different backend; the sticky entry
		// no longer points where it did at put() time.
		ss.store.remove(stickyKey)
		return nil, false
	}
	t, ok := ac.getReadyTransport()
	if !ok {
		// Connection lost; drop the entry so the next pick re-resolves.
		ss.store.remove(stickyKey)
		return nil, false
	}
	return t, true
}
// stickyKeyFromContext extracts the stickiness key for an RPC: the first
// value under stickinessMDKey in the outgoing metadata of ctx, looking at
// the base metadata first and then at pairs added via
// AppendToOutgoingContext.
//
// It returns "", false when stickinessMDKey is empty, when ctx carries no
// outgoing metadata, or when the key is not present.
func stickyKeyFromContext(ctx context.Context, stickinessMDKey string) (string, bool) {
	if stickinessMDKey == "" {
		return "", false
	}
	md, added, ok := metadata.FromOutgoingContextRaw(ctx)
	if !ok {
		return "", false
	}
	if vv := md[stickinessMDKey]; len(vv) > 0 {
		return vv[0], true
	}
	// added holds flat key/value slices; walk them pairwise.
	for _, pairs := range added {
		for i := 0; i+1 < len(pairs); i += 2 {
			if pairs[i] == stickinessMDKey {
				return pairs[i+1], true
			}
		}
	}
	return "", false
}

Просмотреть файл

@ -73,8 +73,6 @@ type ServiceConfig struct {
// If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists.
// Otherwise, the method has no MethodConfig to use.
Methods map[string]MethodConfig
stickinessMetadataKey *string
}
func parseDuration(s *string) (*time.Duration, error) {
@ -148,9 +146,8 @@ type jsonMC struct {
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
LoadBalancingPolicy *string
StickinessMetadataKey *string
MethodConfig *[]jsonMC
LoadBalancingPolicy *string
MethodConfig *[]jsonMC
}
func parseServiceConfig(js string) (ServiceConfig, error) {
@ -163,8 +160,6 @@ func parseServiceConfig(js string) (ServiceConfig, error) {
sc := ServiceConfig{
LB: rsc.LoadBalancingPolicy,
Methods: make(map[string]MethodConfig),
stickinessMetadataKey: rsc.StickinessMetadataKey,
}
if rsc.MethodConfig == nil {
return sc, nil

Просмотреть файл

@ -1,97 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"container/list"
)
// linkedMapKVPair is the element payload stored in linkedMap's list; it
// carries the key so removeOldest can delete the map entry for the evicted
// list element.
type linkedMapKVPair struct {
	key   string
	value *stickyStoreEntry
}

// linkedMap is an implementation of a map that supports removing the oldest
// entry.
//
// linkedMap is NOT thread safe.
//
// It's for use of stickiness only!
type linkedMap struct {
	m map[string]*list.Element
	l *list.List // Head of the list is the oldest element.
}
// newLinkedMap returns an empty linkedMap.
func newLinkedMap() *linkedMap {
	lm := &linkedMap{l: list.New()}
	lm.m = make(map[string]*list.Element)
	return lm
}
// put stores (key, value), overriding any existing entry for key and
// marking it as the most recently used.
func (m *linkedMap) put(key string, value *stickyStoreEntry) {
	if old, exists := m.m[key]; exists {
		// Drop the stale list position; the entry is re-inserted at the back.
		m.l.Remove(old)
	}
	m.m[key] = m.l.PushBack(&linkedMapKVPair{key: key, value: value})
}
// get returns the value stored for key and refreshes its recency, so the
// entry becomes the newest in the eviction order.
func (m *linkedMap) get(key string) (*stickyStoreEntry, bool) {
	e, found := m.m[key]
	if !found {
		return nil, false
	}
	m.l.MoveToBack(e) // a successful lookup counts as a use
	return e.Value.(*linkedMapKVPair).value, true
}
// remove deletes key and returns its value. The map is left untouched when
// the key is absent.
func (m *linkedMap) remove(key string) (*stickyStoreEntry, bool) {
	e, found := m.m[key]
	if !found {
		return nil, false
	}
	delete(m.m, key)
	// list.Remove returns the element's Value, saving a separate access.
	removed := m.l.Remove(e).(*linkedMapKVPair)
	return removed.value, true
}
// len reports the number of entries currently in the map.
func (m *linkedMap) len() int {
	return len(m.m)
}
// clear removes every element, leaving the map ready for reuse.
func (m *linkedMap) clear() {
	m.m = map[string]*list.Element{}
	m.l = m.l.Init() // Init empties the list in place
}
// removeOldest evicts the least-recently-used entry. It is a no-op on an
// empty map; previously l.Front() returned nil and the subsequent
// e.Value dereference panicked.
func (m *linkedMap) removeOldest() {
	e := m.l.Front()
	if e == nil {
		return // empty map: nothing to evict
	}
	m.l.Remove(e)
	delete(m.m, e.Value.(*linkedMapKVPair).key)
}

Просмотреть файл

@ -1,186 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"container/list"
"fmt"
"reflect"
"testing"
)
// linkedMapTestData provides entries for the tests below.
// NOTE(review): make() leaves all five elements nil, so every value-equality
// assertion in these tests compares nil with nil and passes vacuously.
// Consider initializing each element to a distinct &stickyStoreEntry{} —
// but note TestLinkedMapPutGet's "oneone"/"one" mismatch must be fixed
// first, or distinct values will make it fail.
var linkedMapTestData = make([]*stickyStoreEntry, 5)
// TestLinkedMapPutGet covers basic put/get round-trips, including that a
// second put under an existing key overrides the stored value.
func TestLinkedMapPutGet(t *testing.T) {
	m := newLinkedMap()
	m.put("one", linkedMapTestData[0])
	if got, ok := m.get("one"); !ok || got != linkedMapTestData[0] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 1, got, ok, "one")
	}
	m.put("two", linkedMapTestData[1])
	if got, ok := m.get("two"); !ok || got != linkedMapTestData[1] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 2, got, ok, "two")
	}
	// Overriding an existing key must replace its value. The original put
	// key here was "oneone", which never matched the "one" lookup below —
	// the check only passed because the test data elements are all nil.
	m.put("one", linkedMapTestData[4])
	if got, ok := m.get("one"); !ok || got != linkedMapTestData[4] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 1, got, ok, "oneone")
	}
}
// TestLinkedMapRemove checks that remove returns the stored value, that the
// removed key is subsequently absent, and that unrelated entries survive.
func TestLinkedMapRemove(t *testing.T) {
	m := newLinkedMap()
	m.put("one", linkedMapTestData[0])
	if got, ok := m.get("one"); !ok || got != linkedMapTestData[0] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 1, got, ok, "one")
	}
	m.put("two", linkedMapTestData[1])
	if got, ok := m.get("two"); !ok || got != linkedMapTestData[1] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 2, got, ok, "two")
	}
	if got, ok := m.remove("one"); !ok || got != linkedMapTestData[0] {
		t.Errorf("m.Remove(%v) = %v, %v, want %v, true", 1, got, ok, "one")
	}
	if got, ok := m.get("one"); ok {
		t.Errorf("m.Get(%v) = %v, %v, want _, false", 1, got, ok)
	}
	// "two" should still be in the map.
	if got, ok := m.get("two"); !ok || got != linkedMapTestData[1] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 2, got, ok, "two")
	}
}
// TestLinkedMapLen checks len() across inserts, and that overriding an
// existing key does not grow the map. It also verifies the internal map
// and list stay the same size.
func TestLinkedMapLen(t *testing.T) {
	m := newLinkedMap()
	if got := m.len(); got != 0 {
		t.Errorf("m.Len() = %v, want %v", got, 0)
	}
	m.put("one", linkedMapTestData[0])
	if got := m.len(); got != 1 {
		t.Errorf("m.Len() = %v, want %v", got, 1)
	}
	m.put("two", linkedMapTestData[1])
	if got := m.len(); got != 2 {
		t.Errorf("m.Len() = %v, want %v", got, 2)
	}
	// Overriding "one" must not change the length.
	m.put("one", linkedMapTestData[4])
	if got := m.len(); got != 2 {
		t.Errorf("m.Len() = %v, want %v", got, 2)
	}
	// Internal checks: map and list must agree on size.
	if got := len(m.m); got != 2 {
		t.Errorf("len(m.m) = %v, want %v", got, 2)
	}
	if got := m.l.Len(); got != 2 {
		t.Errorf("m.l.Len() = %v, want %v", got, 2)
	}
}
// TestLinkedMapClear checks that clear() empties a populated map: both
// lookups fail afterwards and len() drops to zero.
func TestLinkedMapClear(t *testing.T) {
	m := newLinkedMap()
	m.put("one", linkedMapTestData[0])
	if got, ok := m.get("one"); !ok || got != linkedMapTestData[0] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 1, got, ok, "one")
	}
	m.put("two", linkedMapTestData[1])
	if got, ok := m.get("two"); !ok || got != linkedMapTestData[1] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 2, got, ok, "two")
	}
	m.clear()
	if got, ok := m.get("one"); ok {
		t.Errorf("m.Get(%v) = %v, %v, want _, false", 1, got, ok)
	}
	if got, ok := m.get("two"); ok {
		t.Errorf("m.Get(%v) = %v, %v, want _, false", 2, got, ok)
	}
	if got := m.len(); got != 0 {
		t.Errorf("m.Len() = %v, want %v", got, 0)
	}
}
// TestLinkedMapRemoveOldest checks LRU ordering: get() and put() both
// refresh an entry's recency, and removeOldest evicts the front of the
// list (the least recently used key).
func TestLinkedMapRemoveOldest(t *testing.T) {
	m := newLinkedMap()
	m.put("one", linkedMapTestData[0])
	m.put("two", linkedMapTestData[1])
	m.put("three", linkedMapTestData[2])
	m.put("four", linkedMapTestData[3])
	// The gets below refresh each entry in order, so the list order after
	// them is one, two, three, four (oldest first).
	if got, ok := m.get("one"); !ok || got != linkedMapTestData[0] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 1, got, ok, "one")
	}
	if got, ok := m.get("two"); !ok || got != linkedMapTestData[1] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 2, got, ok, "two")
	}
	if got, ok := m.get("three"); !ok || got != linkedMapTestData[2] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 3, got, ok, "three")
	}
	if got, ok := m.get("four"); !ok || got != linkedMapTestData[3] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 4, got, ok, "four")
	}
	if err := checkListOrdered(m.l, []string{"one", "two", "three", "four"}); err != nil {
		t.Fatalf("m.l is not expected: %v", err)
	}
	// Re-putting an existing key moves it to the back (newest).
	m.put("three", linkedMapTestData[2])
	if err := checkListOrdered(m.l, []string{"one", "two", "four", "three"}); err != nil {
		t.Fatalf("m.l is not expected: %v", err)
	}
	m.put("four", linkedMapTestData[3])
	if err := checkListOrdered(m.l, []string{"one", "two", "three", "four"}); err != nil {
		t.Fatalf("m.l is not expected: %v", err)
	}
	// "one" is the oldest and gets evicted first.
	m.removeOldest()
	if got, ok := m.get("one"); ok {
		t.Errorf("m.Get(%v) = %v, %v, want _, false", 1, got, ok)
	}
	if err := checkListOrdered(m.l, []string{"two", "three", "four"}); err != nil {
		t.Fatalf("m.l is not expected: %v", err)
	}
	m.get("two") // 2 is refreshed, 3 becomes the oldest
	if err := checkListOrdered(m.l, []string{"three", "four", "two"}); err != nil {
		t.Fatalf("m.l is not expected: %v", err)
	}
	m.removeOldest()
	if got, ok := m.get("three"); ok {
		t.Errorf("m.Get(%v) = %v, %v, want _, false", 3, got, ok)
	}
	// 2 and 4 are still in map.
	if got, ok := m.get("two"); !ok || got != linkedMapTestData[1] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 2, got, ok, "two")
	}
	if got, ok := m.get("four"); !ok || got != linkedMapTestData[3] {
		t.Errorf("m.Get(%v) = %v, %v, want %v, true", 4, got, ok, "four")
	}
}
// checkListOrdered verifies that the keys stored in l appear exactly in the
// order given by want, front to back.
func checkListOrdered(l *list.List, want []string) error {
	got := make([]string, 0, len(want))
	for e := l.Front(); e != nil; e = e.Next() {
		got = append(got, e.Value.(*linkedMapKVPair).key)
	}
	if !reflect.DeepEqual(got, want) {
		return fmt.Errorf("list elements: %v, want %v", got, want)
	}
	return nil
}

Просмотреть файл

@ -1,288 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"fmt"
"math"
"strings"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)
// TestStickyKeyFromContext table-tests stickyKeyFromContext against
// metadata set directly on the outgoing context (org) and metadata added
// later via AppendToOutgoingContext (add).
func TestStickyKeyFromContext(t *testing.T) {
	for _, test := range []struct {
		org, add []string
		mdKey    string
		wantStr  string
		wantBool bool
	}{
		{[]string{}, []string{}, "", "", false},
		{[]string{"k1", "v1"}, []string{"k2", "v2"}, "k", "", false},
		{[]string{"k", "v"}, []string{}, "k", "v", true},
		{[]string{}, []string{"k", "v"}, "k", "v", true},
		{[]string{"k1", "v1"}, []string{"k2", "v2"}, "k1", "v1", true},
		{[]string{"k1", "v1"}, []string{"k2", "v2"}, "k2", "v2", true},
	} {
		ctx := context.Background()
		if len(test.org) > 0 {
			ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(test.org...))
		}
		if len(test.add) > 0 {
			ctx = metadata.AppendToOutgoingContext(ctx, test.add...)
		}
		got, ok := stickyKeyFromContext(ctx, test.mdKey)
		if got != test.wantStr || ok != test.wantBool {
			t.Errorf("test: %+v, got: %q, %v, want: %q, %v\n", test, got, ok, test.wantStr, test.wantBool)
		}
	}
}
// TestStickinessServiceConfig verifies that the stickinessMetadataKey field
// of a newly pushed service config propagates, lower-cased, to the picker
// wrapper. The update is asynchronous, so the test polls for up to ~1s.
func TestStickinessServiceConfig(t *testing.T) {
	envConfigStickinessOn = true
	defer func() { envConfigStickinessOn = false }()
	defer leakcheck.Check(t)
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()
	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure())
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	const testInput = "testStickinessKey"
	wantStr := strings.ToLower(testInput)
	r.NewServiceConfig(fmt.Sprintf(`{"stickinessMetadataKey": "%v"}`, testInput)) // ToLower() will be applied to the input.
	for i := 0; i < 1000; i++ {
		if key := cc.blockingpicker.getStickinessMDKey(); key == wantStr {
			return
		}
		time.Sleep(time.Millisecond)
	}
	t.Fatalf("cc.blockingpicker.stickiness.stickinessMDKey failed to change to %v within one second", wantStr)
}
// TestStickinessEnd2end verifies three phases: round-robin before any
// stickiness config, still round-robin once the key is configured but the
// RPCs carry no sticky metadata, and single-backend pinning once they do.
// The testCodec RPCs fail by design; errorDesc(err) identifies which
// server answered.
func TestStickinessEnd2end(t *testing.T) {
	envConfigStickinessOn = true
	defer func() { envConfigStickinessOn = false }()
	defer leakcheck.Check(t)
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	numServers := 2
	servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
	defer scleanup()

	cc, err := Dial(r.Scheme()+":///test.server",
		WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	req := "port"
	var reply string
	r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})

	var (
		i      int
		picked []int
	)

	// Check that each backend will be picked at least 3 times.
	picked = make([]int, 2) // was make([]int, 2, 2); cap equal to len is redundant
	for i = 0; i < 1000; i++ {
		if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil {
			if errorDesc(err) == servers[0].port {
				picked[0]++
			} else if errorDesc(err) == servers[1].port {
				picked[1]++
			}
		}
		if picked[0] >= 3 && picked[1] >= 3 {
			break
		}
		time.Sleep(time.Millisecond)
	}
	if i >= 1000 {
		t.Fatalf("When doing roundrobin, addr1 was picked %v times, addr2 was picked %v times", picked[0], picked[1])
	}

	// Constant config string; fmt.Sprintf with no args was a vet finding.
	r.NewServiceConfig(`{"stickinessMetadataKey": "sessionid"}`)

	// Should still be roundrobin: the key is configured but these RPCs do
	// not carry "sessionid" metadata.
	picked = make([]int, 2)
	for i = 0; i < 1000; i++ {
		if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil {
			if errorDesc(err) == servers[0].port {
				picked[0]++
			} else if errorDesc(err) == servers[1].port {
				picked[1]++
			}
		}
		if picked[0] >= 3 && picked[1] >= 3 {
			break
		}
		time.Sleep(time.Millisecond)
	}
	if i >= 1000 {
		t.Fatalf("When doing roundrobin, addr1 was picked %v times, addr2 was picked %v times", picked[0], picked[1])
	}

	// Do sticky calls; only one backend should ever be picked.
	picked = make([]int, 2)
	for i = 0; i < 100; i++ {
		ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("sessionid", "1"))
		if err = Invoke(ctx, "/foo/bar", &req, &reply, cc); err != nil {
			if errorDesc(err) == servers[0].port {
				picked[0]++
			} else if errorDesc(err) == servers[1].port {
				picked[1]++
			}
		}
		time.Sleep(time.Millisecond)
	}
	if (picked[0] != 0) == (picked[1] != 0) {
		t.Fatalf("When doing sticky RPC, addr1 was picked %v times, addr2 was picked %v times, want at least one of them to be 0", picked[0], picked[1])
	}
}
// TestStickinessChangeMDKey verifies that changing stickinessMetadataKey in
// the service config clears the sticky map: sticky calls populate one
// entry, then a config with a new key must drain the store (polled for ~1s
// because the config update is asynchronous).
func TestStickinessChangeMDKey(t *testing.T) {
	envConfigStickinessOn = true
	defer func() { envConfigStickinessOn = false }()
	defer leakcheck.Check(t)
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	numServers := 2
	servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
	defer scleanup()

	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	req := "port"
	var reply string
	r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
	// Constant config strings; fmt.Sprintf with no args was a vet finding.
	r.NewServiceConfig(`{"stickinessMetadataKey": "sessionid"}`)

	// Do sticky calls; only one backend will be picked, and there will be
	// one entry in the stickiness map. RPC errors are deliberately ignored:
	// only the map population matters here.
	for i := 0; i < 100; i++ {
		ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("sessionid", "1"))
		Invoke(ctx, "/foo/bar", &req, &reply, cc)
		time.Sleep(time.Millisecond)
	}
	cc.blockingpicker.stickiness.mu.Lock()
	mapLen := cc.blockingpicker.stickiness.store.len()
	cc.blockingpicker.stickiness.mu.Unlock()
	if mapLen != 1 {
		t.Fatalf("length of stickiness map is %v, want 1", mapLen)
	}

	// A different key must reset the store.
	r.NewServiceConfig(`{"stickinessMetadataKey": "sessionidnew"}`)
	var i int
	for i = 0; i < 1000; i++ {
		cc.blockingpicker.stickiness.mu.Lock()
		mapLen = cc.blockingpicker.stickiness.store.len()
		cc.blockingpicker.stickiness.mu.Unlock()
		if mapLen == 0 {
			break
		}
		time.Sleep(time.Millisecond)
	}
	if i >= 1000 {
		t.Fatalf("After 1 second, length of stickiness map is %v, want 0", mapLen)
	}
}
// TestStickinessSwitchingBalancer verifies that switching the balancer
// clears the sticky map: sticky calls populate one entry, then
// switchBalancer must drain the store (polled for ~1s because the clearing
// happens asynchronously).
func TestStickinessSwitchingBalancer(t *testing.T) {
	envConfigStickinessOn = true
	defer func() { envConfigStickinessOn = false }()
	defer leakcheck.Check(t)
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	numServers := 2
	servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
	defer scleanup()

	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	req := "port"
	var reply string
	r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
	// Constant config string; fmt.Sprintf with no args was a vet finding.
	r.NewServiceConfig(`{"stickinessMetadataKey": "sessionid"}`)

	// Do sticky calls; only one backend will be picked, and there will be
	// one entry in the stickiness map. RPC errors are deliberately ignored:
	// only the map population matters here.
	for i := 0; i < 100; i++ {
		ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("sessionid", "1"))
		Invoke(ctx, "/foo/bar", &req, &reply, cc)
		time.Sleep(time.Millisecond)
	}
	cc.blockingpicker.stickiness.mu.Lock()
	mapLen := cc.blockingpicker.stickiness.store.len()
	cc.blockingpicker.stickiness.mu.Unlock()
	if mapLen != 1 {
		t.Fatalf("length of stickiness map is %v, want 1", mapLen)
	}

	cc.mu.Lock()
	cc.switchBalancer("round_robin")
	cc.mu.Unlock()

	var i int
	for i = 0; i < 1000; i++ {
		cc.blockingpicker.stickiness.mu.Lock()
		mapLen = cc.blockingpicker.stickiness.store.len()
		cc.blockingpicker.stickiness.mu.Unlock()
		if mapLen == 0 {
			break
		}
		time.Sleep(time.Millisecond)
	}
	if i >= 1000 {
		t.Fatalf("After 1 second, length of stickiness map is %v, want 0", mapLen)
	}
}