fix(watch): watchers sometimes replaying old data

Etcd handles this logic for us automatically, and it was causing issues as the
header referred to the key, not cluster, revision.
This commit is contained in:
Connor Peet 2017-10-21 09:56:26 -07:00
Родитель 834a3b50cc
Коммит 8f0d60ef10
4 изменённых файлов: 36 добавлений и 92 удалений

Просмотреть файл

@ -33,7 +33,7 @@ export class Namespace {
public readonly leaseClient = new RPC.LeaseClient(this.pool);
public readonly watchClient = new RPC.WatchClient(this.pool);
private readonly nsApplicator = new NSApplicator(this.prefix);
private readonly watchManager = new WatchManager(this.watchClient, this.kv);
private readonly watchManager = new WatchManager(this.watchClient);
constructor(protected readonly prefix: Buffer, protected readonly pool: ConnectionPool) {}

Просмотреть файл

@ -5,7 +5,9 @@ import * as grpc from 'grpc';
export interface ICallable {
exec(service: keyof typeof Services, method: string, params: object): Promise<any>;
getConnection(service: keyof typeof Services): Promise<{ client: grpc.Client }>;
getConnection(
service: keyof typeof Services,
): Promise<{ client: grpc.Client; metadata: grpc.Metadata }>;
}
export interface IResponseStream<T> {
@ -74,7 +76,9 @@ export class WatchClient {
* last compaction revision.
*/
public watch(): Promise<IDuplexStream<IWatchRequest, IWatchResponse>> {
return this.client.getConnection('Watch').then(cnx => (<any>cnx.client).watch());
return this.client
.getConnection('Watch')
.then(({ client, metadata }) => (<any>client).watch(metadata));
}
}
@ -99,7 +103,9 @@ export class LeaseClient {
* to the server and streaming keep alive responses from the server to the client.
*/
public leaseKeepAlive(): Promise<IDuplexStream<ILeaseKeepAliveRequest, ILeaseKeepAliveResponse>> {
return this.client.getConnection('Lease').then(cnx => (<any>cnx.client).leaseKeepAlive());
return this.client
.getConnection('Lease')
.then(({ client, metadata }) => (<any>client).leaseKeepAlive(metadata));
}
/**
* LeaseTimeToLive retrieves lease information.
@ -1084,25 +1090,6 @@ export interface IAuthRoleGrantPermissionResponse {
export interface IAuthRoleRevokePermissionResponse {
header: IResponseHeader;
}
export interface IUser {
name?: Buffer;
password?: Buffer;
roles?: string[];
}
export enum Permission {
Read = 0,
Write = 1,
Readwrite = 2,
}
export interface IPermission {
permType: keyof typeof Permission;
key: Buffer;
range_end: Buffer;
}
export interface IRole {
name?: Buffer;
keyPermission?: IPermission[];
}
export interface IKeyValue {
/**
* key is the first key for the range. If range_end is not given, the request only looks up key.
@ -1141,6 +1128,25 @@ export interface IEvent {
*/
prev_kv: IKeyValue;
}
export interface IUser {
name?: Buffer;
password?: Buffer;
roles?: string[];
}
export enum Permission {
Read = 0,
Write = 1,
Readwrite = 2,
}
export interface IPermission {
permType: keyof typeof Permission;
key: Buffer;
range_end: Buffer;
}
export interface IRole {
name?: Buffer;
keyPermission?: IPermission[];
}
export const Services = {
KV: KVClient,
Watch: WatchClient,

Просмотреть файл

@ -1,12 +1,7 @@
import BigNumber from 'bignumber.js';
import { EventEmitter } from 'events';
import {
castGrpcErrorMessage,
ClientRuntimeError,
EtcdError,
EtcdPermissionDeniedError,
} from './errors';
import { castGrpcErrorMessage, ClientRuntimeError, EtcdError } from './errors';
import { Rangable, Range } from './range';
import * as RPC from './rpc';
import { NSApplicator, onceEvent, toBuffer } from './util';
@ -33,12 +28,8 @@ const enum QueueState {
class AttachQueue {
private state = QueueState.Idle;
private queue: Watcher[] = [];
private revision: number;
constructor(
private readonly stream: RPC.IDuplexStream<RPC.IWatchRequest, RPC.IWatchResponse>,
private readonly kv: RPC.KVClient,
) {}
constructor(private readonly stream: RPC.IDuplexStream<RPC.IWatchRequest, RPC.IWatchResponse>) {}
/**
* Inserts a watcher to be attached to the stream.
@ -78,6 +69,7 @@ class AttachQueue {
*/
public destroy() {
this.state = QueueState.Destroyed;
this.queue = [];
}
/**
@ -93,54 +85,10 @@ class AttachQueue {
}
const watcher = this.queue[0];
if (watcher.request.start_revision) {
if (!this.revision) {
return this.readRevision(watcher);
}
watcher.request.start_revision = Math.min(
Number(watcher.request.start_revision),
this.revision,
);
}
this.state = QueueState.Attaching;
watcher.emit('connecting', watcher.request);
this.stream.write({ create_request: watcher.request });
}
/**
* Gets and updates the latest revision in etcd. This is necessary to recover
* watchers that have a later revision if etcd data is wiped.
*/
private readRevision(requester: Watcher) {
this.state = QueueState.ReadingRevision;
this.kv
.range({
key: this.queue[0].request.key,
keys_only: true,
})
.then(res => {
this.revision = Number(res.header.revision);
this.readQueue();
})
.catch(err => {
// If we got an error here, one of two things happened:
// - the watch is on a key the user doesn't have access to, we should
// throw away the watcher and move to the next one
// - some other stream error occurred... try to bulldoze on but the
// stream is probably about to die (or it may have already died)
if (err instanceof EtcdPermissionDeniedError) {
requester.emit('error', err);
this.queue.shift();
} else {
this.revision = 0;
}
this.readQueue();
});
}
}
/**
@ -176,7 +124,7 @@ export class WatchManager {
*/
private queue: null | AttachQueue;
constructor(private readonly client: RPC.WatchClient, private readonly kv: RPC.KVClient) {}
constructor(private readonly client: RPC.WatchClient) {}
/**
* Attach registers the watcher on the connection.
@ -264,7 +212,7 @@ export class WatchManager {
.watch()
.then(stream => {
this.state = State.Connected;
this.queue = new AttachQueue(stream, this.kv);
this.queue = new AttachQueue(stream);
this.stream = stream
.on('data', res => this.handleResponse(res))
.on('error', err => this.handleError(err));

Просмотреть файл

@ -29,15 +29,8 @@ describe('watch', () => {
return Promise.all([
client.put(key).value('updated!'),
onceEvent(watcher, 'put').then((res: IKeyValue) => {
try {
expect(res.key.toString()).to.equal(key);
expect(res.value.toString()).to.equal('updated!');
} catch (e) {
// todo(connor4312): temp debug logic for an intermittent failure in this test
// tslint:disable-next-line
console.log(JSON.stringify([watcher.request, res]));
throw e;
}
expect(res.key.toString()).to.equal(key);
expect(res.value.toString()).to.equal('updated!');
}),
]).then(() => watcher);
}
@ -72,9 +65,6 @@ describe('watch', () => {
proxy.pause();
await onceEvent(watcher, 'disconnected');
proxy.resume();
// todo(connor4312): temp debug logic for an intermittent failure in this test
// tslint:disable-next-line
console.log('reconnecting with', watcher.request);
await onceEvent(watcher, 'connected');
await expectWatching(watcher, 'foo1');