diff --git a/src/errors.ts b/src/errors.ts index 359e9d8..cf7e9dd 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -15,6 +15,7 @@ export class GRPCGenericError extends Error {} /** * GRPCConnectFailed is thrown when connecting to GRPC fails. + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L151-L158 */ export class GRPCConnectFailedError extends GRPCGenericError {} @@ -26,6 +27,7 @@ export class GRPCProtocolError extends GRPCGenericError {} /** * GRPCInternalError is thrown when a internal error occurs on either end. + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L145-L150 */ export class GRPCInternalError extends GRPCGenericError {} @@ -34,6 +36,61 @@ export class GRPCInternalError extends GRPCGenericError {} */ export class GRPCCancelledError extends GRPCGenericError {} +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L50-L57 + */ +export class GRPCUnknownError extends GRPCGenericError {} + +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L58-L64 + */ +export class GRPCInvalidArgumentError extends GRPCGenericError {} + +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L65-L72 + */ +export class GRPCDeadlineExceededError extends GRPCGenericError {} + +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L73-L74 + */ +export class GRPCNotFoundError extends GRPCGenericError {} + +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L75-L79 + */ +export class GRPCAlreadyExistsError extends GRPCGenericError {} + +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L89-L93 + */ +export class GRPCResourceExhastedError extends GRPCGenericError {} + +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L94-L116 + */ +export class GRPCFailedPreconditionError extends GRPCGenericError {} + +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L117-L124 + */ +export class GRPCAbortedError extends GRPCGenericError {} + +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L143-L144 + */ +export class GRPCNotImplementedError extends GRPCGenericError {} + +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L125-L142 + */ +export class GRPCOutOfRangeError extends GRPCGenericError {} + +/** + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L159-L160 + */ +export class GRPCDataLossError extends GRPCGenericError {} + /** * EtcdError is an application error returned by etcd. */ @@ -85,12 +142,18 @@ export class EtcdLockFailedError extends Error {} /** * EtcdAuthenticationFailedError is thrown when an invalid username/password * combination is submitted. + * + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L161-L165 */ export class EtcdAuthenticationFailedError extends Error {} /** * EtcdPermissionDeniedError is thrown when the user attempts to modify a key * that they don't have access to. + * + * Also can be emitted from GRPC. + * + * @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L80-L88 */ export class EtcdPermissionDeniedError extends Error {} @@ -135,9 +198,9 @@ const grpcMessageToError = new Map([ ['etcdserver: permission denied', EtcdPermissionDeniedError], ]); -function getMatchingGrpcError(err: Error): IErrorCtor | null { +function getMatchingGrpcError(message: string): IErrorCtor | null { for (const [key, value] of grpcMessageToError) { - if (err.message.includes(key)) { + if (message.includes(key)) { return value; } } @@ -149,6 +212,14 @@ function rewriteErrorName(str: string, ctor: new (...args: any[]) => Error): str return str.replace(/^Error:/, `${ctor.name}:`); } +/** + * Tries to convert an Etcd error string to an etcd error. + */ +export function castGrpcErrorMessage(message: string): Error { + const ctor = getMatchingGrpcError(message) || EtcdError; + return new ctor(message); +} + /** * Tries to convert GRPC's generic, untyped errors to typed errors we can * consume. Yes, this method is abhorrent. @@ -158,7 +229,7 @@ export function castGrpcError(err: Error): Error { return err; // it looks like it's already some kind of typed error } - let ctor = getMatchingGrpcError(err); + let ctor = getMatchingGrpcError(err.message); if (!ctor) { ctor = err.message.includes('etcdserver:') ? EtcdError : GRPCGenericError; } diff --git a/src/index.ts b/src/index.ts index 5f7a4b5..ca432f1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,7 @@ export * from './namespace'; export * from './options'; export * from './range'; export * from './rpc'; +export { WatchBuilder, Watcher } from './watch'; /** * Etcd3 is a high-level interface for interacting and calling etcd endpoints. diff --git a/src/lease.ts b/src/lease.ts index f5434a1..975b3e9 100644 --- a/src/lease.ts +++ b/src/lease.ts @@ -243,8 +243,8 @@ export class Lease extends EventEmitter { // When the cluster goes down, we keep trying to reconnect. But if we're // far past the end of our key's TTL, there's no way we're going to be // able to renew it. Fire a "lost". - if (this.lastKeepAlive < Date.now() - 2 * 1000 * this.ttl) { - this.emit('list', new GRPCConnectFailedError('We lost connect to etcd and our lease has expired.')); + if (Date.now() - this.lastKeepAlive > 2 * 1000 * this.ttl) { + this.emit('lost', new GRPCConnectFailedError('We lost connection to etcd and our lease has expired.')); return this.close(); } @@ -253,40 +253,49 @@ export class Lease extends EventEmitter { return stream.end(); } - stream.on('data', res => { - if (leaseExpired(res)) { - const err = new EtcdLeaseInvalidError(res.ID); - this.emit('keepaliveFailed', err); - this.emit('lost', err); - return this.close(); - } - - this.lastKeepAlive = Date.now(); - this.emit('keepaliveSucceeded', res); - }); - - stream.on('error', err => { - this.emit('keepaliveFailed', castGrpcError(err)); - this.teardown(); - this.keepalive(); - }); - const keepaliveTimer = setInterval( - () => { - this.emit('keepaliveFired'); - this.grant() - .then(id => stream.write({ ID: id })) - .catch(() => this.close()); // will only throw if the initial grant failed - }, + () => this.fireKeepAlive(stream), 1000 * this.ttl / 3, ); this.teardown = () => { + this.teardown = () => undefined; clearInterval(keepaliveTimer); stream.end(); }; + stream + .on('error', err => this.handleKeepaliveError(err)) + .on('data', res => { + if (leaseExpired(res)) { + return this.handleKeepaliveError(new EtcdLeaseInvalidError(res.ID)); + } + + this.lastKeepAlive = Date.now(); + this.emit('keepaliveSucceeded', res); + }); + this.emit('keepaliveEstablished'); - }); + this.fireKeepAlive(stream); + }).catch(err => this.handleKeepaliveError(err)); + } + + private fireKeepAlive(stream: RPC.IRequestStream) { + this.emit('keepaliveFired'); + this.grant() + .then(id => stream.write({ ID: id })) + .catch(() => this.close()); // will only throw if the initial grant failed + } + + private handleKeepaliveError(err: Error) { + this.emit('keepaliveFailed', castGrpcError(err)); + this.teardown(); + + if (err instanceof EtcdLeaseInvalidError) { + this.emit('lost', err); + this.close(); + } else { + setTimeout(() => this.keepalive(), 100); + } } } diff --git a/src/namespace.ts b/src/namespace.ts index e9aa3e2..0420eeb 100644 --- a/src/namespace.ts +++ b/src/namespace.ts @@ -5,6 +5,7 @@ import { Lock } from './lock'; import { Rangable, Range } from './range'; import * as RPC from './rpc'; import { NSApplicator, toBuffer } from './util'; +import { WatchBuilder, WatchManager } from './watch'; /** * Namespace is the class on which CRUD operations can be invoked. The default @@ -30,13 +31,13 @@ export class Namespace { private readonly nsApplicator = new NSApplicator(this.prefix); public readonly kv = new RPC.KVClient(this.pool); public readonly leaseClient = new RPC.LeaseClient(this.pool); - public readonly watch = new RPC.WatchClient(this.pool); + public readonly watchClient = new RPC.WatchClient(this.pool); + private readonly watchManager = new WatchManager(this.watchClient); constructor( protected readonly prefix: Buffer, protected readonly pool: ConnectionPool, ) {} - /** * `.get()` starts a query to look up a single key from etcd. */ @@ -97,6 +98,14 @@ export class Namespace { .and(key, column, cmp, value); } + /** + * `.watch()` creates a new watch builder. See the documentation on the + * WatchBuilder for usage examples. + */ + public watch(): WatchBuilder { + return new WatchBuilder(this.watchManager, this.nsApplicator); + } + /** * Creates a structure representing an etcd range. Used in permission grants * and queries. This is a convenience method for `Etcd3.Range.from(...)`. diff --git a/src/util.ts b/src/util.ts index 4509677..80bb2f8 100644 --- a/src/util.ts +++ b/src/util.ts @@ -1,3 +1,5 @@ +import { EventEmitter } from 'events'; + import { ClientRuntimeError } from './errors'; export const zeroKey = Buffer.from([0]); @@ -150,6 +152,31 @@ export function forOwn(obj: T, iterator: (value: T[K], key } } +/** + * onceEvent returns a promise that resolves once any of the listed events + * fire on the emitter. + */ +export function onceEvent(emitter: EventEmitter, ...events: string[]): Promise { + return new Promise((resolve, reject) => { + const teardown: (() => void)[] = []; + + const handler = (data: any, event: string) => { + teardown.forEach(t => t()); + if (event === 'error') { + reject(data); + } else { + resolve(data); + } + }; + + events.forEach(event => { + const fn = (data: any) => handler(data, event); + teardown.push(() => emitter.removeListener(event, fn)); + emitter.once(event, fn); + }); + }); +} + /** * PromiseWrap provides promise-like functions that auto-invoke an exec * method when called. diff --git a/src/watch.tla b/src/watch.tla new file mode 100644 index 0000000..a52c3d7 --- /dev/null +++ b/src/watch.tla @@ -0,0 +1,215 @@ +---------------------------- MODULE Etcd3Watcher ---------------------------- + +EXTENDS TLC + +(***************************************************************************) +(* Declarations of the GRPC connection state *) +(***************************************************************************) +CONSTANT Idle, Connected, Connecting +CONSTANT Unsubscribing, Unsubscribed, Subscribing, Subscribed + +(*************************************************************************** +--algorithm Watch { + variable socket = Idle, watcher = Unsubscribed; + + (************************************************************************) + (* Small 'hack' to get around macro expansion only providing us access *) + (* to socket' rather than socket, which prevents us from verifying *) + (* transitions. *) + (************************************************************************) + define { prevWatcher == watcher } + + macro CheckInvariants() { + assert socket \in {Idle, Connected, Connecting}; + assert watcher \in {Subscribed, Subscribing, Unsubscribed, Unsubscribing}; + assert socket = Connected \/ watcher # Subscribed; + + if (prevWatcher = Unsubscribing) { + assert watcher \in {Unsubscribing, Unsubscribed} + } else if (prevWatcher = Subscribing) { + assert watcher \in {Subscribing, Subscribed} + } + } + + (************************************************************************) + (* Watcher is the class attached to the manager that handles the state *) + (* of an individual subscription. It can requset to be subscribed or *) + (* unsubscribed at any time. *) + (************************************************************************) + process (Watcher = 0) { l0: while (TRUE) { + if (watcher = Subscribed) { + watcher := Unsubscribing + } else if (watcher = Unsubscribed) { + watcher := Subscribing + } + } + } + + (************************************************************************) + (* The manager handles global socket state and reconnections. *) + (************************************************************************) + process (Manager = 1) { l1: while (TRUE) { + if (socket = Idle) { + goto connect + } else { + either goto connected or goto disconnect + }; + + disconnect: + (************************************************************) + (* Transitioning from "connected" or "connecting" to *) + (* "disonnectined". The watcher gets kicked off if they ask *) + (* to be, otherwise we mark it as reconnecting. *) + (************************************************************) + if (watcher \in {Unsubscribed, Unsubscribing}) { + watcher := Unsubscribed + } else { + watcher := Subscribing + }; + + socket := Idle; + CheckInvariants(); + + connect: + (************************************************************) + (* Transitioning from "disconnecting" to "connecting". If *) + (* the watcher wants to be unsubscribed, unsubscribe here! *) + (************************************************************) + assert socket = Idle; + if (watcher \in {Unsubscribed, Unsubscribing}) { + watcher := Unsubscribed + }; + socket := Connecting; + CheckInvariants(); + + connected: + (************************************************************) + (* In contact the server, the watcher will be unsubscribed *) + (* if it asked to be, or connecting if it's subscribing. *) + (************************************************************) + socket := Connected; + if (watcher = Subscribing) { + watcher := Subscribed + } else if (watcher = Unsubscribing) { + watcher := Unsubscribed + }; + + CheckInvariants(); + } + } +} + ***************************************************************************) +\* BEGIN TRANSLATION +VARIABLES socket, watcher, pc + +(* define statement *) +prevWatcher == watcher + + +vars == << socket, watcher, pc >> + +ProcSet == {0} \cup {1} + +Init == (* Global variables *) + /\ socket = Idle + /\ watcher = Unsubscribed + /\ pc = [self \in ProcSet |-> CASE self = 0 -> "l0" + [] self = 1 -> "l1"] + +l0 == /\ pc[0] = "l0" + /\ IF watcher = Subscribed + THEN /\ watcher' = Unsubscribing + ELSE /\ IF watcher = Unsubscribed + THEN /\ watcher' = Subscribing + ELSE /\ TRUE + /\ UNCHANGED watcher + /\ pc' = [pc EXCEPT ![0] = "l0"] + /\ UNCHANGED socket + +Watcher == l0 + +l1 == /\ pc[1] = "l1" + /\ IF socket = Idle + THEN /\ pc' = [pc EXCEPT ![1] = "connect"] + ELSE /\ \/ /\ pc' = [pc EXCEPT ![1] = "connected"] + \/ /\ pc' = [pc EXCEPT ![1] = "disconnect"] + /\ UNCHANGED << socket, watcher >> + +disconnect == /\ pc[1] = "disconnect" + /\ IF watcher \in {Unsubscribed, Unsubscribing} + THEN /\ watcher' = Unsubscribed + ELSE /\ watcher' = Subscribing + /\ socket' = Idle + /\ Assert(socket' \in {Idle, Connected, Connecting}, + "Failure of assertion at line 23, column 9 of macro called at line 71, column 13.") + /\ Assert(watcher' \in {Subscribed, Subscribing, Unsubscribed, Unsubscribing}, + "Failure of assertion at line 24, column 9 of macro called at line 71, column 13.") + /\ Assert(socket' = Connected \/ watcher' # Subscribed, + "Failure of assertion at line 25, column 9 of macro called at line 71, column 13.") + /\ IF prevWatcher = Unsubscribing + THEN /\ Assert(watcher' \in {Unsubscribing, Unsubscribed}, + "Failure of assertion at line 28, column 13 of macro called at line 71, column 13.") + ELSE /\ IF prevWatcher = Subscribing + THEN /\ Assert(watcher' \in {Subscribing, Subscribed}, + "Failure of assertion at line 30, column 13 of macro called at line 71, column 13.") + ELSE /\ TRUE + /\ pc' = [pc EXCEPT ![1] = "connect"] + +connect == /\ pc[1] = "connect" + /\ Assert(socket = Idle, + "Failure of assertion at line 78, column 13.") + /\ IF watcher \in {Unsubscribed, Unsubscribing} + THEN /\ watcher' = Unsubscribed + ELSE /\ TRUE + /\ UNCHANGED watcher + /\ socket' = Connecting + /\ Assert(socket' \in {Idle, Connected, Connecting}, + "Failure of assertion at line 23, column 9 of macro called at line 83, column 13.") + /\ Assert(watcher' \in {Subscribed, Subscribing, Unsubscribed, Unsubscribing}, + "Failure of assertion at line 24, column 9 of macro called at line 83, column 13.") + /\ Assert(socket' = Connected \/ watcher' # Subscribed, + "Failure of assertion at line 25, column 9 of macro called at line 83, column 13.") + /\ IF prevWatcher = Unsubscribing + THEN /\ Assert(watcher' \in {Unsubscribing, Unsubscribed}, + "Failure of assertion at line 28, column 13 of macro called at line 83, column 13.") + ELSE /\ IF prevWatcher = Subscribing + THEN /\ Assert(watcher' \in {Subscribing, Subscribed}, + "Failure of assertion at line 30, column 13 of macro called at line 83, column 13.") + ELSE /\ TRUE + /\ pc' = [pc EXCEPT ![1] = "connected"] + +connected == /\ pc[1] = "connected" + /\ socket' = Connected + /\ IF watcher = Subscribing + THEN /\ watcher' = Subscribed + ELSE /\ IF watcher = Unsubscribing + THEN /\ watcher' = Unsubscribed + ELSE /\ TRUE + /\ UNCHANGED watcher + /\ Assert(socket' \in {Idle, Connected, Connecting}, + "Failure of assertion at line 23, column 9 of macro called at line 97, column 13.") + /\ Assert(watcher' \in {Subscribed, Subscribing, Unsubscribed, Unsubscribing}, + "Failure of assertion at line 24, column 9 of macro called at line 97, column 13.") + /\ Assert(socket' = Connected \/ watcher' # Subscribed, + "Failure of assertion at line 25, column 9 of macro called at line 97, column 13.") + /\ IF prevWatcher = Unsubscribing + THEN /\ Assert(watcher' \in {Unsubscribing, Unsubscribed}, + "Failure of assertion at line 28, column 13 of macro called at line 97, column 13.") + ELSE /\ IF prevWatcher = Subscribing + THEN /\ Assert(watcher' \in {Subscribing, Subscribed}, + "Failure of assertion at line 30, column 13 of macro called at line 97, column 13.") + ELSE /\ TRUE + /\ pc' = [pc EXCEPT ![1] = "l1"] + +Manager == l1 \/ disconnect \/ connect \/ connected + +Next == Watcher \/ Manager + +Spec == Init /\ [][Next]_vars + +\* END TRANSLATION + +============================================================================= +\* Modification History +\* Last modified Thu Jun 15 21:12:44 PDT 2017 by Connor +\* Created Thu Jun 15 08:07:31 PDT 2017 by Connor diff --git a/src/watch.ts b/src/watch.ts new file mode 100644 index 0000000..1c471b8 --- /dev/null +++ b/src/watch.ts @@ -0,0 +1,430 @@ +import { EventEmitter } from 'events'; + +import { castGrpcErrorMessage, ClientRuntimeError, EtcdError } from './errors'; +import { Rangable, Range } from './range'; +import * as RPC from './rpc'; +import { NSApplicator, onceEvent, toBuffer } from './util'; + +const enum State { + Idle, + Connecting, + Connected, +} + +/** + * The WatchManager is a singleton that exists in namespaces to handle watching + * multiple keys in a single GRPC stream. The underlying stream will only be + * alive if there's at least one watcher. + * + * This class is not exposed externally. + */ +export class WatchManager { + /** + * Current state of the watcher. + */ + private state = State.Idle; + + /** + * The current GRPC stream, if any. + */ + private stream: null | RPC.IDuplexStream; + + /** + * List of attached watchers. + */ + private watchers: Watcher[] = []; + + /** + * Set of watchers we're currently closing. + */ + private expectedClosers = new Set(); + + constructor(private readonly client: RPC.WatchClient) {} + + /** + * Attach registers the watcher on the connection. + */ + public attach(watcher: Watcher) { + this.watchers.push(watcher); + + switch (this.state) { + case State.Idle: + this.establishStream(); + break; + case State.Connecting: + break; + case State.Connected: + this.getStream().write({ create_request: watcher.request }); + break; + default: + throw new ClientRuntimeError(`Unknown watcher state ${this.state}`); + } + } + + /** + * Detaches a watcher from the connection. + */ + public detach(watcher: Watcher): Promise { + // If we aren't connected, just remove the watcher, easy. + if (this.state !== State.Connected) { + this.watchers = this.watchers.filter(w => w !== watcher); + return Promise.resolve(); + } + + // If we're awaiting an ID to come back, wait for that to happen or for + // us to lose connection, whichever happens first. + if (watcher.id === null) { + return onceEvent(watcher, 'connected', 'disconnected') + .then(() => this.detach(watcher)); + } + + // If the watcher does have an ID, mark that we expect to close it and + // run the cancellation request. The 'end' event will get fired when + // the cancellation comes back, or if we reconnect and see that we + // wanted to cancel the Watcher. + this.expectedClosers.add(watcher); + this.getStream().write({ cancel_request: { watch_id: watcher.id } }); + return onceEvent(watcher, 'end').then(() => undefined); + } + + /** + * Returns the current GRPC stream, *throwing* if we aren't in a state where + * we can get the stream. Calls here are only valid if state == Connected + */ + private getStream() { + if (this.state !== State.Connected) { + throw new ClientRuntimeError('Cannot call getStream() if state != Connected'); + } + if (!this.stream) { + throw new ClientRuntimeError('Expected the watcher stream to exist while state == Connected'); + } + + return this.stream; + } + + /** + * Establishes a GRPC watcher stream, if there are any active watcher. + */ + private establishStream() { + if (this.state !== State.Idle) { + throw new ClientRuntimeError('Cannot call establishStream() if state != Idle'); + } + + // possible we reconnect and watchers are removed in the meantime + if (this.watchers.length === 0) { + return; + } + + // clear anyone who is in the process of closing, we won't re-add them + this.expectedClosers.forEach(watcher => { + this.watchers = this.watchers.filter(w => w !== watcher); + watcher.emit('end'); + }); + this.expectedClosers.clear(); + + this.state = State.Connecting; + this.client.watch() + .then(stream => { + this.state = State.Connected; + this.stream = stream + .on('data', res => this.handleResponse(res)) + .on('error', err => this.handleError(err)); + + // possible watchers are remove while we're connecting. + if (this.watchers.length === 0) { + return this.destroyStream(); + } + + this.watchers.forEach(watcher => { + stream.write({ create_request: watcher.request }); + }); + }) + .catch(err => this.handleError(err)); + } + + /** + * Closes the currently attached watcher stream. + */ + private destroyStream() { + if (this.state !== State.Connected) { + throw new ClientRuntimeError('Cannot call establishStream() if state != Connected'); + } + if (this.watchers.length > 0) { + throw new ClientRuntimeError('Cannot call destroyStream() with active watchers'); + } + + this.getStream().end(); + } + + /** + * Handles an error emission on the current stream. Emits a message out to + * all attached watchers and tries to reconnect. + */ + private handleError(err: Error) { + if (this.state === State.Connected) { + this.getStream().end(); + } + this.state = State.Idle; + + this.watchers.forEach(watcher => { + watcher.emit('disconnected', err); + (<{ id: null }> watcher).id = null; + }); + + this.establishStream(); + } + + /** + * Handles a create response, assigning the watcher ID to the next pending + * watcher. + */ + private handleCreateResponse(res: RPC.IWatchResponse) { + // responses are in-order, the response we get will be for the first + // watcher that's waiting for its ID. + const target = this.watchers.find(watcher => watcher.id === null); + if (!target) { + throw new ClientRuntimeError('Could not find watcher corresponding to create response'); + } + + (<{ id: string }> target).id = res.watch_id; + target.emit('connected', res); + } + + /** + * Handles a cancel update, gracefully closing the watcher if it's expected, + * or emitting an error if it's not. + */ + private handleCancelResponse(watcher: Watcher, res: RPC.IWatchResponse) { + this.watchers = this.watchers.filter(w => w !== watcher); + + if (this.expectedClosers.has(watcher)) { + this.expectedClosers.delete(watcher); + watcher.emit('end'); + return; + } + + watcher.emit('error', castGrpcErrorMessage(`Watcher canceled: ${res.cancel_reason}`)); + } + + /** + * Emits a data update on the target watcher. + */ + private handleUpdateResponse(watcher: Watcher, res: RPC.IWatchResponse) { + watcher.emit('data', res); + } + + /** + * Dispatches some watch response on the event stream. + */ + private handleResponse(res: RPC.IWatchResponse) { + if (res.created) { + this.handleCreateResponse(res); + return; + } + + const watcher = this.watchers.find(w => w.id === res.watch_id); + if (!watcher) { + throw new ClientRuntimeError('Failed to find watcher for IWatchResponse'); + } + + if (!res.canceled) { + this.handleUpdateResponse(watcher, res); + return; + } + + this.handleCancelResponse(watcher, res); + if (this.watchers.length === 0) { + this.destroyStream(); + } + } +} + +export const operationNames = { + put: RPC.FilterType.Nodelete, + delete: RPC.FilterType.Noput, +}; + +/** + * WatchBuilder is used for creating etcd watchers. The created watchers are + * resilient against disconnections, automatically resubscribing and replaying + * changes when reconnecting. + * + * ``` + * const client = new Etcd3(); + * + * client.watch() + * .key('foo') + * .create() + * .then(watcher => { + * watcher + * .on('disconnected', () => console.log('disconnected...')) + * .on('connected', () => console.log('successfully reconnected!')) + * .on('put', res => console.log('foo got set to:', res.value.toString()); + * }); + * ``` + */ +export class WatchBuilder { + private request: RPC.IWatchCreateRequest = { progress_notify: true }; + + constructor( + private readonly manager: WatchManager, + private readonly namespace: NSApplicator, + ) {} + + /** + * Sets a single key to be watched. + */ + public key(key: string | Buffer): this { + this.request.key = toBuffer(key); + return this; + } + /** + * Prefix instructs the watcher to watch all keys with the given prefix. + */ + public prefix(value: string | Buffer): this { + return this.inRange(Range.prefix(value)); + } + + /** + * inRange instructs the builder to watch keys in the specified byte range. + */ + public inRange(r: Rangable): this { + const range = Range.from(r); + this.request.key = range.start; + this.request.range_end = range.end; + return this; + } + + /** + * ignore omits certain operation kinds from the watch stream. + */ + public ignore(...operations: (keyof typeof operationNames)[]): this { + this.request.filters = operations.map(op => operationNames[op]); + return this; + } + + /** + * Instructs the watcher to return the previous key/value pair in updates. + */ + public withPreviousKV(): this { + this.request.prev_kv = true; + return this; + } + + /** + * Resolves the watch request into a Watcher, and fires off to etcd. + */ + public create(): Promise { + const watcher = new Watcher( + this.manager, + this.namespace, + this.namespace.applyToRequest(this.request), + ); + + return onceEvent(watcher, 'connected').then(() => watcher); + } +} + +/** + * The Watcher encapsulates + */ +export class Watcher extends EventEmitter { + /** + * id is the watcher's ID in etcd. This is `null` initially and during + * reconnections, only populated while the watcher is idle. + */ + public readonly id: string | null = null; + + constructor( + private readonly manager: WatchManager, + private readonly namespace: NSApplicator, + public readonly request: RPC.IWatchCreateRequest, + ) { + super(); + this.manager.attach(this); + + this.on('data', changes => { + changes.events.forEach(ev => { + ev.kv.key = this.namespace.unprefix(ev.kv.key); + if (ev.prev_kv) { + ev.prev_kv.key = this.namespace.unprefix(ev.prev_kv.key); + } + + this.emit(ev.type.toLowerCase(), ev.kv, ev.prev_kv); + }); + + this.request.start_revision = Number(changes.header.revision) + 1; + }); + + this.on('connected', changes => { + this.request.start_revision = Number(changes.header.revision) + 1; + }); + } + + /** + * connected is fired after etcd knowledges the watcher is connected. + * When this event is fired, `id` will already be populated. + */ + public on(event: 'connected', handler: (res: RPC.IWatchResponse) => void): this; + + /** + * data is fired when etcd reports an update + * on one of the keys being watched. + */ + public on(event: 'data', handler: (res: RPC.IWatchResponse) => void): this; + + /** + * put is fired, in addition to `data`, when a key is created + * or updated in etcd. + */ + public on(event: 'put', handler: (kv: RPC.IKeyValue, previous?: RPC.IKeyValue) => void): this; + + /** + * put is fired, in addition to `data`, when a key is deleted from etcd. + */ + public on(event: 'delete', handler: (kv: RPC.IKeyValue, previous?: RPC.IKeyValue) => void): this; + + /** + * end is fired after the watcher is closed normally. Like Node.js streams, + * end is NOT fired if `error` is fired. + */ + public on(event: 'end', handler: () => void): this; + + /** + * disconnected is fired if the watcher is disconnected from etcd. The + * watcher will automatically attempt to reconnect when this occurs. When + * this event is fired, `id` will still be populated if it was previously. + */ + public on(event: 'disconnected', handler: (res: EtcdError) => void): this; + + /** + * error is fired if a non-recoverable error occurs that prevents the watcher + * from functioning. This generally occurs if etcd unexpectedly canceled our + * lease, which can occur if (for example) we don't have permission to read + * the watched key or range. When this event is fired, `id` will still be + * populated if it was previously. + */ + public on(event: 'error', handler: (err: EtcdError) => void): this; + /** + * Implements EventEmitter.on(...). + */ + public on(event: string, handler: Function): this { // tslint:disable-line + return super.on(event, handler); + } + + /** + * lastRevision returns the latest etcd cluster revision that this + * watcher observed. This will be `null` if the watcher has not yet + * connected. + */ + public lastRevision(): number | null { + return this.request.start_revision; + } + + /** + * Cancels the watcher. + */ + public cancel(): Promise { + return this.manager.detach(this); + } +} diff --git a/test/client.test.ts b/test/client.test.ts index b28c386..e715590 100644 --- a/test/client.test.ts +++ b/test/client.test.ts @@ -1,59 +1,14 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { - Etcd3, - EtcdAuthenticationFailedError, - EtcdLeaseInvalidError, - EtcdLockFailedError, - EtcdPermissionDeniedError, - EtcdRoleExistsError, - EtcdRoleNotFoundError, - EtcdRoleNotGrantedError, - EtcdUserExistsError, - EtcdUserNotFoundError, - GRPCConnectFailedError, - Lease, - Namespace, - Role, -} from '../src'; -import { getOptions } from './util'; - -function expectReject(promise: Promise, err: { new (message: string): Error }) { - return promise - .then(() => { throw new Error('expected to reject'); }) - .catch(actualErr => { - if (!(actualErr instanceof err)) { - console.error(actualErr.stack); - expect(actualErr).to.be.an.instanceof(err); - } - }); -} - -function wipeAll(things: Promise<{ delete(): any }[]>) { - return things.then(items => Promise.all(items.map(item => item.delete()))); -} +import { Etcd3 } from '../src'; +import { createTestClientAndKeys, tearDownTestClient } from './util'; describe('client', () => { let client: Etcd3; - let badClient: Etcd3; - beforeEach(() => { - client = new Etcd3(getOptions()); - badClient = new Etcd3(getOptions({ hosts: '127.0.0.1:1' })); - return Promise.all([ - client.put('foo1').value('bar1'), - client.put('foo2').value('bar2'), - client.put('foo3').value('{"value":"bar3"}'), - client.put('baz').value('bar5'), - ]); - }); - - afterEach(async () => { - await client.delete().all(); - client.close(); - badClient.close(); - }); + beforeEach(async () => (client = await createTestClientAndKeys())); + afterEach(async () => await tearDownTestClient(client)); it('allows mocking', async () => { const mock = client.mock({ @@ -67,496 +22,4 @@ describe('client', () => { client.unmock(); expect(await client.get('foo1').string()).to.equal('bar1'); }); - - describe('namespacing', () => { - let ns: Namespace; - beforeEach(() => ns = client.namespace('user1/')); - - const assertEqualInNamespace = async (key: string, value: string) => { - expect(await ns.get(key)).to.equal(value); - expect(await client.get(`user1/${key}`)).to.equal(value); - }; - - it('puts and gets values in the namespace', async () => { - await ns.put('foo').value(''); - await assertEqualInNamespace('foo', ''); - expect(await ns.getAll().strings()).to.deep.equal({ foo: '' }); - }); - - it('deletes values in the namespace', async () => { - await ns.put('foo1').value(''); - await ns.put('foo2').value(''); - - await ns.delete().key('foo1'); - expect(await ns.getAll().strings()).to.deep.equal({ foo2: '' }); - await ns.delete().all(); - - expect(await ns.getAll().strings()).to.deep.equal({}); - expect(await client.getAll().keys()).to.have.length.greaterThan(0); - }); - - it('contains leases in the namespace', async () => { - const lease = ns.lease(100); - await lease.put('leased').value(''); - await assertEqualInNamespace('leased', ''); - await lease.revoke(); - }); - - it('contains locks in the namespace', async () => { - const lock = ns.lock('mylock'); - await lock.acquire(); - expect(await ns.get('mylock')).to.not.be.null; - expect(await client.get('user1/mylock')).to.not.be.null; - await lock.release(); - }); - - it('runs a simple if', async () => { - await ns.put('foo1').value('potatoes'); - await ns.if('foo1', 'Value', '==', 'potatoes') - .then(ns.put('foo1').value('tomatoes')) - .commit(); - - await assertEqualInNamespace('foo1', 'tomatoes'); - }); - }); - - describe('get() / getAll()', () => { - it('lists all values', async () => { - expect(await client.getAll().strings()).to.deep.equal({ - foo1: 'bar1', - foo2: 'bar2', - foo3: '{"value":"bar3"}', - baz: 'bar5', - }); - }); - - it('gets single keys with various encoding', async () => { - expect(await client.get('foo1').string()).to.equal('bar1'); - expect(await client.get('foo2').buffer()).to.deep.equal(Buffer.from('bar2')); - expect(await client.get('foo3').json()).to.deep.equal({ value: 'bar3' }); - expect(await client.get('wut').string()).to.be.null; - }); - - it('queries prefixes', async () => { - expect(await client.getAll().prefix('fo').strings()).to.deep.equal({ - foo1: 'bar1', - foo2: 'bar2', - foo3: '{"value":"bar3"}', - }); - }); - - it('gets keys', async () => { - expect(await client.getAll().keys()).to.have.members(['foo1', 'foo2', 'foo3', 'baz']); - expect(await client.getAll().keyBuffers()).to.have.deep.members([ - Buffer.from('foo1'), - Buffer.from('foo2'), - Buffer.from('foo3'), - Buffer.from('baz'), - ]); - }); - - it('counts', async () => { - expect(await client.getAll().count()).to.equal(4); - }); - - it('sorts', async () => { - expect(await client.getAll() - .prefix('foo') - .sort('Key', 'Ascend') - .limit(2) - .keys(), - ).to.deep.equal(['foo1', 'foo2']); - - expect(await client.getAll() - .prefix('foo') - .sort('Key', 'Descend') - .limit(2) - .keys(), - ).to.deep.equal(['foo3', 'foo2']); - }); - }); - - describe('delete()', () => { - it('deletes all', async () => { - await client.delete().all(); - expect(await client.getAll().count()).to.equal(0); - }); - - it('deletes prefix', async () => { - await client.delete().prefix('foo'); - expect(await client.getAll().keys()).to.deep.equal(['baz']); - }); - - it('gets previous', async () => { - expect(await client.delete().key('foo1').getPrevious()).to.containSubset([ - { - key: new Buffer('foo1'), - value: new Buffer('bar1'), - }, - ]); - }); - - describe('put()', () => { - it('allows touching key revisions', async () => { - const original = (await client.get('foo1').exec()).kvs[0].mod_revision; - await client.put('foo1').touch(); - const updated = (await client.get('foo1').exec()).kvs[0].mod_revision; - expect(Number(updated)).to.be.greaterThan(Number(original)); - }); - - it('updates key values', async () => { - await client.put('foo1').value('updated'); - expect(await client.get('foo1').string()).to.equal('updated'); - }); - - it('includes previous values', async () => { - expect(await client.put('foo1').value('updated').getPrevious()).to.containSubset({ - key: new Buffer('foo1'), - value: new Buffer('bar1'), - }); - }); - }); - }); - - describe('lease()', () => { - let lease: Lease; - - const watchEmission = (event: string): { data: any, fired: boolean } => { - const output = { data: null, fired: false }; - lease.once(event, (data: any) => { - output.data = data; - output.fired = true; - }); - - return output; - }; - - const onEvent = (event: string): Promise => { - return new Promise(resolve => lease.once(event, (data: any) => resolve(data))); - }; - - afterEach(async () => { - if (lease && !lease.revoked()) { - await lease.revoke(); - } - }); - - it('throws if trying to use too short of a ttl', () => { - expect(() => client.lease(0)).to.throw(/must be at least 1 second/); - }); - - it('reports a loss and errors if the client is invalid', async () => { - lease = badClient.lease(1); - const err = await onEvent('lost'); - expect(err).to.be.an.instanceof(GRPCConnectFailedError); - await lease.grant() - .then(() => { throw new Error('expected to reject'); }) - .catch(err2 => expect(err2).to.equal(err)); - }); - - it('provides basic lease lifecycle', async () => { - lease = client.lease(100); - await lease.put('leased').value('foo'); - expect((await client.get('leased').exec()).kvs[0].lease).to.equal(await lease.grant()); - await lease.revoke(); - expect(await client.get('leased').buffer()).to.be.null; - }); - - it('runs immediate keepalives', async () => { - lease = client.lease(100); - expect(await lease.keepaliveOnce()).to.containSubset({ - ID: await lease.grant(), - TTL: '100', - }); - await lease.keepaliveOnce(); - }); - - it('emits a lost event if the lease is invalidated', async () => { - lease = client.lease(100); - let err: Error; - lease.on('lost', e => err = e); - expect(lease.revoked()).to.be.false; - await client.leaseClient.leaseRevoke({ ID: await lease.grant() }); - - await lease.keepaliveOnce() - .then(() => { throw new Error('expected to reject'); }) - .catch(err2 => { - expect(err2).to.equal(err); - expect(err2).to.be.an.instanceof(EtcdLeaseInvalidError); - expect(lease.revoked()).to.be.true; - }); - }); - - describe('crons', () => { - let clock: sinon.SinonFakeTimers; - - beforeEach(async () => { - clock = sinon.useFakeTimers(); - lease = client.lease(60); - await onEvent('keepaliveEstablished'); - }); - - afterEach(() => clock.restore()); - - it('touches the lease ttl at the correct interval', async () => { - const kaFired = watchEmission('keepaliveFired'); - clock.tick(19999); - expect(kaFired.fired).to.be.false; - clock.tick(1); - expect(kaFired.fired).to.be.true; - - const res = await onEvent('keepaliveSucceeded'); - expect(res.TTL).to.equal('60'); - }); - - it('tears down if the lease gets revoked', async () => { - await client.leaseClient.leaseRevoke({ ID: await lease.grant() }); - clock.tick(20000); - expect(await onEvent('lost')).to.be.an.instanceof(EtcdLeaseInvalidError); - expect(lease.revoked()).to.be.true; - }); - }); - - describe('if()', () => { - it('runs a simple if', async () => { - await client.if('foo1', 'Value', '==', 'bar1') - .then(client.put('foo1').value('bar2')) - .commit(); - - expect(await client.get('foo1').string()).to.equal('bar2'); - }); - - it('runs consequents', async () => { - await client.if('foo1', 'Value', '==', 'bar1') - .then(client.put('foo1').value('bar2')) - .else(client.put('foo1').value('bar3')) - .commit(); - - expect(await client.get('foo1').string()).to.equal('bar2'); - }); - - it('runs multiple clauses and consequents', async () => { - const result = await client.if('foo1', 'Value', '==', 'bar1') - .and('foo2', 'Value', '==', 'wut') - .then(client.put('foo1').value('bar2')) - .else(client.put('foo1').value('bar3'), client.get('foo2')) - .commit(); - - expect(result.responses[1].response_range.kvs[0].value.toString()) - .to.equal('bar2'); - expect(await client.get('foo1').string()).to.equal('bar3'); - }); - }); - - describe('lock()', () => { - const assertCantLock = () => { - return client.lock('resource') - .acquire() - .then(() => { throw new Error('expected to throw'); }) - .catch(err => expect(err).to.be.an.instanceof(EtcdLockFailedError)); - }; - - const assertAbleToLock = async () => { - const lock = client.lock('resource'); - await lock.acquire(); - await lock.release(); - }; - - it('locks exclusively around a resource', async () => { - const lock1 = client.lock('resource'); - await lock1.acquire(); - - await assertCantLock(); - await lock1.release(); - - await assertAbleToLock(); - }); - - it('provides locking around functions', async () => { - await client.lock('resource').do(assertCantLock); - await assertAbleToLock(); - }); - }); - }); - - describe('roles', () => { - afterEach(() => wipeAll(client.getRoles())); - - const expectRoles = async (expected: string[]) => { - const list = await client.getRoles(); - expect(list.map(r => r.name)).to.deep.equal(expected); - }; - - it('create and deletes', async () => { - const fooRole = await client.role('foo').create(); - await expectRoles(['foo']); - await fooRole.delete(); - await expectRoles([]); - }); - - it('throws on existing roles', async () => { - await client.role('foo').create(); - await expectReject(client.role('foo').create(), EtcdRoleExistsError); - }); - - it('throws on deleting a non-existent role', async () => { - await expectReject(client.role('foo').delete(), EtcdRoleNotFoundError); - }); - - it('throws on granting permission to a non-existent role', async () => { - await expectReject( - client.role('foo').grant({ - permission: 'Read', - range: client.range({ prefix: '111' }), - }), - EtcdRoleNotFoundError, - ); - }); - - it('round trips permission grants', async () => { - const fooRole = await client.role('foo').create(); - await fooRole.grant({ - permission: 'Read', - range: client.range({ prefix: '111' }), - }); - - const perms = await fooRole.permissions(); - expect(perms).to.containSubset([ - { - permission: 'Read', - range: client.range({ prefix: '111' }), - }, - ]); - - await fooRole.revoke(perms[0]); - expect(await fooRole.permissions()).to.have.length(0); - }); - }); - - describe('users', () => { - let fooRole: Role; - beforeEach(async () => { - fooRole = client.role('foo'); - await fooRole.create(); - }); - - afterEach(async () => { - await fooRole.delete(); - await wipeAll(client.getUsers()); - }); - - it('creates users', async () => { - expect(await client.getUsers()).to.have.lengthOf(0); - await client.user('connor').create('password'); - expect(await client.getUsers()).to.containSubset([{ name: 'connor' }]); - }); - - it('throws on existing users', async () => { - await client.user('connor').create('password'); - await expectReject(client.user('connor').create('password'), EtcdUserExistsError); - }); - - it('throws on regranting the same role multiple times', async () => { - const user = await client.user('connor').create('password'); - await expectReject(user.removeRole(fooRole), EtcdRoleNotGrantedError); - }); - - it('throws on granting a non-existent role', async () => { - const user = await client.user('connor').create('password'); - await expectReject(user.addRole('wut'), EtcdRoleNotFoundError); - }); - - it('throws on deleting a non-existent user', async () => { - await expectReject(client.user('connor').delete(), EtcdUserNotFoundError); - }); - - it('round trips roles', async () => { - const user = await client.user('connor').create('password'); - await user.addRole(fooRole); - expect(await user.roles()).to.containSubset([{ name: 'foo' }]); - await user.removeRole(fooRole); - expect(await user.roles()).to.have.lengthOf(0); - }); - }); - - describe('password auth', () => { - beforeEach(async () => { - await wipeAll(client.getUsers()); - await wipeAll(client.getRoles()); - - // We need to set up a root user and root role first, otherwise etcd - // will yell at us. - const rootUser = await client.user('root').create('password'); - rootUser.addRole('root'); - - await client.user('connor').create('password'); - - const normalRole = await client.role('rw_prefix_f').create(); - await normalRole.grant({ - permission: 'Readwrite', - range: client.range({ prefix: 'f' }), - }); - await normalRole.addUser('connor'); - await client.auth.authEnable(); - }); - - afterEach(async () => { - const rootClient = new Etcd3(getOptions({ - auth: { - username: 'root', - password: 'password', - }, - })); - - await rootClient.auth.authDisable(); - rootClient.close(); - - await wipeAll(client.getUsers()); - await wipeAll(client.getRoles()); - }); - - it('allows authentication using the correct credentials', async () => { - const authedClient = new Etcd3(getOptions({ - auth: { - username: 'connor', - password: 'password', - }, - })); - - await authedClient.put('foo').value('bar'); - authedClient.close(); - }); - - it('rejects modifying a key the client has no access to', async () => { - const authedClient = new Etcd3(getOptions({ - auth: { - username: 'connor', - password: 'password', - }, - })); - - await expectReject( - authedClient.put('wut').value('bar').exec(), - EtcdPermissionDeniedError, - ); - - authedClient.close(); - }); - - it('throws when using incorrect credentials', async () => { - const authedClient = new Etcd3(getOptions({ - auth: { - username: 'connor', - password: 'bad password', - }, - })); - - await expectReject( - authedClient.put('foo').value('bar').exec(), - EtcdAuthenticationFailedError, - ); - - authedClient.close(); - }); - }); }); diff --git a/test/crud.test.ts b/test/crud.test.ts new file mode 100644 index 0000000..b219db9 --- /dev/null +++ b/test/crud.test.ts @@ -0,0 +1,122 @@ +import { expect } from 'chai'; + +import { Etcd3 } from '../src'; +import { createTestClientAndKeys, tearDownTestClient } from './util'; + +describe('crud', () => { + let client: Etcd3; + + beforeEach(async () => (client = await createTestClientAndKeys())); + afterEach(async () => await tearDownTestClient(client)); + + describe('get() / getAll()', () => { + it('lists all values', async () => { + expect(await client.getAll().strings()).to.deep.equal({ + foo1: 'bar1', + foo2: 'bar2', + foo3: '{"value":"bar3"}', + baz: 'bar5', + }); + }); + + it('gets single keys with various encoding', async () => { + expect(await client.get('foo1').string()).to.equal('bar1'); + expect(await client.get('foo2').buffer()).to.deep.equal( + Buffer.from('bar2'), + ); + expect(await client.get('foo3').json()).to.deep.equal({ value: 'bar3' }); + expect(await client.get('wut').string()).to.be.null; + }); + + it('queries prefixes', async () => { + expect(await client.getAll().prefix('fo').strings()).to.deep.equal({ + foo1: 'bar1', + foo2: 'bar2', + foo3: '{"value":"bar3"}', + }); + }); + + it('gets keys', async () => { + expect(await client.getAll().keys()).to.have.members([ + 'foo1', + 'foo2', + 'foo3', + 'baz', + ]); + expect(await client.getAll().keyBuffers()).to.have.deep.members([ + Buffer.from('foo1'), + Buffer.from('foo2'), + Buffer.from('foo3'), + Buffer.from('baz'), + ]); + }); + + it('counts', async () => { + expect(await client.getAll().count()).to.equal(4); + }); + + it('sorts', async () => { + expect( + await client + .getAll() + .prefix('foo') + .sort('Key', 'Ascend') + .limit(2) + .keys(), + ).to.deep.equal(['foo1', 'foo2']); + + expect( + await client + .getAll() + .prefix('foo') + .sort('Key', 'Descend') + .limit(2) + .keys(), + ).to.deep.equal(['foo3', 'foo2']); + }); + }); + + describe('delete()', () => { + it('deletes all', async () => { + await client.delete().all(); + expect(await client.getAll().count()).to.equal(0); + }); + + it('deletes prefix', async () => { + await client.delete().prefix('foo'); + expect(await client.getAll().keys()).to.deep.equal(['baz']); + }); + + it('gets previous', async () => { + expect(await client.delete().key('foo1').getPrevious()).to.containSubset([ + { + key: new Buffer('foo1'), + value: new Buffer('bar1'), + }, + ]); + }); + + describe('put()', () => { + it('allows touching key revisions', async () => { + const original = (await client.get('foo1').exec()).kvs[0].mod_revision; + await client.put('foo1').touch(); + const updated = (await client.get('foo1').exec()).kvs[0].mod_revision; + expect(Number(updated)).to.be.greaterThan(Number(original)); + }); + + it('updates key values', async () => { + await client.put('foo1').value('updated'); + expect(await client.get('foo1').string()).to.equal('updated'); + }); + + it('includes previous values', async () => { + expect( + await client.put('foo1').value('updated').getPrevious(), + ).to.containSubset({ + key: new Buffer('foo1'), + value: new Buffer('bar1'), + }); + }); + }); + }); +}); diff --git a/test/lease.test.ts b/test/lease.test.ts new file mode 100644 index 0000000..f8d2297 --- /dev/null +++ b/test/lease.test.ts @@ -0,0 +1,156 @@ +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { + Etcd3, + EtcdLeaseInvalidError, + GRPCConnectFailedError, + Lease, +} from '../src'; +import { onceEvent } from '../src/util'; +import { + createTestClientAndKeys, + getOptions, + proxy, + tearDownTestClient, +} from './util'; + +describe('lease()', () => { + let client: Etcd3; + let lease: Lease; + + beforeEach(async () => (client = await createTestClientAndKeys())); + afterEach(async () => await tearDownTestClient(client)); + + const watchEmission = (event: string): { data: any; fired: boolean } => { + const output = { data: null, fired: false }; + lease.once(event, (data: any) => { + output.data = data; + output.fired = true; + }); + + return output; + }; + + afterEach(async () => { + if (lease && !lease.revoked()) { + await lease.revoke(); + } + }); + + it('throws if trying to use too short of a ttl', () => { + expect(() => client.lease(0)).to.throw(/must be at least 1 second/); + }); + + it('reports a loss and errors if the client is invalid', async () => { + const badClient = new Etcd3(getOptions({ hosts: '127.0.0.1:1' })); + lease = badClient.lease(1); + const err = await onceEvent(lease, 'lost'); + expect(err).to.be.an.instanceof(GRPCConnectFailedError); + await lease + .grant() + .then(() => { + throw new Error('expected to reject'); + }) + .catch(err2 => expect(err2).to.equal(err)); + badClient.close(); + }); + + it('provides basic lease lifecycle', async () => { + lease = client.lease(100); + await lease.put('leased').value('foo'); + expect((await client.get('leased').exec()).kvs[0].lease).to.equal( + await lease.grant(), + ); + await lease.revoke(); + expect(await client.get('leased').buffer()).to.be.null; + }); + + it('runs immediate keepalives', async () => { + lease = client.lease(100); + expect(await lease.keepaliveOnce()).to.containSubset({ + ID: await lease.grant(), + TTL: '100', + }); + await lease.keepaliveOnce(); + }); + + it('is resilient to network interruptions', async () => { + await proxy.activate(); + const proxiedClient = new Etcd3(getOptions()); + + lease = proxiedClient.lease(100); + await lease.grant(); + proxy.pause(); + await onceEvent(lease, 'keepaliveFailed'); + proxy.resume(); + await onceEvent(lease, 'keepaliveSucceeded'); + await lease.revoke(); + + proxiedClient.close(); + proxy.deactivate(); + }); + + it('marks leases as failed if the server is not contacted for a while', async () => { + await proxy.activate(); + const proxiedClient = new Etcd3(getOptions()); + + lease = proxiedClient.lease(1); + await lease.grant(); + proxy.pause(); + (lease).lastKeepAlive = Date.now() - 2000; // speed things up a little + const err = await onceEvent(lease, 'lost'); + expect(err.message).to.match(/our lease has expired/); + proxiedClient.close(); + proxy.deactivate(); + }); + + it('emits a lost event if the lease is invalidated', async () => { + lease = client.lease(100); + let err: Error; + lease.on('lost', e => (err = e)); + expect(lease.revoked()).to.be.false; + await client.leaseClient.leaseRevoke({ ID: await lease.grant() }); + + await lease + .keepaliveOnce() + .then(() => { + throw new Error('expected to reject'); + }) + .catch(err2 => { + expect(err2).to.equal(err); + expect(err2).to.be.an.instanceof(EtcdLeaseInvalidError); + expect(lease.revoked()).to.be.true; + }); + }); + + describe('crons', () => { + let clock: sinon.SinonFakeTimers; + + beforeEach(async () => { + clock = sinon.useFakeTimers(); + lease = client.lease(60); + await onceEvent(lease, 'keepaliveEstablished'); + }); + + afterEach(() => clock.restore()); + + it('touches the lease ttl at the correct interval', async () => { + const kaFired = watchEmission('keepaliveFired'); + clock.tick(19999); + expect(kaFired.fired).to.be.false; + clock.tick(1); + expect(kaFired.fired).to.be.true; + + const res = await onceEvent(lease, 'keepaliveSucceeded'); + expect(res.TTL).to.equal('60'); + }); + + it('tears down if the lease gets revoked', async () => { + await client.leaseClient.leaseRevoke({ ID: await lease.grant() }); + clock.tick(20000); + expect(await onceEvent(lease, 'lost')).to.be.an.instanceof(EtcdLeaseInvalidError); + expect(lease.revoked()).to.be.true; + }); + }); +}); diff --git a/test/lock.test.ts b/test/lock.test.ts new file mode 100644 index 0000000..d885fdd --- /dev/null +++ b/test/lock.test.ts @@ -0,0 +1,42 @@ +import { expect } from 'chai'; + +import { Etcd3, EtcdLockFailedError } from '../src'; +import { createTestClientAndKeys, tearDownTestClient } from './util'; + +describe('lock()', () => { + let client: Etcd3; + + beforeEach(async () => (client = await createTestClientAndKeys())); + afterEach(async () => await tearDownTestClient(client)); + + const assertCantLock = () => { + return client + .lock('resource') + .acquire() + .then(() => { + throw new Error('expected to throw'); + }) + .catch(err => expect(err).to.be.an.instanceof(EtcdLockFailedError)); + }; + + const assertAbleToLock = async () => { + const lock = client.lock('resource'); + await lock.acquire(); + await lock.release(); + }; + + it('locks exclusively around a resource', async () => { + const lock1 = client.lock('resource'); + await lock1.acquire(); + + await assertCantLock(); + await lock1.release(); + + await assertAbleToLock(); + }); + + it('provides locking around functions', async () => { + await client.lock('resource').do(assertCantLock); + await assertAbleToLock(); + }); +}); diff --git a/test/namespacing.test.ts b/test/namespacing.test.ts new file mode 100644 index 0000000..76876db --- /dev/null +++ b/test/namespacing.test.ts @@ -0,0 +1,64 @@ +import { expect } from 'chai'; + +import { Etcd3, Namespace } from '../src'; +import { createTestClientAndKeys, tearDownTestClient } from './util'; + +describe('namespacing', () => { + let client: Etcd3; + let ns: Namespace; + + beforeEach(async () => { + client = await createTestClientAndKeys(); + ns = client.namespace('user1/'); + }); + + afterEach(async () => await tearDownTestClient(client)); + + const assertEqualInNamespace = async (key: string, value: string) => { + expect(await ns.get(key)).to.equal(value); + expect(await client.get(`user1/${key}`)).to.equal(value); + }; + + it('puts and gets values in the namespace', async () => { + await ns.put('foo').value(''); + await assertEqualInNamespace('foo', ''); + expect(await ns.getAll().strings()).to.deep.equal({ foo: '' }); + }); + + it('deletes values in the namespace', async () => { + await ns.put('foo1').value(''); + await ns.put('foo2').value(''); + + await ns.delete().key('foo1'); + expect(await ns.getAll().strings()).to.deep.equal({ foo2: '' }); + await ns.delete().all(); + + expect(await ns.getAll().strings()).to.deep.equal({}); + expect(await client.getAll().keys()).to.have.length.greaterThan(0); + }); + + it('contains leases in the namespace', async () => { + const lease = ns.lease(100); + await lease.put('leased').value(''); + await assertEqualInNamespace('leased', ''); + await lease.revoke(); + }); + + it('contains locks in the namespace', async () => { + const lock = ns.lock('mylock'); + await lock.acquire(); + expect(await ns.get('mylock')).to.not.be.null; + expect(await client.get('user1/mylock')).to.not.be.null; + await lock.release(); + }); + + it('runs a simple if', async () => { + await ns.put('foo1').value('potatoes'); + await ns + .if('foo1', 'Value', '==', 'potatoes') + .then(ns.put('foo1').value('tomatoes')) + .commit(); + + await assertEqualInNamespace('foo1', 'tomatoes'); + }); +}); diff --git a/test/roles.test.ts b/test/roles.test.ts new file mode 100644 index 0000000..016a9a7 --- /dev/null +++ b/test/roles.test.ts @@ -0,0 +1,222 @@ +import { expect } from 'chai'; + +import { + Etcd3, + EtcdAuthenticationFailedError, + EtcdPermissionDeniedError, + EtcdRoleExistsError, + EtcdRoleNotFoundError, + EtcdRoleNotGrantedError, + EtcdUserExistsError, + EtcdUserNotFoundError, + Role, +} from '../src'; +import { + createTestClientAndKeys, + expectReject, + getOptions, + tearDownTestClient, +} from './util'; + +function wipeAll(things: Promise<{ delete(): any }[]>) { + return things.then(items => Promise.all(items.map(item => item.delete()))); +} + +describe('roles and auth', () => { + let client: Etcd3; + + beforeEach(async () => (client = await createTestClientAndKeys())); + afterEach(async () => await tearDownTestClient(client)); + + describe('management', () => { + afterEach(() => wipeAll(client.getRoles())); + + const expectRoles = async (expected: string[]) => { + const list = await client.getRoles(); + expect(list.map(r => r.name)).to.deep.equal(expected); + }; + + it('create and deletes', async () => { + const fooRole = await client.role('foo').create(); + await expectRoles(['foo']); + await fooRole.delete(); + await expectRoles([]); + }); + + it('throws on existing roles', async () => { + await client.role('foo').create(); + await expectReject(client.role('foo').create(), EtcdRoleExistsError); + }); + + it('throws on deleting a non-existent role', async () => { + await expectReject(client.role('foo').delete(), EtcdRoleNotFoundError); + }); + + it('throws on granting permission to a non-existent role', async () => { + await expectReject( + client.role('foo').grant({ + permission: 'Read', + range: client.range({ prefix: '111' }), + }), + EtcdRoleNotFoundError, + ); + }); + + it('round trips permission grants', async () => { + const fooRole = await client.role('foo').create(); + await fooRole.grant({ + permission: 'Read', + range: client.range({ prefix: '111' }), + }); + + const perms = await fooRole.permissions(); + expect(perms).to.containSubset([ + { + permission: 'Read', + range: client.range({ prefix: '111' }), + }, + ]); + + await fooRole.revoke(perms[0]); + expect(await fooRole.permissions()).to.have.length(0); + }); + }); + + describe('users', () => { + let fooRole: Role; + beforeEach(async () => { + fooRole = client.role('foo'); + await fooRole.create(); + }); + + afterEach(async () => { + await fooRole.delete(); + await wipeAll(client.getUsers()); + }); + + it('creates users', async () => { + expect(await client.getUsers()).to.have.lengthOf(0); + await client.user('connor').create('password'); + expect(await client.getUsers()).to.containSubset([{ name: 'connor' }]); + }); + + it('throws on existing users', async () => { + await client.user('connor').create('password'); + await expectReject( + client.user('connor').create('password'), + EtcdUserExistsError, + ); + }); + + it('throws on regranting the same role multiple times', async () => { + const user = await client.user('connor').create('password'); + await expectReject(user.removeRole(fooRole), EtcdRoleNotGrantedError); + }); + + it('throws on granting a non-existent role', async () => { + const user = await client.user('connor').create('password'); + await expectReject(user.addRole('wut'), EtcdRoleNotFoundError); + }); + + it('throws on deleting a non-existent user', async () => { + await expectReject(client.user('connor').delete(), EtcdUserNotFoundError); + }); + + it('round trips roles', async () => { + const user = await client.user('connor').create('password'); + await user.addRole(fooRole); + expect(await user.roles()).to.containSubset([{ name: 'foo' }]); + await user.removeRole(fooRole); + expect(await user.roles()).to.have.lengthOf(0); + }); + }); + + describe('password auth', () => { + beforeEach(async () => { + await wipeAll(client.getUsers()); + await wipeAll(client.getRoles()); + + // We need to set up a root user and root role first, otherwise etcd + // will yell at us. + const rootUser = await client.user('root').create('password'); + rootUser.addRole('root'); + + await client.user('connor').create('password'); + + const normalRole = await client.role('rw_prefix_f').create(); + await normalRole.grant({ + permission: 'Readwrite', + range: client.range({ prefix: 'f' }), + }); + await normalRole.addUser('connor'); + await client.auth.authEnable(); + }); + + afterEach(async () => { + const rootClient = new Etcd3( + getOptions({ + auth: { + username: 'root', + password: 'password', + }, + }), + ); + + await rootClient.auth.authDisable(); + rootClient.close(); + + await wipeAll(client.getUsers()); + await wipeAll(client.getRoles()); + }); + + it('allows authentication using the correct credentials', async () => { + const authedClient = new Etcd3( + getOptions({ + auth: { + username: 'connor', + password: 'password', + }, + }), + ); + + await authedClient.put('foo').value('bar'); + authedClient.close(); + }); + + it('rejects modifying a key the client has no access to', async () => { + const authedClient = new Etcd3( + getOptions({ + auth: { + username: 'connor', + password: 'password', + }, + }), + ); + + await expectReject( + authedClient.put('wut').value('bar').exec(), + EtcdPermissionDeniedError, + ); + + authedClient.close(); + }); + + it('throws when using incorrect credentials', async () => { + const authedClient = new Etcd3( + getOptions({ + auth: { + username: 'connor', + password: 'bad password', + }, + }), + ); + + await expectReject( + authedClient.put('foo').value('bar').exec(), + EtcdAuthenticationFailedError, + ); + + authedClient.close(); + }); + }); +}); diff --git a/test/transactions.test.ts b/test/transactions.test.ts new file mode 100644 index 0000000..dedbf98 --- /dev/null +++ b/test/transactions.test.ts @@ -0,0 +1,44 @@ +import { expect } from 'chai'; + +import { Etcd3 } from '../src'; +import { createTestClientAndKeys, tearDownTestClient } from './util'; + +describe('transactions', () => { + let client: Etcd3; + + beforeEach(async () => (client = await createTestClientAndKeys())); + afterEach(async () => await tearDownTestClient(client)); + + it('runs a simple if', async () => { + await client + .if('foo1', 'Value', '==', 'bar1') + .then(client.put('foo1').value('bar2')) + .commit(); + + expect(await client.get('foo1').string()).to.equal('bar2'); + }); + + it('runs consequents', async () => { + await client + .if('foo1', 'Value', '==', 'bar1') + .then(client.put('foo1').value('bar2')) + .else(client.put('foo1').value('bar3')) + .commit(); + + expect(await client.get('foo1').string()).to.equal('bar2'); + }); + + it('runs multiple clauses and consequents', async () => { + const result = await client + .if('foo1', 'Value', '==', 'bar1') + .and('foo2', 'Value', '==', 'wut') + .then(client.put('foo1').value('bar2')) + .else(client.put('foo1').value('bar3'), client.get('foo2')) + .commit(); + + expect(result.responses[1].response_range.kvs[0].value.toString()).to.equal( + 'bar2', + ); + expect(await client.get('foo1').string()).to.equal('bar3'); + }); +}); diff --git a/test/watch.test.ts b/test/watch.test.ts new file mode 100644 index 0000000..c07ad1e --- /dev/null +++ b/test/watch.test.ts @@ -0,0 +1,154 @@ +import { expect } from 'chai'; + +import { Etcd3, IKeyValue, IWatchResponse, Watcher } from '../src'; +import { onceEvent } from '../src/util'; +import { + createTestClientAndKeys, + getOptions, + proxy, + tearDownTestClient, +} from './util'; + +describe('watch', () => { + let client: Etcd3; + + beforeEach(async () => { + client = new Etcd3(getOptions()); + }); + afterEach(async () => { + await tearDownTestClient(client); + }); + + /** + * Returns the list of watchers currently attached and listening. + */ + function getWatchers(): Watcher[] { + return (client).watchManager.watchers; + } + + /** + * Checks that the watcher is getting updates for the given key. + */ + function expectWatching(watcher: Watcher, key: string): Promise { + return Promise.all([ + client.put(key).value('updated!'), + onceEvent(watcher, 'put').then((res: IKeyValue) => { + expect(res.key.toString()).to.equal(key); + expect(res.value.toString()).to.equal('updated!'); + }), + ]).then(() => watcher); + } + + /** + * Checks that the watcher is not getting updates for the given key. + */ + async function expectNotWatching( + watcher: Watcher, + key: string, + ): Promise { + let watching = false; + const listener = () => (watching = true); + watcher.on('put', listener); + await client.put(key).value('updated!'); + + return new Promise(resolve => { + setTimeout(() => { + expect(watching).to.equal(false, `expected not to be watching ${key}`); + resolve(watcher); + }, 200); + }); + } + + it('is resilient to network interruptions', async () => { + await proxy.activate(); + const proxiedClient = await createTestClientAndKeys(); + + const watcher = await proxiedClient.watch().key('foo1').create(); + proxy.pause(); + await onceEvent(watcher, 'disconnected'); + proxy.resume(); + await onceEvent(watcher, 'connected'); + await expectWatching(watcher, 'foo1'); + + proxiedClient.close(); + proxy.deactivate(); + }); + + it('replays historical updates.', async () => { + await proxy.activate(); + const proxiedClient = await createTestClientAndKeys(); + + const watcher = await proxiedClient.watch().key('foo1').create(); + + await Promise.all([ + client.put('foo1').value('update 1'), + onceEvent(watcher, 'data').then((res: IWatchResponse) => { + expect(watcher.request.start_revision).to.equal( 1 + Number(res.header.revision)); + }), + ]); + + proxy.pause(); + await onceEvent(watcher, 'disconnected'); + await client.put('foo1').value('update 2'); + proxy.resume(); + await onceEvent(watcher, 'put').then((res: IKeyValue) => { + expect(res.key.toString()).to.equal('foo1'); + expect(res.value.toString()).to.equal('update 2'); + }); + + proxiedClient.close(); + proxy.deactivate(); + }); + + describe('subscription', () => { + it('subscribes before the connection is established', async () => { + const watcher = await client.watch().key('foo1').create(); + await expectWatching(watcher, 'foo1'); + expect(getWatchers()).to.deep.equal([watcher]); + }); + + it('subscribes while the connection is still being established', async () => { + const watcher1 = client.watch().key('foo1').create(); + const watcher2 = client.watch().key('bar').create(); + + const watchers = await Promise.all([ + watcher1.then(w => expectWatching(w, 'foo1')), + watcher2.then(w => expectWatching(w, 'bar')), + ]); + + expect(getWatchers()).to.deep.equal(watchers); + }); + + it('subscribes after the connection is fully established', async () => { + const watcher1 = await client.watch().key('foo1').create(); + await expectWatching(watcher1, 'foo1'); + const watcher2 = await client.watch().key('bar').create(); + await expectWatching(watcher2, 'bar'); + expect(getWatchers()).to.deep.equal([watcher1, watcher2]); + }); + }); + + describe('unsubscribing', () => { + it('unsubscribes while the connection is established', async () => { + const watcher = await client.watch().key('foo1').create(); + await watcher.cancel(); + await expectNotWatching(watcher, 'foo1'); + expect(getWatchers()).to.deep.equal([]); + }); + + it('unsubscribes while the connection is being reestablished', async () => { + await proxy.activate(); + const proxiedClient = await createTestClientAndKeys(); + + const watcher = await proxiedClient.watch().key('foo1').create(); + proxy.pause(); + await watcher.cancel(); + + proxy.resume(); + expect(getWatchers()).to.deep.equal([]); + + proxiedClient.close(); + proxy.deactivate(); + }); + }); +});