Merge branch 'watchers-impl' into watchers-final

This commit is contained in:
Connor Peet 2017-06-21 21:16:32 -07:00
Родитель 1b88b6f1f9 898cfdf7ed
Коммит 548ce731d4
15 изменённых файлов: 1602 добавлений и 573 удалений

Просмотреть файл

@ -15,6 +15,7 @@ export class GRPCGenericError extends Error {}
/**
* GRPCConnectFailed is thrown when connecting to GRPC fails.
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L151-L158
*/
export class GRPCConnectFailedError extends GRPCGenericError {}
@ -26,6 +27,7 @@ export class GRPCProtocolError extends GRPCGenericError {}
/**
* GRPCInternalError is thrown when a internal error occurs on either end.
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L145-L150
*/
export class GRPCInternalError extends GRPCGenericError {}
@ -34,6 +36,61 @@ export class GRPCInternalError extends GRPCGenericError {}
*/
export class GRPCCancelledError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L50-L57
*/
export class GRPCUnknownError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L58-L64
*/
export class GRPCInvalidArgumentError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L65-L72
*/
export class GRPCDeadlineExceededError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L73-L74
*/
export class GRPCNotFoundError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L75-L79
*/
export class GRPCAlreadyExistsError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L89-L93
*/
export class GRPCResourceExhastedError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L94-L116
*/
export class GRPCFailedPreconditionError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L117-L124
*/
export class GRPCAbortedError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L143-L144
*/
export class GRPCNotImplementedError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L125-L142
*/
export class GRPCOutOfRangeError extends GRPCGenericError {}
/**
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L159-L160
*/
export class GRPCDataLossError extends GRPCGenericError {}
/**
* EtcdError is an application error returned by etcd.
*/
@ -85,12 +142,18 @@ export class EtcdLockFailedError extends Error {}
/**
* EtcdAuthenticationFailedError is thrown when an invalid username/password
* combination is submitted.
*
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L161-L165
*/
export class EtcdAuthenticationFailedError extends Error {}
/**
* EtcdPermissionDeniedError is thrown when the user attempts to modify a key
* that they don't have access to.
*
* Also can be emitted from GRPC.
*
* @see https://github.com/grpc/grpc/blob/v1.4.x/src/node/src/constants.js#L80-L88
*/
export class EtcdPermissionDeniedError extends Error {}
@ -135,9 +198,9 @@ const grpcMessageToError = new Map<string, IErrorCtor>([
['etcdserver: permission denied', EtcdPermissionDeniedError],
]);
function getMatchingGrpcError(err: Error): IErrorCtor | null {
function getMatchingGrpcError(message: string): IErrorCtor | null {
for (const [key, value] of grpcMessageToError) {
if (err.message.includes(key)) {
if (message.includes(key)) {
return value;
}
}
@ -149,6 +212,14 @@ function rewriteErrorName(str: string, ctor: new (...args: any[]) => Error): str
return str.replace(/^Error:/, `${ctor.name}:`);
}
/**
* Tries to convert an Etcd error string to an etcd error.
*/
export function castGrpcErrorMessage(message: string): Error {
const ctor = getMatchingGrpcError(message) || EtcdError;
return new ctor(message);
}
/**
* Tries to convert GRPC's generic, untyped errors to typed errors we can
* consume. Yes, this method is abhorrent.
@ -158,7 +229,7 @@ export function castGrpcError(err: Error): Error {
return err; // it looks like it's already some kind of typed error
}
let ctor = getMatchingGrpcError(err);
let ctor = getMatchingGrpcError(err.message);
if (!ctor) {
ctor = err.message.includes('etcdserver:') ? EtcdError : GRPCGenericError;
}

Просмотреть файл

@ -12,6 +12,7 @@ export * from './namespace';
export * from './options';
export * from './range';
export * from './rpc';
export { WatchBuilder, Watcher } from './watch';
/**
* Etcd3 is a high-level interface for interacting and calling etcd endpoints.

Просмотреть файл

@ -243,8 +243,8 @@ export class Lease extends EventEmitter {
// When the cluster goes down, we keep trying to reconnect. But if we're
// far past the end of our key's TTL, there's no way we're going to be
// able to renew it. Fire a "lost".
if (this.lastKeepAlive < Date.now() - 2 * 1000 * this.ttl) {
this.emit('list', new GRPCConnectFailedError('We lost connect to etcd and our lease has expired.'));
if (Date.now() - this.lastKeepAlive > 2 * 1000 * this.ttl) {
this.emit('lost', new GRPCConnectFailedError('We lost connection to etcd and our lease has expired.'));
return this.close();
}
@ -253,40 +253,49 @@ export class Lease extends EventEmitter {
return stream.end();
}
stream.on('data', res => {
if (leaseExpired(res)) {
const err = new EtcdLeaseInvalidError(res.ID);
this.emit('keepaliveFailed', err);
this.emit('lost', err);
return this.close();
}
this.lastKeepAlive = Date.now();
this.emit('keepaliveSucceeded', res);
});
stream.on('error', err => {
this.emit('keepaliveFailed', castGrpcError(err));
this.teardown();
this.keepalive();
});
const keepaliveTimer = setInterval(
() => {
this.emit('keepaliveFired');
this.grant()
.then(id => stream.write({ ID: id }))
.catch(() => this.close()); // will only throw if the initial grant failed
},
() => this.fireKeepAlive(stream),
1000 * this.ttl / 3,
);
this.teardown = () => {
this.teardown = () => undefined;
clearInterval(keepaliveTimer);
stream.end();
};
stream
.on('error', err => this.handleKeepaliveError(err))
.on('data', res => {
if (leaseExpired(res)) {
return this.handleKeepaliveError(new EtcdLeaseInvalidError(res.ID));
}
this.lastKeepAlive = Date.now();
this.emit('keepaliveSucceeded', res);
});
this.emit('keepaliveEstablished');
});
this.fireKeepAlive(stream);
}).catch(err => this.handleKeepaliveError(err));
}
private fireKeepAlive(stream: RPC.IRequestStream<RPC.ILeaseKeepAliveRequest>) {
this.emit('keepaliveFired');
this.grant()
.then(id => stream.write({ ID: id }))
.catch(() => this.close()); // will only throw if the initial grant failed
}
private handleKeepaliveError(err: Error) {
this.emit('keepaliveFailed', castGrpcError(err));
this.teardown();
if (err instanceof EtcdLeaseInvalidError) {
this.emit('lost', err);
this.close();
} else {
setTimeout(() => this.keepalive(), 100);
}
}
}

Просмотреть файл

@ -5,6 +5,7 @@ import { Lock } from './lock';
import { Rangable, Range } from './range';
import * as RPC from './rpc';
import { NSApplicator, toBuffer } from './util';
import { WatchBuilder, WatchManager } from './watch';
/**
* Namespace is the class on which CRUD operations can be invoked. The default
@ -30,13 +31,13 @@ export class Namespace {
private readonly nsApplicator = new NSApplicator(this.prefix);
public readonly kv = new RPC.KVClient(this.pool);
public readonly leaseClient = new RPC.LeaseClient(this.pool);
public readonly watch = new RPC.WatchClient(this.pool);
public readonly watchClient = new RPC.WatchClient(this.pool);
private readonly watchManager = new WatchManager(this.watchClient);
constructor(
protected readonly prefix: Buffer,
protected readonly pool: ConnectionPool,
) {}
/**
* `.get()` starts a query to look up a single key from etcd.
*/
@ -97,6 +98,14 @@ export class Namespace {
.and(key, column, cmp, value);
}
/**
* `.watch()` creates a new watch builder. See the documentation on the
* WatchBuilder for usage examples.
*/
public watch(): WatchBuilder {
return new WatchBuilder(this.watchManager, this.nsApplicator);
}
/**
* Creates a structure representing an etcd range. Used in permission grants
* and queries. This is a convenience method for `Etcd3.Range.from(...)`.

Просмотреть файл

@ -1,3 +1,5 @@
import { EventEmitter } from 'events';
import { ClientRuntimeError } from './errors';
export const zeroKey = Buffer.from([0]);
@ -150,6 +152,31 @@ export function forOwn<T>(obj: T, iterator: <K extends keyof T>(value: T[K], key
}
}
/**
* onceEvent returns a promise that resolves once any of the listed events
* fire on the emitter.
*/
export function onceEvent(emitter: EventEmitter, ...events: string[]): Promise<any> {
return new Promise((resolve, reject) => {
const teardown: (() => void)[] = [];
const handler = (data: any, event: string) => {
teardown.forEach(t => t());
if (event === 'error') {
reject(data);
} else {
resolve(data);
}
};
events.forEach(event => {
const fn = (data: any) => handler(data, event);
teardown.push(() => emitter.removeListener(event, fn));
emitter.once(event, fn);
});
});
}
/**
* PromiseWrap provides promise-like functions that auto-invoke an exec
* method when called.

215
src/watch.tla Normal file
Просмотреть файл

@ -0,0 +1,215 @@
---------------------------- MODULE Etcd3Watcher ----------------------------
EXTENDS TLC
(***************************************************************************)
(* Declarations of the GRPC connection state *)
(***************************************************************************)
CONSTANT Idle, Connected, Connecting
CONSTANT Unsubscribing, Unsubscribed, Subscribing, Subscribed
(***************************************************************************
--algorithm Watch {
variable socket = Idle, watcher = Unsubscribed;
(************************************************************************)
(* Small 'hack' to get around macro expansion only providing us access *)
(* to socket' rather than socket, which prevents us from verifying *)
(* transitions. *)
(************************************************************************)
define { prevWatcher == watcher }
macro CheckInvariants() {
assert socket \in {Idle, Connected, Connecting};
assert watcher \in {Subscribed, Subscribing, Unsubscribed, Unsubscribing};
assert socket = Connected \/ watcher # Subscribed;
if (prevWatcher = Unsubscribing) {
assert watcher \in {Unsubscribing, Unsubscribed}
} else if (prevWatcher = Subscribing) {
assert watcher \in {Subscribing, Subscribed}
}
}
(************************************************************************)
(* Watcher is the class attached to the manager that handles the state *)
(* of an individual subscription. It can requset to be subscribed or *)
(* unsubscribed at any time. *)
(************************************************************************)
process (Watcher = 0) { l0: while (TRUE) {
if (watcher = Subscribed) {
watcher := Unsubscribing
} else if (watcher = Unsubscribed) {
watcher := Subscribing
}
}
}
(************************************************************************)
(* The manager handles global socket state and reconnections. *)
(************************************************************************)
process (Manager = 1) { l1: while (TRUE) {
if (socket = Idle) {
goto connect
} else {
either goto connected or goto disconnect
};
disconnect:
(************************************************************)
(* Transitioning from "connected" or "connecting" to *)
(* "disonnectined". The watcher gets kicked off if they ask *)
(* to be, otherwise we mark it as reconnecting. *)
(************************************************************)
if (watcher \in {Unsubscribed, Unsubscribing}) {
watcher := Unsubscribed
} else {
watcher := Subscribing
};
socket := Idle;
CheckInvariants();
connect:
(************************************************************)
(* Transitioning from "disconnecting" to "connecting". If *)
(* the watcher wants to be unsubscribed, unsubscribe here! *)
(************************************************************)
assert socket = Idle;
if (watcher \in {Unsubscribed, Unsubscribing}) {
watcher := Unsubscribed
};
socket := Connecting;
CheckInvariants();
connected:
(************************************************************)
(* In contact the server, the watcher will be unsubscribed *)
(* if it asked to be, or connecting if it's subscribing. *)
(************************************************************)
socket := Connected;
if (watcher = Subscribing) {
watcher := Subscribed
} else if (watcher = Unsubscribing) {
watcher := Unsubscribed
};
CheckInvariants();
}
}
}
***************************************************************************)
\* BEGIN TRANSLATION
VARIABLES socket, watcher, pc
(* define statement *)
prevWatcher == watcher
vars == << socket, watcher, pc >>
ProcSet == {0} \cup {1}
Init == (* Global variables *)
/\ socket = Idle
/\ watcher = Unsubscribed
/\ pc = [self \in ProcSet |-> CASE self = 0 -> "l0"
[] self = 1 -> "l1"]
l0 == /\ pc[0] = "l0"
/\ IF watcher = Subscribed
THEN /\ watcher' = Unsubscribing
ELSE /\ IF watcher = Unsubscribed
THEN /\ watcher' = Subscribing
ELSE /\ TRUE
/\ UNCHANGED watcher
/\ pc' = [pc EXCEPT ![0] = "l0"]
/\ UNCHANGED socket
Watcher == l0
l1 == /\ pc[1] = "l1"
/\ IF socket = Idle
THEN /\ pc' = [pc EXCEPT ![1] = "connect"]
ELSE /\ \/ /\ pc' = [pc EXCEPT ![1] = "connected"]
\/ /\ pc' = [pc EXCEPT ![1] = "disconnect"]
/\ UNCHANGED << socket, watcher >>
disconnect == /\ pc[1] = "disconnect"
/\ IF watcher \in {Unsubscribed, Unsubscribing}
THEN /\ watcher' = Unsubscribed
ELSE /\ watcher' = Subscribing
/\ socket' = Idle
/\ Assert(socket' \in {Idle, Connected, Connecting},
"Failure of assertion at line 23, column 9 of macro called at line 71, column 13.")
/\ Assert(watcher' \in {Subscribed, Subscribing, Unsubscribed, Unsubscribing},
"Failure of assertion at line 24, column 9 of macro called at line 71, column 13.")
/\ Assert(socket' = Connected \/ watcher' # Subscribed,
"Failure of assertion at line 25, column 9 of macro called at line 71, column 13.")
/\ IF prevWatcher = Unsubscribing
THEN /\ Assert(watcher' \in {Unsubscribing, Unsubscribed},
"Failure of assertion at line 28, column 13 of macro called at line 71, column 13.")
ELSE /\ IF prevWatcher = Subscribing
THEN /\ Assert(watcher' \in {Subscribing, Subscribed},
"Failure of assertion at line 30, column 13 of macro called at line 71, column 13.")
ELSE /\ TRUE
/\ pc' = [pc EXCEPT ![1] = "connect"]
connect == /\ pc[1] = "connect"
/\ Assert(socket = Idle,
"Failure of assertion at line 78, column 13.")
/\ IF watcher \in {Unsubscribed, Unsubscribing}
THEN /\ watcher' = Unsubscribed
ELSE /\ TRUE
/\ UNCHANGED watcher
/\ socket' = Connecting
/\ Assert(socket' \in {Idle, Connected, Connecting},
"Failure of assertion at line 23, column 9 of macro called at line 83, column 13.")
/\ Assert(watcher' \in {Subscribed, Subscribing, Unsubscribed, Unsubscribing},
"Failure of assertion at line 24, column 9 of macro called at line 83, column 13.")
/\ Assert(socket' = Connected \/ watcher' # Subscribed,
"Failure of assertion at line 25, column 9 of macro called at line 83, column 13.")
/\ IF prevWatcher = Unsubscribing
THEN /\ Assert(watcher' \in {Unsubscribing, Unsubscribed},
"Failure of assertion at line 28, column 13 of macro called at line 83, column 13.")
ELSE /\ IF prevWatcher = Subscribing
THEN /\ Assert(watcher' \in {Subscribing, Subscribed},
"Failure of assertion at line 30, column 13 of macro called at line 83, column 13.")
ELSE /\ TRUE
/\ pc' = [pc EXCEPT ![1] = "connected"]
connected == /\ pc[1] = "connected"
/\ socket' = Connected
/\ IF watcher = Subscribing
THEN /\ watcher' = Subscribed
ELSE /\ IF watcher = Unsubscribing
THEN /\ watcher' = Unsubscribed
ELSE /\ TRUE
/\ UNCHANGED watcher
/\ Assert(socket' \in {Idle, Connected, Connecting},
"Failure of assertion at line 23, column 9 of macro called at line 97, column 13.")
/\ Assert(watcher' \in {Subscribed, Subscribing, Unsubscribed, Unsubscribing},
"Failure of assertion at line 24, column 9 of macro called at line 97, column 13.")
/\ Assert(socket' = Connected \/ watcher' # Subscribed,
"Failure of assertion at line 25, column 9 of macro called at line 97, column 13.")
/\ IF prevWatcher = Unsubscribing
THEN /\ Assert(watcher' \in {Unsubscribing, Unsubscribed},
"Failure of assertion at line 28, column 13 of macro called at line 97, column 13.")
ELSE /\ IF prevWatcher = Subscribing
THEN /\ Assert(watcher' \in {Subscribing, Subscribed},
"Failure of assertion at line 30, column 13 of macro called at line 97, column 13.")
ELSE /\ TRUE
/\ pc' = [pc EXCEPT ![1] = "l1"]
Manager == l1 \/ disconnect \/ connect \/ connected
Next == Watcher \/ Manager
Spec == Init /\ [][Next]_vars
\* END TRANSLATION
=============================================================================
\* Modification History
\* Last modified Thu Jun 15 21:12:44 PDT 2017 by Connor
\* Created Thu Jun 15 08:07:31 PDT 2017 by Connor

430
src/watch.ts Normal file
Просмотреть файл

@ -0,0 +1,430 @@
import { EventEmitter } from 'events';
import { castGrpcErrorMessage, ClientRuntimeError, EtcdError } from './errors';
import { Rangable, Range } from './range';
import * as RPC from './rpc';
import { NSApplicator, onceEvent, toBuffer } from './util';
const enum State {
Idle,
Connecting,
Connected,
}
/**
* The WatchManager is a singleton that exists in namespaces to handle watching
* multiple keys in a single GRPC stream. The underlying stream will only be
* alive if there's at least one watcher.
*
* This class is not exposed externally.
*/
export class WatchManager {
/**
* Current state of the watcher.
*/
private state = State.Idle;
/**
* The current GRPC stream, if any.
*/
private stream: null | RPC.IDuplexStream<RPC.IWatchRequest, RPC.IWatchResponse>;
/**
* List of attached watchers.
*/
private watchers: Watcher[] = [];
/**
* Set of watchers we're currently closing.
*/
private expectedClosers = new Set<Watcher>();
constructor(private readonly client: RPC.WatchClient) {}
/**
* Attach registers the watcher on the connection.
*/
public attach(watcher: Watcher) {
this.watchers.push(watcher);
switch (this.state) {
case State.Idle:
this.establishStream();
break;
case State.Connecting:
break;
case State.Connected:
this.getStream().write({ create_request: watcher.request });
break;
default:
throw new ClientRuntimeError(`Unknown watcher state ${this.state}`);
}
}
/**
* Detaches a watcher from the connection.
*/
public detach(watcher: Watcher): Promise<void> {
// If we aren't connected, just remove the watcher, easy.
if (this.state !== State.Connected) {
this.watchers = this.watchers.filter(w => w !== watcher);
return Promise.resolve();
}
// If we're awaiting an ID to come back, wait for that to happen or for
// us to lose connection, whichever happens first.
if (watcher.id === null) {
return onceEvent(watcher, 'connected', 'disconnected')
.then(() => this.detach(watcher));
}
// If the watcher does have an ID, mark that we expect to close it and
// run the cancellation request. The 'end' event will get fired when
// the cancellation comes back, or if we reconnect and see that we
// wanted to cancel the Watcher.
this.expectedClosers.add(watcher);
this.getStream().write({ cancel_request: { watch_id: watcher.id } });
return onceEvent(watcher, 'end').then(() => undefined);
}
/**
* Returns the current GRPC stream, *throwing* if we aren't in a state where
* we can get the stream. Calls here are only valid if state == Connected
*/
private getStream() {
if (this.state !== State.Connected) {
throw new ClientRuntimeError('Cannot call getStream() if state != Connected');
}
if (!this.stream) {
throw new ClientRuntimeError('Expected the watcher stream to exist while state == Connected');
}
return this.stream;
}
/**
* Establishes a GRPC watcher stream, if there are any active watcher.
*/
private establishStream() {
if (this.state !== State.Idle) {
throw new ClientRuntimeError('Cannot call establishStream() if state != Idle');
}
// possible we reconnect and watchers are removed in the meantime
if (this.watchers.length === 0) {
return;
}
// clear anyone who is in the process of closing, we won't re-add them
this.expectedClosers.forEach(watcher => {
this.watchers = this.watchers.filter(w => w !== watcher);
watcher.emit('end');
});
this.expectedClosers.clear();
this.state = State.Connecting;
this.client.watch()
.then(stream => {
this.state = State.Connected;
this.stream = stream
.on('data', res => this.handleResponse(res))
.on('error', err => this.handleError(err));
// possible watchers are remove while we're connecting.
if (this.watchers.length === 0) {
return this.destroyStream();
}
this.watchers.forEach(watcher => {
stream.write({ create_request: watcher.request });
});
})
.catch(err => this.handleError(err));
}
/**
* Closes the currently attached watcher stream.
*/
private destroyStream() {
if (this.state !== State.Connected) {
throw new ClientRuntimeError('Cannot call establishStream() if state != Connected');
}
if (this.watchers.length > 0) {
throw new ClientRuntimeError('Cannot call destroyStream() with active watchers');
}
this.getStream().end();
}
/**
* Handles an error emission on the current stream. Emits a message out to
* all attached watchers and tries to reconnect.
*/
private handleError(err: Error) {
if (this.state === State.Connected) {
this.getStream().end();
}
this.state = State.Idle;
this.watchers.forEach(watcher => {
watcher.emit('disconnected', err);
(<{ id: null }> watcher).id = null;
});
this.establishStream();
}
/**
* Handles a create response, assigning the watcher ID to the next pending
* watcher.
*/
private handleCreateResponse(res: RPC.IWatchResponse) {
// responses are in-order, the response we get will be for the first
// watcher that's waiting for its ID.
const target = this.watchers.find(watcher => watcher.id === null);
if (!target) {
throw new ClientRuntimeError('Could not find watcher corresponding to create response');
}
(<{ id: string }> target).id = res.watch_id;
target.emit('connected', res);
}
/**
* Handles a cancel update, gracefully closing the watcher if it's expected,
* or emitting an error if it's not.
*/
private handleCancelResponse(watcher: Watcher, res: RPC.IWatchResponse) {
this.watchers = this.watchers.filter(w => w !== watcher);
if (this.expectedClosers.has(watcher)) {
this.expectedClosers.delete(watcher);
watcher.emit('end');
return;
}
watcher.emit('error', castGrpcErrorMessage(`Watcher canceled: ${res.cancel_reason}`));
}
/**
* Emits a data update on the target watcher.
*/
private handleUpdateResponse(watcher: Watcher, res: RPC.IWatchResponse) {
watcher.emit('data', res);
}
/**
* Dispatches some watch response on the event stream.
*/
private handleResponse(res: RPC.IWatchResponse) {
if (res.created) {
this.handleCreateResponse(res);
return;
}
const watcher = this.watchers.find(w => w.id === res.watch_id);
if (!watcher) {
throw new ClientRuntimeError('Failed to find watcher for IWatchResponse');
}
if (!res.canceled) {
this.handleUpdateResponse(watcher, res);
return;
}
this.handleCancelResponse(watcher, res);
if (this.watchers.length === 0) {
this.destroyStream();
}
}
}
export const operationNames = {
put: RPC.FilterType.Nodelete,
delete: RPC.FilterType.Noput,
};
/**
* WatchBuilder is used for creating etcd watchers. The created watchers are
* resilient against disconnections, automatically resubscribing and replaying
* changes when reconnecting.
*
* ```
* const client = new Etcd3();
*
* client.watch()
* .key('foo')
* .create()
* .then(watcher => {
* watcher
* .on('disconnected', () => console.log('disconnected...'))
* .on('connected', () => console.log('successfully reconnected!'))
* .on('put', res => console.log('foo got set to:', res.value.toString());
* });
* ```
*/
export class WatchBuilder {
private request: RPC.IWatchCreateRequest = { progress_notify: true };
constructor(
private readonly manager: WatchManager,
private readonly namespace: NSApplicator,
) {}
/**
* Sets a single key to be watched.
*/
public key(key: string | Buffer): this {
this.request.key = toBuffer(key);
return this;
}
/**
* Prefix instructs the watcher to watch all keys with the given prefix.
*/
public prefix(value: string | Buffer): this {
return this.inRange(Range.prefix(value));
}
/**
* inRange instructs the builder to watch keys in the specified byte range.
*/
public inRange(r: Rangable): this {
const range = Range.from(r);
this.request.key = range.start;
this.request.range_end = range.end;
return this;
}
/**
* ignore omits certain operation kinds from the watch stream.
*/
public ignore(...operations: (keyof typeof operationNames)[]): this {
this.request.filters = operations.map(op => operationNames[op]);
return this;
}
/**
* Instructs the watcher to return the previous key/value pair in updates.
*/
public withPreviousKV(): this {
this.request.prev_kv = true;
return this;
}
/**
* Resolves the watch request into a Watcher, and fires off to etcd.
*/
public create(): Promise<Watcher> {
const watcher = new Watcher(
this.manager,
this.namespace,
this.namespace.applyToRequest(this.request),
);
return onceEvent(watcher, 'connected').then(() => watcher);
}
}
/**
* The Watcher encapsulates
*/
export class Watcher extends EventEmitter {
/**
* id is the watcher's ID in etcd. This is `null` initially and during
* reconnections, only populated while the watcher is idle.
*/
public readonly id: string | null = null;
constructor(
private readonly manager: WatchManager,
private readonly namespace: NSApplicator,
public readonly request: RPC.IWatchCreateRequest,
) {
super();
this.manager.attach(this);
this.on('data', changes => {
changes.events.forEach(ev => {
ev.kv.key = this.namespace.unprefix(ev.kv.key);
if (ev.prev_kv) {
ev.prev_kv.key = this.namespace.unprefix(ev.prev_kv.key);
}
this.emit(ev.type.toLowerCase(), ev.kv, ev.prev_kv);
});
this.request.start_revision = Number(changes.header.revision) + 1;
});
this.on('connected', changes => {
this.request.start_revision = Number(changes.header.revision) + 1;
});
}
/**
* connected is fired after etcd knowledges the watcher is connected.
* When this event is fired, `id` will already be populated.
*/
public on(event: 'connected', handler: (res: RPC.IWatchResponse) => void): this;
/**
* data is fired when etcd reports an update
* on one of the keys being watched.
*/
public on(event: 'data', handler: (res: RPC.IWatchResponse) => void): this;
/**
* put is fired, in addition to `data`, when a key is created
* or updated in etcd.
*/
public on(event: 'put', handler: (kv: RPC.IKeyValue, previous?: RPC.IKeyValue) => void): this;
/**
* put is fired, in addition to `data`, when a key is deleted from etcd.
*/
public on(event: 'delete', handler: (kv: RPC.IKeyValue, previous?: RPC.IKeyValue) => void): this;
/**
* end is fired after the watcher is closed normally. Like Node.js streams,
* end is NOT fired if `error` is fired.
*/
public on(event: 'end', handler: () => void): this;
/**
* disconnected is fired if the watcher is disconnected from etcd. The
* watcher will automatically attempt to reconnect when this occurs. When
* this event is fired, `id` will still be populated if it was previously.
*/
public on(event: 'disconnected', handler: (res: EtcdError) => void): this;
/**
* error is fired if a non-recoverable error occurs that prevents the watcher
* from functioning. This generally occurs if etcd unexpectedly canceled our
* lease, which can occur if (for example) we don't have permission to read
* the watched key or range. When this event is fired, `id` will still be
* populated if it was previously.
*/
public on(event: 'error', handler: (err: EtcdError) => void): this;
/**
* Implements EventEmitter.on(...).
*/
public on(event: string, handler: Function): this { // tslint:disable-line
return super.on(event, handler);
}
/**
* lastRevision returns the latest etcd cluster revision that this
* watcher observed. This will be `null` if the watcher has not yet
* connected.
*/
public lastRevision(): number | null {
return <number> this.request.start_revision;
}
/**
* Cancels the watcher.
*/
public cancel(): Promise<void> {
return this.manager.detach(this);
}
}

Просмотреть файл

@ -1,59 +1,14 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import {
Etcd3,
EtcdAuthenticationFailedError,
EtcdLeaseInvalidError,
EtcdLockFailedError,
EtcdPermissionDeniedError,
EtcdRoleExistsError,
EtcdRoleNotFoundError,
EtcdRoleNotGrantedError,
EtcdUserExistsError,
EtcdUserNotFoundError,
GRPCConnectFailedError,
Lease,
Namespace,
Role,
} from '../src';
import { getOptions } from './util';
function expectReject(promise: Promise<any>, err: { new (message: string): Error }) {
return promise
.then(() => { throw new Error('expected to reject'); })
.catch(actualErr => {
if (!(actualErr instanceof err)) {
console.error(actualErr.stack);
expect(actualErr).to.be.an.instanceof(err);
}
});
}
function wipeAll(things: Promise<{ delete(): any }[]>) {
return things.then(items => Promise.all(items.map(item => item.delete())));
}
import { Etcd3 } from '../src';
import { createTestClientAndKeys, tearDownTestClient } from './util';
describe('client', () => {
let client: Etcd3;
let badClient: Etcd3;
beforeEach(() => {
client = new Etcd3(getOptions());
badClient = new Etcd3(getOptions({ hosts: '127.0.0.1:1' }));
return Promise.all([
client.put('foo1').value('bar1'),
client.put('foo2').value('bar2'),
client.put('foo3').value('{"value":"bar3"}'),
client.put('baz').value('bar5'),
]);
});
afterEach(async () => {
await client.delete().all();
client.close();
badClient.close();
});
beforeEach(async () => (client = await createTestClientAndKeys()));
afterEach(async () => await tearDownTestClient(client));
it('allows mocking', async () => {
const mock = client.mock({
@ -67,496 +22,4 @@ describe('client', () => {
client.unmock();
expect(await client.get('foo1').string()).to.equal('bar1');
});
describe('namespacing', () => {
let ns: Namespace;
beforeEach(() => ns = client.namespace('user1/'));
const assertEqualInNamespace = async (key: string, value: string) => {
expect(await ns.get(key)).to.equal(value);
expect(await client.get(`user1/${key}`)).to.equal(value);
};
it('puts and gets values in the namespace', async () => {
await ns.put('foo').value('');
await assertEqualInNamespace('foo', '');
expect(await ns.getAll().strings()).to.deep.equal({ foo: '' });
});
it('deletes values in the namespace', async () => {
await ns.put('foo1').value('');
await ns.put('foo2').value('');
await ns.delete().key('foo1');
expect(await ns.getAll().strings()).to.deep.equal({ foo2: '' });
await ns.delete().all();
expect(await ns.getAll().strings()).to.deep.equal({});
expect(await client.getAll().keys()).to.have.length.greaterThan(0);
});
it('contains leases in the namespace', async () => {
const lease = ns.lease(100);
await lease.put('leased').value('');
await assertEqualInNamespace('leased', '');
await lease.revoke();
});
it('contains locks in the namespace', async () => {
const lock = ns.lock('mylock');
await lock.acquire();
expect(await ns.get('mylock')).to.not.be.null;
expect(await client.get('user1/mylock')).to.not.be.null;
await lock.release();
});
it('runs a simple if', async () => {
await ns.put('foo1').value('potatoes');
await ns.if('foo1', 'Value', '==', 'potatoes')
.then(ns.put('foo1').value('tomatoes'))
.commit();
await assertEqualInNamespace('foo1', 'tomatoes');
});
});
describe('get() / getAll()', () => {
it('lists all values', async () => {
expect(await client.getAll().strings()).to.deep.equal({
foo1: 'bar1',
foo2: 'bar2',
foo3: '{"value":"bar3"}',
baz: 'bar5',
});
});
it('gets single keys with various encoding', async () => {
expect(await client.get('foo1').string()).to.equal('bar1');
expect(await client.get('foo2').buffer()).to.deep.equal(Buffer.from('bar2'));
expect(await client.get('foo3').json()).to.deep.equal({ value: 'bar3' });
expect(await client.get('wut').string()).to.be.null;
});
it('queries prefixes', async () => {
expect(await client.getAll().prefix('fo').strings()).to.deep.equal({
foo1: 'bar1',
foo2: 'bar2',
foo3: '{"value":"bar3"}',
});
});
it('gets keys', async () => {
expect(await client.getAll().keys()).to.have.members(['foo1', 'foo2', 'foo3', 'baz']);
expect(await client.getAll().keyBuffers()).to.have.deep.members([
Buffer.from('foo1'),
Buffer.from('foo2'),
Buffer.from('foo3'),
Buffer.from('baz'),
]);
});
it('counts', async () => {
expect(await client.getAll().count()).to.equal(4);
});
it('sorts', async () => {
expect(await client.getAll()
.prefix('foo')
.sort('Key', 'Ascend')
.limit(2)
.keys(),
).to.deep.equal(['foo1', 'foo2']);
expect(await client.getAll()
.prefix('foo')
.sort('Key', 'Descend')
.limit(2)
.keys(),
).to.deep.equal(['foo3', 'foo2']);
});
});
describe('delete()', () => {
it('deletes all', async () => {
await client.delete().all();
expect(await client.getAll().count()).to.equal(0);
});
it('deletes prefix', async () => {
await client.delete().prefix('foo');
expect(await client.getAll().keys()).to.deep.equal(['baz']);
});
it('gets previous', async () => {
expect(await client.delete().key('foo1').getPrevious()).to.containSubset([
{
key: new Buffer('foo1'),
value: new Buffer('bar1'),
},
]);
});
describe('put()', () => {
it('allows touching key revisions', async () => {
const original = (await client.get('foo1').exec()).kvs[0].mod_revision;
await client.put('foo1').touch();
const updated = (await client.get('foo1').exec()).kvs[0].mod_revision;
expect(Number(updated)).to.be.greaterThan(Number(original));
});
it('updates key values', async () => {
await client.put('foo1').value('updated');
expect(await client.get('foo1').string()).to.equal('updated');
});
it('includes previous values', async () => {
expect(await client.put('foo1').value('updated').getPrevious()).to.containSubset({
key: new Buffer('foo1'),
value: new Buffer('bar1'),
});
});
});
});
describe('lease()', () => {
let lease: Lease;
const watchEmission = (event: string): { data: any, fired: boolean } => {
const output = { data: null, fired: false };
lease.once(event, (data: any) => {
output.data = data;
output.fired = true;
});
return output;
};
const onEvent = (event: string): Promise<any> => {
return new Promise(resolve => lease.once(event, (data: any) => resolve(data)));
};
afterEach(async () => {
if (lease && !lease.revoked()) {
await lease.revoke();
}
});
it('throws if trying to use too short of a ttl', () => {
expect(() => client.lease(0)).to.throw(/must be at least 1 second/);
});
it('reports a loss and errors if the client is invalid', async () => {
lease = badClient.lease(1);
const err = await onEvent('lost');
expect(err).to.be.an.instanceof(GRPCConnectFailedError);
await lease.grant()
.then(() => { throw new Error('expected to reject'); })
.catch(err2 => expect(err2).to.equal(err));
});
it('provides basic lease lifecycle', async () => {
lease = client.lease(100);
await lease.put('leased').value('foo');
expect((await client.get('leased').exec()).kvs[0].lease).to.equal(await lease.grant());
await lease.revoke();
expect(await client.get('leased').buffer()).to.be.null;
});
it('runs immediate keepalives', async () => {
lease = client.lease(100);
expect(await lease.keepaliveOnce()).to.containSubset({
ID: await lease.grant(),
TTL: '100',
});
await lease.keepaliveOnce();
});
it('emits a lost event if the lease is invalidated', async () => {
lease = client.lease(100);
let err: Error;
lease.on('lost', e => err = e);
expect(lease.revoked()).to.be.false;
await client.leaseClient.leaseRevoke({ ID: await lease.grant() });
await lease.keepaliveOnce()
.then(() => { throw new Error('expected to reject'); })
.catch(err2 => {
expect(err2).to.equal(err);
expect(err2).to.be.an.instanceof(EtcdLeaseInvalidError);
expect(lease.revoked()).to.be.true;
});
});
describe('crons', () => {
let clock: sinon.SinonFakeTimers;
beforeEach(async () => {
clock = sinon.useFakeTimers();
lease = client.lease(60);
await onEvent('keepaliveEstablished');
});
afterEach(() => clock.restore());
it('touches the lease ttl at the correct interval', async () => {
const kaFired = watchEmission('keepaliveFired');
clock.tick(19999);
expect(kaFired.fired).to.be.false;
clock.tick(1);
expect(kaFired.fired).to.be.true;
const res = await onEvent('keepaliveSucceeded');
expect(res.TTL).to.equal('60');
});
it('tears down if the lease gets revoked', async () => {
await client.leaseClient.leaseRevoke({ ID: await lease.grant() });
clock.tick(20000);
expect(await onEvent('lost')).to.be.an.instanceof(EtcdLeaseInvalidError);
expect(lease.revoked()).to.be.true;
});
});
describe('if()', () => {
it('runs a simple if', async () => {
await client.if('foo1', 'Value', '==', 'bar1')
.then(client.put('foo1').value('bar2'))
.commit();
expect(await client.get('foo1').string()).to.equal('bar2');
});
it('runs consequents', async () => {
await client.if('foo1', 'Value', '==', 'bar1')
.then(client.put('foo1').value('bar2'))
.else(client.put('foo1').value('bar3'))
.commit();
expect(await client.get('foo1').string()).to.equal('bar2');
});
it('runs multiple clauses and consequents', async () => {
const result = await client.if('foo1', 'Value', '==', 'bar1')
.and('foo2', 'Value', '==', 'wut')
.then(client.put('foo1').value('bar2'))
.else(client.put('foo1').value('bar3'), client.get('foo2'))
.commit();
expect(result.responses[1].response_range.kvs[0].value.toString())
.to.equal('bar2');
expect(await client.get('foo1').string()).to.equal('bar3');
});
});
describe('lock()', () => {
const assertCantLock = () => {
return client.lock('resource')
.acquire()
.then(() => { throw new Error('expected to throw'); })
.catch(err => expect(err).to.be.an.instanceof(EtcdLockFailedError));
};
const assertAbleToLock = async () => {
const lock = client.lock('resource');
await lock.acquire();
await lock.release();
};
it('locks exclusively around a resource', async () => {
const lock1 = client.lock('resource');
await lock1.acquire();
await assertCantLock();
await lock1.release();
await assertAbleToLock();
});
it('provides locking around functions', async () => {
await client.lock('resource').do(assertCantLock);
await assertAbleToLock();
});
});
});
describe('roles', () => {
afterEach(() => wipeAll(client.getRoles()));
const expectRoles = async (expected: string[]) => {
const list = await client.getRoles();
expect(list.map(r => r.name)).to.deep.equal(expected);
};
it('create and deletes', async () => {
const fooRole = await client.role('foo').create();
await expectRoles(['foo']);
await fooRole.delete();
await expectRoles([]);
});
it('throws on existing roles', async () => {
await client.role('foo').create();
await expectReject(client.role('foo').create(), EtcdRoleExistsError);
});
it('throws on deleting a non-existent role', async () => {
await expectReject(client.role('foo').delete(), EtcdRoleNotFoundError);
});
it('throws on granting permission to a non-existent role', async () => {
await expectReject(
client.role('foo').grant({
permission: 'Read',
range: client.range({ prefix: '111' }),
}),
EtcdRoleNotFoundError,
);
});
it('round trips permission grants', async () => {
const fooRole = await client.role('foo').create();
await fooRole.grant({
permission: 'Read',
range: client.range({ prefix: '111' }),
});
const perms = await fooRole.permissions();
expect(perms).to.containSubset([
{
permission: 'Read',
range: client.range({ prefix: '111' }),
},
]);
await fooRole.revoke(perms[0]);
expect(await fooRole.permissions()).to.have.length(0);
});
});
describe('users', () => {
let fooRole: Role;
beforeEach(async () => {
fooRole = client.role('foo');
await fooRole.create();
});
afterEach(async () => {
await fooRole.delete();
await wipeAll(client.getUsers());
});
it('creates users', async () => {
expect(await client.getUsers()).to.have.lengthOf(0);
await client.user('connor').create('password');
expect(await client.getUsers()).to.containSubset([{ name: 'connor' }]);
});
it('throws on existing users', async () => {
await client.user('connor').create('password');
await expectReject(client.user('connor').create('password'), EtcdUserExistsError);
});
it('throws on regranting the same role multiple times', async () => {
const user = await client.user('connor').create('password');
await expectReject(user.removeRole(fooRole), EtcdRoleNotGrantedError);
});
it('throws on granting a non-existent role', async () => {
const user = await client.user('connor').create('password');
await expectReject(user.addRole('wut'), EtcdRoleNotFoundError);
});
it('throws on deleting a non-existent user', async () => {
await expectReject(client.user('connor').delete(), EtcdUserNotFoundError);
});
it('round trips roles', async () => {
const user = await client.user('connor').create('password');
await user.addRole(fooRole);
expect(await user.roles()).to.containSubset([{ name: 'foo' }]);
await user.removeRole(fooRole);
expect(await user.roles()).to.have.lengthOf(0);
});
});
describe('password auth', () => {
beforeEach(async () => {
await wipeAll(client.getUsers());
await wipeAll(client.getRoles());
// We need to set up a root user and root role first, otherwise etcd
// will yell at us.
const rootUser = await client.user('root').create('password');
rootUser.addRole('root');
await client.user('connor').create('password');
const normalRole = await client.role('rw_prefix_f').create();
await normalRole.grant({
permission: 'Readwrite',
range: client.range({ prefix: 'f' }),
});
await normalRole.addUser('connor');
await client.auth.authEnable();
});
afterEach(async () => {
const rootClient = new Etcd3(getOptions({
auth: {
username: 'root',
password: 'password',
},
}));
await rootClient.auth.authDisable();
rootClient.close();
await wipeAll(client.getUsers());
await wipeAll(client.getRoles());
});
it('allows authentication using the correct credentials', async () => {
const authedClient = new Etcd3(getOptions({
auth: {
username: 'connor',
password: 'password',
},
}));
await authedClient.put('foo').value('bar');
authedClient.close();
});
it('rejects modifying a key the client has no access to', async () => {
const authedClient = new Etcd3(getOptions({
auth: {
username: 'connor',
password: 'password',
},
}));
await expectReject(
authedClient.put('wut').value('bar').exec(),
EtcdPermissionDeniedError,
);
authedClient.close();
});
it('throws when using incorrect credentials', async () => {
const authedClient = new Etcd3(getOptions({
auth: {
username: 'connor',
password: 'bad password',
},
}));
await expectReject(
authedClient.put('foo').value('bar').exec(),
EtcdAuthenticationFailedError,
);
authedClient.close();
});
});
});

122
test/crud.test.ts Normal file
Просмотреть файл

@ -0,0 +1,122 @@
import { expect } from 'chai';
import { Etcd3 } from '../src';
import { createTestClientAndKeys, tearDownTestClient } from './util';
describe('crud', () => {
let client: Etcd3;
beforeEach(async () => (client = await createTestClientAndKeys()));
afterEach(async () => await tearDownTestClient(client));
describe('get() / getAll()', () => {
it('lists all values', async () => {
expect(await client.getAll().strings()).to.deep.equal({
foo1: 'bar1',
foo2: 'bar2',
foo3: '{"value":"bar3"}',
baz: 'bar5',
});
});
it('gets single keys with various encoding', async () => {
expect(await client.get('foo1').string()).to.equal('bar1');
expect(await client.get('foo2').buffer()).to.deep.equal(
Buffer.from('bar2'),
);
expect(await client.get('foo3').json()).to.deep.equal({ value: 'bar3' });
expect(await client.get('wut').string()).to.be.null;
});
it('queries prefixes', async () => {
expect(await client.getAll().prefix('fo').strings()).to.deep.equal({
foo1: 'bar1',
foo2: 'bar2',
foo3: '{"value":"bar3"}',
});
});
it('gets keys', async () => {
expect(await client.getAll().keys()).to.have.members([
'foo1',
'foo2',
'foo3',
'baz',
]);
expect(await client.getAll().keyBuffers()).to.have.deep.members([
Buffer.from('foo1'),
Buffer.from('foo2'),
Buffer.from('foo3'),
Buffer.from('baz'),
]);
});
it('counts', async () => {
expect(await client.getAll().count()).to.equal(4);
});
it('sorts', async () => {
expect(
await client
.getAll()
.prefix('foo')
.sort('Key', 'Ascend')
.limit(2)
.keys(),
).to.deep.equal(['foo1', 'foo2']);
expect(
await client
.getAll()
.prefix('foo')
.sort('Key', 'Descend')
.limit(2)
.keys(),
).to.deep.equal(['foo3', 'foo2']);
});
});
describe('delete()', () => {
it('deletes all', async () => {
await client.delete().all();
expect(await client.getAll().count()).to.equal(0);
});
it('deletes prefix', async () => {
await client.delete().prefix('foo');
expect(await client.getAll().keys()).to.deep.equal(['baz']);
});
it('gets previous', async () => {
expect(await client.delete().key('foo1').getPrevious()).to.containSubset([
{
key: new Buffer('foo1'),
value: new Buffer('bar1'),
},
]);
});
describe('put()', () => {
it('allows touching key revisions', async () => {
const original = (await client.get('foo1').exec()).kvs[0].mod_revision;
await client.put('foo1').touch();
const updated = (await client.get('foo1').exec()).kvs[0].mod_revision;
expect(Number(updated)).to.be.greaterThan(Number(original));
});
it('updates key values', async () => {
await client.put('foo1').value('updated');
expect(await client.get('foo1').string()).to.equal('updated');
});
it('includes previous values', async () => {
expect(
await client.put('foo1').value('updated').getPrevious(),
).to.containSubset({
key: new Buffer('foo1'),
value: new Buffer('bar1'),
});
});
});
});
});

156
test/lease.test.ts Normal file
Просмотреть файл

@ -0,0 +1,156 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import {
Etcd3,
EtcdLeaseInvalidError,
GRPCConnectFailedError,
Lease,
} from '../src';
import { onceEvent } from '../src/util';
import {
createTestClientAndKeys,
getOptions,
proxy,
tearDownTestClient,
} from './util';
describe('lease()', () => {
let client: Etcd3;
let lease: Lease;
beforeEach(async () => (client = await createTestClientAndKeys()));
afterEach(async () => await tearDownTestClient(client));
const watchEmission = (event: string): { data: any; fired: boolean } => {
const output = { data: null, fired: false };
lease.once(event, (data: any) => {
output.data = data;
output.fired = true;
});
return output;
};
afterEach(async () => {
if (lease && !lease.revoked()) {
await lease.revoke();
}
});
it('throws if trying to use too short of a ttl', () => {
expect(() => client.lease(0)).to.throw(/must be at least 1 second/);
});
it('reports a loss and errors if the client is invalid', async () => {
const badClient = new Etcd3(getOptions({ hosts: '127.0.0.1:1' }));
lease = badClient.lease(1);
const err = await onceEvent(lease, 'lost');
expect(err).to.be.an.instanceof(GRPCConnectFailedError);
await lease
.grant()
.then(() => {
throw new Error('expected to reject');
})
.catch(err2 => expect(err2).to.equal(err));
badClient.close();
});
it('provides basic lease lifecycle', async () => {
lease = client.lease(100);
await lease.put('leased').value('foo');
expect((await client.get('leased').exec()).kvs[0].lease).to.equal(
await lease.grant(),
);
await lease.revoke();
expect(await client.get('leased').buffer()).to.be.null;
});
it('runs immediate keepalives', async () => {
lease = client.lease(100);
expect(await lease.keepaliveOnce()).to.containSubset({
ID: await lease.grant(),
TTL: '100',
});
await lease.keepaliveOnce();
});
it('is resilient to network interruptions', async () => {
await proxy.activate();
const proxiedClient = new Etcd3(getOptions());
lease = proxiedClient.lease(100);
await lease.grant();
proxy.pause();
await onceEvent(lease, 'keepaliveFailed');
proxy.resume();
await onceEvent(lease, 'keepaliveSucceeded');
await lease.revoke();
proxiedClient.close();
proxy.deactivate();
});
it('marks leases as failed if the server is not contacted for a while', async () => {
await proxy.activate();
const proxiedClient = new Etcd3(getOptions());
lease = proxiedClient.lease(1);
await lease.grant();
proxy.pause();
(<any>lease).lastKeepAlive = Date.now() - 2000; // speed things up a little
const err = await onceEvent(lease, 'lost');
expect(err.message).to.match(/our lease has expired/);
proxiedClient.close();
proxy.deactivate();
});
it('emits a lost event if the lease is invalidated', async () => {
lease = client.lease(100);
let err: Error;
lease.on('lost', e => (err = e));
expect(lease.revoked()).to.be.false;
await client.leaseClient.leaseRevoke({ ID: await lease.grant() });
await lease
.keepaliveOnce()
.then(() => {
throw new Error('expected to reject');
})
.catch(err2 => {
expect(err2).to.equal(err);
expect(err2).to.be.an.instanceof(EtcdLeaseInvalidError);
expect(lease.revoked()).to.be.true;
});
});
describe('crons', () => {
let clock: sinon.SinonFakeTimers;
beforeEach(async () => {
clock = sinon.useFakeTimers();
lease = client.lease(60);
await onceEvent(lease, 'keepaliveEstablished');
});
afterEach(() => clock.restore());
it('touches the lease ttl at the correct interval', async () => {
const kaFired = watchEmission('keepaliveFired');
clock.tick(19999);
expect(kaFired.fired).to.be.false;
clock.tick(1);
expect(kaFired.fired).to.be.true;
const res = await onceEvent(lease, 'keepaliveSucceeded');
expect(res.TTL).to.equal('60');
});
it('tears down if the lease gets revoked', async () => {
await client.leaseClient.leaseRevoke({ ID: await lease.grant() });
clock.tick(20000);
expect(await onceEvent(lease, 'lost')).to.be.an.instanceof(EtcdLeaseInvalidError);
expect(lease.revoked()).to.be.true;
});
});
});

42
test/lock.test.ts Normal file
Просмотреть файл

@ -0,0 +1,42 @@
import { expect } from 'chai';
import { Etcd3, EtcdLockFailedError } from '../src';
import { createTestClientAndKeys, tearDownTestClient } from './util';
describe('lock()', () => {
let client: Etcd3;
beforeEach(async () => (client = await createTestClientAndKeys()));
afterEach(async () => await tearDownTestClient(client));
const assertCantLock = () => {
return client
.lock('resource')
.acquire()
.then(() => {
throw new Error('expected to throw');
})
.catch(err => expect(err).to.be.an.instanceof(EtcdLockFailedError));
};
const assertAbleToLock = async () => {
const lock = client.lock('resource');
await lock.acquire();
await lock.release();
};
it('locks exclusively around a resource', async () => {
const lock1 = client.lock('resource');
await lock1.acquire();
await assertCantLock();
await lock1.release();
await assertAbleToLock();
});
it('provides locking around functions', async () => {
await client.lock('resource').do(assertCantLock);
await assertAbleToLock();
});
});

64
test/namespacing.test.ts Normal file
Просмотреть файл

@ -0,0 +1,64 @@
import { expect } from 'chai';
import { Etcd3, Namespace } from '../src';
import { createTestClientAndKeys, tearDownTestClient } from './util';
describe('namespacing', () => {
let client: Etcd3;
let ns: Namespace;
beforeEach(async () => {
client = await createTestClientAndKeys();
ns = client.namespace('user1/');
});
afterEach(async () => await tearDownTestClient(client));
const assertEqualInNamespace = async (key: string, value: string) => {
expect(await ns.get(key)).to.equal(value);
expect(await client.get(`user1/${key}`)).to.equal(value);
};
it('puts and gets values in the namespace', async () => {
await ns.put('foo').value('');
await assertEqualInNamespace('foo', '');
expect(await ns.getAll().strings()).to.deep.equal({ foo: '' });
});
it('deletes values in the namespace', async () => {
await ns.put('foo1').value('');
await ns.put('foo2').value('');
await ns.delete().key('foo1');
expect(await ns.getAll().strings()).to.deep.equal({ foo2: '' });
await ns.delete().all();
expect(await ns.getAll().strings()).to.deep.equal({});
expect(await client.getAll().keys()).to.have.length.greaterThan(0);
});
it('contains leases in the namespace', async () => {
const lease = ns.lease(100);
await lease.put('leased').value('');
await assertEqualInNamespace('leased', '');
await lease.revoke();
});
it('contains locks in the namespace', async () => {
const lock = ns.lock('mylock');
await lock.acquire();
expect(await ns.get('mylock')).to.not.be.null;
expect(await client.get('user1/mylock')).to.not.be.null;
await lock.release();
});
it('runs a simple if', async () => {
await ns.put('foo1').value('potatoes');
await ns
.if('foo1', 'Value', '==', 'potatoes')
.then(ns.put('foo1').value('tomatoes'))
.commit();
await assertEqualInNamespace('foo1', 'tomatoes');
});
});

222
test/roles.test.ts Normal file
Просмотреть файл

@ -0,0 +1,222 @@
import { expect } from 'chai';
import {
Etcd3,
EtcdAuthenticationFailedError,
EtcdPermissionDeniedError,
EtcdRoleExistsError,
EtcdRoleNotFoundError,
EtcdRoleNotGrantedError,
EtcdUserExistsError,
EtcdUserNotFoundError,
Role,
} from '../src';
import {
createTestClientAndKeys,
expectReject,
getOptions,
tearDownTestClient,
} from './util';
function wipeAll(things: Promise<{ delete(): any }[]>) {
return things.then(items => Promise.all(items.map(item => item.delete())));
}
describe('roles and auth', () => {
let client: Etcd3;
beforeEach(async () => (client = await createTestClientAndKeys()));
afterEach(async () => await tearDownTestClient(client));
describe('management', () => {
afterEach(() => wipeAll(client.getRoles()));
const expectRoles = async (expected: string[]) => {
const list = await client.getRoles();
expect(list.map(r => r.name)).to.deep.equal(expected);
};
it('create and deletes', async () => {
const fooRole = await client.role('foo').create();
await expectRoles(['foo']);
await fooRole.delete();
await expectRoles([]);
});
it('throws on existing roles', async () => {
await client.role('foo').create();
await expectReject(client.role('foo').create(), EtcdRoleExistsError);
});
it('throws on deleting a non-existent role', async () => {
await expectReject(client.role('foo').delete(), EtcdRoleNotFoundError);
});
it('throws on granting permission to a non-existent role', async () => {
await expectReject(
client.role('foo').grant({
permission: 'Read',
range: client.range({ prefix: '111' }),
}),
EtcdRoleNotFoundError,
);
});
it('round trips permission grants', async () => {
const fooRole = await client.role('foo').create();
await fooRole.grant({
permission: 'Read',
range: client.range({ prefix: '111' }),
});
const perms = await fooRole.permissions();
expect(perms).to.containSubset([
{
permission: 'Read',
range: client.range({ prefix: '111' }),
},
]);
await fooRole.revoke(perms[0]);
expect(await fooRole.permissions()).to.have.length(0);
});
});
describe('users', () => {
let fooRole: Role;
beforeEach(async () => {
fooRole = client.role('foo');
await fooRole.create();
});
afterEach(async () => {
await fooRole.delete();
await wipeAll(client.getUsers());
});
it('creates users', async () => {
expect(await client.getUsers()).to.have.lengthOf(0);
await client.user('connor').create('password');
expect(await client.getUsers()).to.containSubset([{ name: 'connor' }]);
});
it('throws on existing users', async () => {
await client.user('connor').create('password');
await expectReject(
client.user('connor').create('password'),
EtcdUserExistsError,
);
});
it('throws on regranting the same role multiple times', async () => {
const user = await client.user('connor').create('password');
await expectReject(user.removeRole(fooRole), EtcdRoleNotGrantedError);
});
it('throws on granting a non-existent role', async () => {
const user = await client.user('connor').create('password');
await expectReject(user.addRole('wut'), EtcdRoleNotFoundError);
});
it('throws on deleting a non-existent user', async () => {
await expectReject(client.user('connor').delete(), EtcdUserNotFoundError);
});
it('round trips roles', async () => {
const user = await client.user('connor').create('password');
await user.addRole(fooRole);
expect(await user.roles()).to.containSubset([{ name: 'foo' }]);
await user.removeRole(fooRole);
expect(await user.roles()).to.have.lengthOf(0);
});
});
describe('password auth', () => {
beforeEach(async () => {
await wipeAll(client.getUsers());
await wipeAll(client.getRoles());
// We need to set up a root user and root role first, otherwise etcd
// will yell at us.
const rootUser = await client.user('root').create('password');
rootUser.addRole('root');
await client.user('connor').create('password');
const normalRole = await client.role('rw_prefix_f').create();
await normalRole.grant({
permission: 'Readwrite',
range: client.range({ prefix: 'f' }),
});
await normalRole.addUser('connor');
await client.auth.authEnable();
});
afterEach(async () => {
const rootClient = new Etcd3(
getOptions({
auth: {
username: 'root',
password: 'password',
},
}),
);
await rootClient.auth.authDisable();
rootClient.close();
await wipeAll(client.getUsers());
await wipeAll(client.getRoles());
});
it('allows authentication using the correct credentials', async () => {
const authedClient = new Etcd3(
getOptions({
auth: {
username: 'connor',
password: 'password',
},
}),
);
await authedClient.put('foo').value('bar');
authedClient.close();
});
it('rejects modifying a key the client has no access to', async () => {
const authedClient = new Etcd3(
getOptions({
auth: {
username: 'connor',
password: 'password',
},
}),
);
await expectReject(
authedClient.put('wut').value('bar').exec(),
EtcdPermissionDeniedError,
);
authedClient.close();
});
it('throws when using incorrect credentials', async () => {
const authedClient = new Etcd3(
getOptions({
auth: {
username: 'connor',
password: 'bad password',
},
}),
);
await expectReject(
authedClient.put('foo').value('bar').exec(),
EtcdAuthenticationFailedError,
);
authedClient.close();
});
});
});

44
test/transactions.test.ts Normal file
Просмотреть файл

@ -0,0 +1,44 @@
import { expect } from 'chai';
import { Etcd3 } from '../src';
import { createTestClientAndKeys, tearDownTestClient } from './util';
describe('transactions', () => {
let client: Etcd3;
beforeEach(async () => (client = await createTestClientAndKeys()));
afterEach(async () => await tearDownTestClient(client));
it('runs a simple if', async () => {
await client
.if('foo1', 'Value', '==', 'bar1')
.then(client.put('foo1').value('bar2'))
.commit();
expect(await client.get('foo1').string()).to.equal('bar2');
});
it('runs consequents', async () => {
await client
.if('foo1', 'Value', '==', 'bar1')
.then(client.put('foo1').value('bar2'))
.else(client.put('foo1').value('bar3'))
.commit();
expect(await client.get('foo1').string()).to.equal('bar2');
});
it('runs multiple clauses and consequents', async () => {
const result = await client
.if('foo1', 'Value', '==', 'bar1')
.and('foo2', 'Value', '==', 'wut')
.then(client.put('foo1').value('bar2'))
.else(client.put('foo1').value('bar3'), client.get('foo2'))
.commit();
expect(result.responses[1].response_range.kvs[0].value.toString()).to.equal(
'bar2',
);
expect(await client.get('foo1').string()).to.equal('bar3');
});
});

154
test/watch.test.ts Normal file
Просмотреть файл

@ -0,0 +1,154 @@
import { expect } from 'chai';
import { Etcd3, IKeyValue, IWatchResponse, Watcher } from '../src';
import { onceEvent } from '../src/util';
import {
createTestClientAndKeys,
getOptions,
proxy,
tearDownTestClient,
} from './util';
describe('watch', () => {
let client: Etcd3;
beforeEach(async () => {
client = new Etcd3(getOptions());
});
afterEach(async () => {
await tearDownTestClient(client);
});
/**
* Returns the list of watchers currently attached and listening.
*/
function getWatchers(): Watcher[] {
return (<any>client).watchManager.watchers;
}
/**
* Checks that the watcher is getting updates for the given key.
*/
function expectWatching(watcher: Watcher, key: string): Promise<Watcher> {
return Promise.all([
client.put(key).value('updated!'),
onceEvent(watcher, 'put').then((res: IKeyValue) => {
expect(res.key.toString()).to.equal(key);
expect(res.value.toString()).to.equal('updated!');
}),
]).then(() => watcher);
}
/**
* Checks that the watcher is not getting updates for the given key.
*/
async function expectNotWatching(
watcher: Watcher,
key: string,
): Promise<Watcher> {
let watching = false;
const listener = () => (watching = true);
watcher.on('put', listener);
await client.put(key).value('updated!');
return new Promise<Watcher>(resolve => {
setTimeout(() => {
expect(watching).to.equal(false, `expected not to be watching ${key}`);
resolve(watcher);
}, 200);
});
}
it('is resilient to network interruptions', async () => {
await proxy.activate();
const proxiedClient = await createTestClientAndKeys();
const watcher = await proxiedClient.watch().key('foo1').create();
proxy.pause();
await onceEvent(watcher, 'disconnected');
proxy.resume();
await onceEvent(watcher, 'connected');
await expectWatching(watcher, 'foo1');
proxiedClient.close();
proxy.deactivate();
});
it('replays historical updates.', async () => {
await proxy.activate();
const proxiedClient = await createTestClientAndKeys();
const watcher = await proxiedClient.watch().key('foo1').create();
await Promise.all([
client.put('foo1').value('update 1'),
onceEvent(watcher, 'data').then((res: IWatchResponse) => {
expect(watcher.request.start_revision).to.equal( 1 + Number(res.header.revision));
}),
]);
proxy.pause();
await onceEvent(watcher, 'disconnected');
await client.put('foo1').value('update 2');
proxy.resume();
await onceEvent(watcher, 'put').then((res: IKeyValue) => {
expect(res.key.toString()).to.equal('foo1');
expect(res.value.toString()).to.equal('update 2');
});
proxiedClient.close();
proxy.deactivate();
});
describe('subscription', () => {
it('subscribes before the connection is established', async () => {
const watcher = await client.watch().key('foo1').create();
await expectWatching(watcher, 'foo1');
expect(getWatchers()).to.deep.equal([watcher]);
});
it('subscribes while the connection is still being established', async () => {
const watcher1 = client.watch().key('foo1').create();
const watcher2 = client.watch().key('bar').create();
const watchers = await Promise.all([
watcher1.then(w => expectWatching(w, 'foo1')),
watcher2.then(w => expectWatching(w, 'bar')),
]);
expect(getWatchers()).to.deep.equal(watchers);
});
it('subscribes after the connection is fully established', async () => {
const watcher1 = await client.watch().key('foo1').create();
await expectWatching(watcher1, 'foo1');
const watcher2 = await client.watch().key('bar').create();
await expectWatching(watcher2, 'bar');
expect(getWatchers()).to.deep.equal([watcher1, watcher2]);
});
});
describe('unsubscribing', () => {
it('unsubscribes while the connection is established', async () => {
const watcher = await client.watch().key('foo1').create();
await watcher.cancel();
await expectNotWatching(watcher, 'foo1');
expect(getWatchers()).to.deep.equal([]);
});
it('unsubscribes while the connection is being reestablished', async () => {
await proxy.activate();
const proxiedClient = await createTestClientAndKeys();
const watcher = await proxiedClient.watch().key('foo1').create();
proxy.pause();
await watcher.cancel();
proxy.resume();
expect(getWatchers()).to.deep.equal([]);
proxiedClient.close();
proxy.deactivate();
});
});
});