From 8f0d60ef10917814b8207991cf038b048659931c Mon Sep 17 00:00:00 2001 From: Connor Peet Date: Sat, 21 Oct 2017 09:56:26 -0700 Subject: [PATCH] fix(watch): watchers sometimes replaying old data Etcd handles this logic for us automatically, and it was causing issues as the header referred to the key, not cluster, revision. --- src/namespace.ts | 2 +- src/rpc.ts | 50 +++++++++++++++++++++---------------- src/watch.ts | 62 ++++------------------------------------------ test/watch.test.ts | 14 ++--------- 4 files changed, 36 insertions(+), 92 deletions(-) diff --git a/src/namespace.ts b/src/namespace.ts index bfaebfe..d36e625 100644 --- a/src/namespace.ts +++ b/src/namespace.ts @@ -33,7 +33,7 @@ export class Namespace { public readonly leaseClient = new RPC.LeaseClient(this.pool); public readonly watchClient = new RPC.WatchClient(this.pool); private readonly nsApplicator = new NSApplicator(this.prefix); - private readonly watchManager = new WatchManager(this.watchClient, this.kv); + private readonly watchManager = new WatchManager(this.watchClient); constructor(protected readonly prefix: Buffer, protected readonly pool: ConnectionPool) {} diff --git a/src/rpc.ts b/src/rpc.ts index f4ce5c9..359438c 100644 --- a/src/rpc.ts +++ b/src/rpc.ts @@ -5,7 +5,9 @@ import * as grpc from 'grpc'; export interface ICallable { exec(service: keyof typeof Services, method: string, params: object): Promise; - getConnection(service: keyof typeof Services): Promise<{ client: grpc.Client }>; + getConnection( + service: keyof typeof Services, + ): Promise<{ client: grpc.Client; metadata: grpc.Metadata }>; } export interface IResponseStream { @@ -74,7 +76,9 @@ export class WatchClient { * last compaction revision. */ public watch(): Promise> { - return this.client.getConnection('Watch').then(cnx => (cnx.client).watch()); + return this.client + .getConnection('Watch') + .then(({ client, metadata }) => (client).watch(metadata)); } } @@ -99,7 +103,9 @@ export class LeaseClient { * to the server and streaming keep alive responses from the server to the client. */ public leaseKeepAlive(): Promise> { - return this.client.getConnection('Lease').then(cnx => (cnx.client).leaseKeepAlive()); + return this.client + .getConnection('Lease') + .then(({ client, metadata }) => (client).leaseKeepAlive(metadata)); } /** * LeaseTimeToLive retrieves lease information. @@ -1084,25 +1090,6 @@ export interface IAuthRoleGrantPermissionResponse { export interface IAuthRoleRevokePermissionResponse { header: IResponseHeader; } -export interface IUser { - name?: Buffer; - password?: Buffer; - roles?: string[]; -} -export enum Permission { - Read = 0, - Write = 1, - Readwrite = 2, -} -export interface IPermission { - permType: keyof typeof Permission; - key: Buffer; - range_end: Buffer; -} -export interface IRole { - name?: Buffer; - keyPermission?: IPermission[]; -} export interface IKeyValue { /** * key is the first key for the range. If range_end is not given, the request only looks up key. @@ -1141,6 +1128,25 @@ export interface IEvent { */ prev_kv: IKeyValue; } +export interface IUser { + name?: Buffer; + password?: Buffer; + roles?: string[]; +} +export enum Permission { + Read = 0, + Write = 1, + Readwrite = 2, +} +export interface IPermission { + permType: keyof typeof Permission; + key: Buffer; + range_end: Buffer; +} +export interface IRole { + name?: Buffer; + keyPermission?: IPermission[]; +} export const Services = { KV: KVClient, Watch: WatchClient, diff --git a/src/watch.ts b/src/watch.ts index 8fe3d42..e38ff59 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -1,12 +1,7 @@ import BigNumber from 'bignumber.js'; import { EventEmitter } from 'events'; -import { - castGrpcErrorMessage, - ClientRuntimeError, - EtcdError, - EtcdPermissionDeniedError, -} from './errors'; +import { castGrpcErrorMessage, ClientRuntimeError, EtcdError } from './errors'; import { Rangable, Range } from './range'; import * as RPC from './rpc'; import { NSApplicator, onceEvent, toBuffer } from './util'; @@ -33,12 +28,8 @@ const enum QueueState { class AttachQueue { private state = QueueState.Idle; private queue: Watcher[] = []; - private revision: number; - constructor( - private readonly stream: RPC.IDuplexStream, - private readonly kv: RPC.KVClient, - ) {} + constructor(private readonly stream: RPC.IDuplexStream) {} /** * Inserts a watcher to be attached to the stream. @@ -78,6 +69,7 @@ class AttachQueue { */ public destroy() { this.state = QueueState.Destroyed; + this.queue = []; } /** @@ -93,54 +85,10 @@ class AttachQueue { } const watcher = this.queue[0]; - if (watcher.request.start_revision) { - if (!this.revision) { - return this.readRevision(watcher); - } - - watcher.request.start_revision = Math.min( - Number(watcher.request.start_revision), - this.revision, - ); - } - this.state = QueueState.Attaching; watcher.emit('connecting', watcher.request); this.stream.write({ create_request: watcher.request }); } - - /** - * Gets and updates the latest revision in etcd. This is necessary to recover - * watchers that have a later revision if etcd data is wiped. - */ - private readRevision(requester: Watcher) { - this.state = QueueState.ReadingRevision; - - this.kv - .range({ - key: this.queue[0].request.key, - keys_only: true, - }) - .then(res => { - this.revision = Number(res.header.revision); - this.readQueue(); - }) - .catch(err => { - // If we got an error here, one of two things happened: - // - the watch is on a key the user doesn't have access to, we should - // throw away the watcher and move to the next one - // - some other stream error occurred... try to bulldoze on but the - // stream is probably about to die (or it may have already died) - if (err instanceof EtcdPermissionDeniedError) { - requester.emit('error', err); - this.queue.shift(); - } else { - this.revision = 0; - } - - this.readQueue(); - }); - } } /** @@ -176,7 +124,7 @@ export class WatchManager { */ private queue: null | AttachQueue; - constructor(private readonly client: RPC.WatchClient, private readonly kv: RPC.KVClient) {} + constructor(private readonly client: RPC.WatchClient) {} /** * Attach registers the watcher on the connection. @@ -264,7 +212,7 @@ export class WatchManager { .watch() .then(stream => { this.state = State.Connected; - this.queue = new AttachQueue(stream, this.kv); + this.queue = new AttachQueue(stream); this.stream = stream .on('data', res => this.handleResponse(res)) .on('error', err => this.handleError(err)); diff --git a/test/watch.test.ts b/test/watch.test.ts index 3c22980..d2add87 100644 --- a/test/watch.test.ts +++ b/test/watch.test.ts @@ -29,15 +29,8 @@ describe('watch', () => { return Promise.all([ client.put(key).value('updated!'), onceEvent(watcher, 'put').then((res: IKeyValue) => { - try { - expect(res.key.toString()).to.equal(key); - expect(res.value.toString()).to.equal('updated!'); - } catch (e) { - // todo(connor4312): temp debug logic for an intermittent failure in this test - // tslint:disable-next-line - console.log(JSON.stringify([watcher.request, res])); - throw e; - } + expect(res.key.toString()).to.equal(key); + expect(res.value.toString()).to.equal('updated!'); }), ]).then(() => watcher); } @@ -72,9 +65,6 @@ describe('watch', () => { proxy.pause(); await onceEvent(watcher, 'disconnected'); proxy.resume(); - // todo(connor4312): temp debug logic for an intermittent failure in this test - // tslint:disable-next-line - console.log('reconnecting with', watcher.request); await onceEvent(watcher, 'connected'); await expectWatching(watcher, 'foo1');