diff --git a/go/vt/tabletserver/cache_pool_test.go b/go/vt/tabletserver/cache_pool_test.go index 218389ab4e..b5e10ca2ca 100644 --- a/go/vt/tabletserver/cache_pool_test.go +++ b/go/vt/tabletserver/cache_pool_test.go @@ -6,9 +6,11 @@ package tabletserver import ( "fmt" + "io/ioutil" "math/rand" "net/http" "net/http/httptest" + "regexp" "testing" "time" @@ -188,7 +190,7 @@ func TestCachePoolGetFailedBecauseCachePoolIsClosed(t *testing.T) { } func TestCachePoolStatsURL(t *testing.T) { - fakecacheservice.Register() + cache := fakecacheservice.Register() fakesqldb.Register() rowCacheConfig := RowCacheConfig{ Binary: "ls", @@ -198,15 +200,66 @@ func TestCachePoolStatsURL(t *testing.T) { idleTimeout := 1 * time.Second cachePool.idleTimeout = idleTimeout cachePool.Open() - defer cachePool.Close() - request, _ := http.NewRequest("GET", cachePool.statsURL, nil) + request, _ := http.NewRequest("GET", fmt.Sprintf("%sstats", cachePool.statsURL), nil) response := httptest.NewRecorder() cachePool.ServeHTTP(response, request) + // any memcache calls should fail + cache.EnableCacheServiceError() + response = httptest.NewRecorder() + cachePool.ServeHTTP(response, request) + cache.DisableCacheServiceError() + cachePool.Close() + response = httptest.NewRecorder() + cachePool.ServeHTTP(response, request) + body, _ := ioutil.ReadAll(response.Body) + matcher := regexp.MustCompile("closed") + if !matcher.Match(body) { + t.Fatalf("stats page should contain 'closed', but got %s", string(body)) + } +} + +func TestCachePoolMemcacheStatsFail(t *testing.T) { + cache := fakecacheservice.Register() + fakesqldb.Register() + rowCacheConfig := RowCacheConfig{ + Binary: "ls", + Connections: 100, + } + cachePool := newTestCachePool(rowCacheConfig, true) + idleTimeout := 1 * time.Second + cachePool.idleTimeout = idleTimeout + cachePool.Open() + defer cachePool.Close() + memcacheStatsBefore := internalErrors.Counts()["MemcacheStats"] + // any memcache calls should fail + cache.EnableCacheServiceError() + cachePool.memcacheStats.update() + memcacheStatsAfter := internalErrors.Counts()["MemcacheStats"] + if memcacheStatsAfter <= memcacheStatsBefore { + t.Fatalf("memcache stats should cause an internal error") + } +} + +func TestCachePoolFailToStartBecauseCacheServiceWasDown(t *testing.T) { + cache := fakecacheservice.Register() + fakesqldb.Register() + testUtils := &testUtils{} + rowCacheConfig := RowCacheConfig{ + Binary: "ls", + Connections: 100, + } + cachePool := newTestCachePool(rowCacheConfig, false) + idleTimeout := 1 * time.Second + cachePool.idleTimeout = idleTimeout + // any memcache calls should fail + cache.EnableCacheServiceError() + defer testUtils.checkTabletErrorWithRecover(t, ErrFatal, "can't communicate with cache service") + cachePool.Open() } func newTestCachePool(rowcacheConfig RowCacheConfig, enablePublishStats bool) *CachePool { randID := rand.Int63() name := fmt.Sprintf("TestCachePool-%d-", randID) - statsURL := fmt.Sprintf("/debug/cache-%d", randID) + statsURL := fmt.Sprintf("/debug/cache-%d/", randID) return NewCachePool(name, rowcacheConfig, 1*time.Second, statsURL, enablePublishStats) } diff --git a/go/vt/tabletserver/fakecacheservice/fakecacheservice.go b/go/vt/tabletserver/fakecacheservice/fakecacheservice.go index c65b9e4cc3..efd57da75a 100644 --- a/go/vt/tabletserver/fakecacheservice/fakecacheservice.go +++ b/go/vt/tabletserver/fakecacheservice/fakecacheservice.go @@ -12,8 +12,11 @@ import ( "time" cs "github.com/youtube/vitess/go/cacheservice" + "github.com/youtube/vitess/go/sync2" ) +var errCacheService = "cacheservice error" + // FakeCacheService is a fake implementation of CacheService type FakeCacheService struct { cache *Cache @@ -21,8 +24,9 @@ type FakeCacheService struct { // Cache is a cache like data structure. type Cache struct { - data map[string]*cs.Result - mu sync.Mutex + mu sync.Mutex + data map[string]*cs.Result + enableCacheServiceError sync2.AtomicInt32 } // Set sets a key and associated value to the cache. @@ -61,6 +65,16 @@ func (cache *Cache) Clear() { cache.data = make(map[string]*cs.Result) } +// EnableCacheServiceError makes cache service return error. +func (cache *Cache) EnableCacheServiceError() { + cache.enableCacheServiceError.Set(1) +} + +// DisableCacheServiceError makes cache service back to normal. +func (cache *Cache) DisableCacheServiceError() { + cache.enableCacheServiceError.Set(0) +} + // NewFakeCacheService creates a FakeCacheService func NewFakeCacheService(cache *Cache) *FakeCacheService { return &FakeCacheService{ @@ -70,6 +84,9 @@ func NewFakeCacheService(cache *Cache) *FakeCacheService { // Get returns cached data for given keys. func (service *FakeCacheService) Get(keys ...string) ([]cs.Result, error) { + if service.cache.enableCacheServiceError.Get() == 1 { + return nil, fmt.Errorf(errCacheService) + } results := make([]cs.Result, 0, len(keys)) for _, key := range keys { if val, ok := service.cache.Get(key); ok { @@ -83,6 +100,9 @@ func (service *FakeCacheService) Get(keys ...string) ([]cs.Result, error) { // for using with CAS. Gets returns a CAS identifier with the item. If // the item's CAS value has changed since you Gets'ed it, it will not be stored. func (service *FakeCacheService) Gets(keys ...string) ([]cs.Result, error) { + if service.cache.enableCacheServiceError.Get() == 1 { + return nil, fmt.Errorf(errCacheService) + } results := make([]cs.Result, 0, len(keys)) for _, key := range keys { if val, ok := service.cache.Get(key); ok { @@ -96,6 +116,9 @@ func (service *FakeCacheService) Gets(keys ...string) ([]cs.Result, error) { // Set set the value with specified cache key. func (service *FakeCacheService) Set(key string, flags uint16, timeout uint64, value []byte) (bool, error) { + if service.cache.enableCacheServiceError.Get() == 1 { + return false, fmt.Errorf(errCacheService) + } service.cache.Set(key, &cs.Result{ Key: key, Value: value, @@ -107,6 +130,9 @@ func (service *FakeCacheService) Set(key string, flags uint16, timeout uint64, v // Add store the value only if it does not already exist. func (service *FakeCacheService) Add(key string, flags uint16, timeout uint64, value []byte) (bool, error) { + if service.cache.enableCacheServiceError.Get() == 1 { + return false, fmt.Errorf(errCacheService) + } if _, ok := service.cache.Get(key); ok { return false, nil } @@ -122,6 +148,9 @@ func (service *FakeCacheService) Add(key string, flags uint16, timeout uint64, v // Replace replaces the value, only if the value already exists, // for the specified cache key. func (service *FakeCacheService) Replace(key string, flags uint16, timeout uint64, value []byte) (bool, error) { + if service.cache.enableCacheServiceError.Get() == 1 { + return false, fmt.Errorf(errCacheService) + } result, ok := service.cache.Get(key) if !ok { return false, nil @@ -134,6 +163,9 @@ func (service *FakeCacheService) Replace(key string, flags uint16, timeout uint6 // Append appends the value after the last bytes in an existing item. func (service *FakeCacheService) Append(key string, flags uint16, timeout uint64, value []byte) (bool, error) { + if service.cache.enableCacheServiceError.Get() == 1 { + return false, fmt.Errorf(errCacheService) + } result, ok := service.cache.Get(key) if !ok { return false, nil @@ -146,6 +178,9 @@ func (service *FakeCacheService) Append(key string, flags uint16, timeout uint64 // Prepend prepends the value before existing value. func (service *FakeCacheService) Prepend(key string, flags uint16, timeout uint64, value []byte) (bool, error) { + if service.cache.enableCacheServiceError.Get() == 1 { + return false, fmt.Errorf(errCacheService) + } result, ok := service.cache.Get(key) if !ok { return false, nil @@ -158,6 +193,9 @@ func (service *FakeCacheService) Prepend(key string, flags uint16, timeout uint6 // Cas stores the value only if no one else has updated the data since you read it last. func (service *FakeCacheService) Cas(key string, flags uint16, timeout uint64, value []byte, cas uint64) (bool, error) { + if service.cache.enableCacheServiceError.Get() == 1 { + return false, fmt.Errorf(errCacheService) + } result, ok := service.cache.Get(key) if !ok || result.Cas != cas { return false, nil @@ -171,18 +209,27 @@ func (service *FakeCacheService) Cas(key string, flags uint16, timeout uint64, v // Delete delete the value for the specified cache key. func (service *FakeCacheService) Delete(key string) (bool, error) { + if service.cache.enableCacheServiceError.Get() == 1 { + return false, fmt.Errorf(errCacheService) + } service.cache.Delete(key) return true, nil } // FlushAll purges the entire cache. func (service *FakeCacheService) FlushAll() error { + if service.cache.enableCacheServiceError.Get() == 1 { + return fmt.Errorf(errCacheService) + } service.cache.Clear() return nil } // Stats returns a list of basic stats. func (service *FakeCacheService) Stats(key string) ([]byte, error) { + if service.cache.enableCacheServiceError.Get() == 1 { + return nil, fmt.Errorf(errCacheService) + } return []byte{}, nil } diff --git a/go/vt/tabletserver/fakecacheservice/fakecacheservice_test.go b/go/vt/tabletserver/fakecacheservice/fakecacheservice_test.go index 180ed674a2..9e813bc5d9 100644 --- a/go/vt/tabletserver/fakecacheservice/fakecacheservice_test.go +++ b/go/vt/tabletserver/fakecacheservice/fakecacheservice_test.go @@ -122,3 +122,50 @@ func TestFakeCacheService(t *testing.T) { service.Stats("") service.Close() } + +func TestFakeCacheServiceError(t *testing.T) { + service := NewFakeCacheService(&Cache{data: make(map[string]*cs.Result)}) + service.cache.EnableCacheServiceError() + key1 := "key1" + _, err := service.Set(key1, 0, 0, []byte("test")) + checkCacheServiceError(t, err) + _, err = service.Get(key1) + checkCacheServiceError(t, err) + _, err = service.Gets(key1) + checkCacheServiceError(t, err) + _, err = service.Cas(key1, 0, 0, []byte("test2"), 0) + checkCacheServiceError(t, err) + _, err = service.Add(key1, 0, 0, []byte("test3")) + checkCacheServiceError(t, err) + _, err = service.Replace("unknownKey", 0, 0, []byte("test4")) + checkCacheServiceError(t, err) + _, err = service.Append("unknownKey", 0, 0, []byte("test5")) + checkCacheServiceError(t, err) + _, err = service.Prepend("unknownKey", 0, 0, []byte("test5")) + checkCacheServiceError(t, err) + _, err = service.Prepend(key1, 0, 0, []byte("test5")) + checkCacheServiceError(t, err) + _, err = service.Delete(key1) + checkCacheServiceError(t, err) + err = service.FlushAll() + checkCacheServiceError(t, err) + _, err = service.Stats("") + checkCacheServiceError(t, err) + + service.cache.DisableCacheServiceError() + ok, err := service.Set(key1, 0, 0, []byte("test")) + if !ok || err != nil { + t.Fatalf("set should succeed") + } + results, err := service.Get(key1) + if !reflect.DeepEqual(results[0].Value, []byte("test")) { + t.Fatalf("expect to get value: test, but get: %s", string(results[0].Value)) + } + service.Close() +} + +func checkCacheServiceError(t *testing.T, err error) { + if err.Error() != errCacheService { + t.Fatalf("should get cacheservice error") + } +} diff --git a/go/vt/tabletserver/testutils_test.go b/go/vt/tabletserver/testutils_test.go index b24ffbde6e..bb60b4a442 100644 --- a/go/vt/tabletserver/testutils_test.go +++ b/go/vt/tabletserver/testutils_test.go @@ -9,6 +9,7 @@ import ( "html/template" "math/rand" "reflect" + "strings" "testing" "time" ) @@ -44,6 +45,43 @@ func (util *testUtils) checkEqual(t *testing.T, expected interface{}, result int } } +func (util *testUtils) checkTabletErrorWithRecover(t *testing.T, tabletErrType int, tabletErrStr string) { + err := recover() + if err == nil { + t.Fatalf("should get error") + } + util.checkTabletError(t, err, tabletErrType, tabletErrStr) +} + +func (util *testUtils) checkTabletError(t *testing.T, err interface{}, tabletErrType int, tabletErrStr string) { + tabletError, ok := err.(*TabletError) + if !ok { + t.Fatalf("should return a TabletError, but got err: %v", err) + } + if tabletError.ErrorType != tabletErrType { + t.Fatalf("should return a TabletError with error type: %s", util.getTabletErrorString(tabletErrType)) + } + if !strings.Contains(tabletError.Error(), tabletErrStr) { + t.Fatalf("expect the tablet error should contain string: '%s', but it does not. Got tablet error: '%s'", tabletErrStr, tabletError.Error()) + } +} + +func (util *testUtils) getTabletErrorString(tabletErrorType int) string { + switch tabletErrorType { + case ErrFail: + return "ErrFail" + case ErrRetry: + return "ErrRetry" + case ErrFatal: + return "ErrFatal" + case ErrTxPoolFull: + return "ErrTxPoolFull" + case ErrNotInTx: + return "ErrNotInTx" + } + return "" +} + func newTestSchemaInfo( queryCacheSize int, reloadTime time.Duration,