From e6eaff250ad0c6ade82906f9cec25bdfe2bd724b Mon Sep 17 00:00:00 2001 From: Connor Peet Date: Wed, 17 Jun 2020 22:34:31 -0700 Subject: [PATCH] fix: mark leases as lost if the watch connection is alive but etcd is unresponsive Fixes #110 --- changelog.md | 3 ++- src/lease.ts | 17 +++++++++++++++-- src/test/lease.test.ts | 39 ++++++++++++++++++++++++++++++++++----- src/test/util.ts | 40 +++++++++++++++++++++++++++++++++------- src/test/watch.test.ts | 16 ++++++++-------- src/util.ts | 21 +++++++++++++++++++++ 6 files changed, 113 insertions(+), 23 deletions(-) diff --git a/changelog.md b/changelog.md index 5ef7bba..814bfd6 100644 --- a/changelog.md +++ b/changelog.md @@ -8,8 +8,9 @@ Thank you to [@pauliusuza](https://github.com/pauliusuza) for his help updating everything - **feat**: add `SingleRangeBuilder.exists()` that returns if the given key exists -- **fix**: errors when creating watchers not being handled correctly (see [#114](https://github.com/microsoft/etcd3/issues/114)) - **feat**: allow apply call options to authentication token exchange (see [#111](https://github.com/microsoft/etcd3/issues/111)) +- **fix**: errors when creating watchers not being handled correctly (see [#114](https://github.com/microsoft/etcd3/issues/114)) +- **fix**: mark leases as lost if the watch connection is alive but etcd is unresponsive (see [#110](https://github.com/microsoft/etcd3/issues/110)) ## 0.2.13 2019-07-03 diff --git a/src/lease.ts b/src/lease.ts index 1642c4d..e86373b 100644 --- a/src/lease.ts +++ b/src/lease.ts @@ -8,7 +8,7 @@ import { PutBuilder } from './builder'; import { ConnectionPool, Host } from './connection-pool'; import { castGrpcError, EtcdError, EtcdLeaseInvalidError, GRPCConnectFailedError } from './errors'; import * as RPC from './rpc'; -import { NSApplicator } from './util'; +import { NSApplicator, debounce } from './util'; function throwIfError(value: T | Error): T { if (value instanceof Error) { @@ -293,14 +293,22 @@ export class Lease extends EventEmitter { return stream.end(); } - const keepaliveTimer = setInterval(() => this.fireKeepAlive(stream), (1000 * this.ttl) / 3); + // this is what the official Go client uses, good enough: + const keepAliveInterval = (1000 * this.ttl) / 3; + const keepaliveTimer = setInterval(() => this.fireKeepAlive(stream), keepAliveInterval); + const keepAliveTimeout = debounce(1000 * this.ttl, () => + this.handleKeepaliveError(new GRPCConnectFailedError('GRPC watch stream has timed out.')), + ); this.teardown = () => { this.teardown = () => undefined; + keepAliveTimeout.cancel(); clearInterval(keepaliveTimer); stream.end(); }; + keepAliveTimeout(); // start the debounce + stream .on('error', err => this.handleKeepaliveError(err)) .on('data', res => { @@ -309,6 +317,7 @@ export class Lease extends EventEmitter { } this.lastKeepAlive = Date.now(); + keepAliveTimeout(); this.emit('keepaliveSucceeded', res); }); @@ -326,6 +335,10 @@ export class Lease extends EventEmitter { } private handleKeepaliveError(err: Error) { + if (this.state === State.Revoked) { + return; // often write-after-end, or something along those lines + } + this.emit('keepaliveFailed', castGrpcError(err)); this.teardown(); diff --git a/src/test/lease.test.ts b/src/test/lease.test.ts index 7498b99..fee3449 100644 --- a/src/test/lease.test.ts +++ b/src/test/lease.test.ts @@ -5,8 +5,14 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { Etcd3, EtcdLeaseInvalidError, GRPCConnectFailedError, Lease } from '..'; -import { onceEvent } from '../util'; -import { createTestClientAndKeys, getOptions, proxy, tearDownTestClient } from './util'; +import { onceEvent, delay } from '../util'; +import { + createTestClientAndKeys, + getOptions, + proxy, + tearDownTestClient, + TrafficDirection, +} from './util'; describe('lease()', () => { let client: Etcd3; @@ -87,9 +93,9 @@ describe('lease()', () => { lease = proxiedClient.lease(100); await lease.grant(); - proxy.pause(); + proxy.suspend(); await onceEvent(lease, 'keepaliveFailed'); - proxy.resume(); + proxy.unsuspend(); await onceEvent(lease, 'keepaliveSucceeded'); await lease.revoke(); @@ -103,7 +109,7 @@ describe('lease()', () => { lease = proxiedClient.lease(1); await lease.grant(); - proxy.pause(); + proxy.suspend(); (lease as any).lastKeepAlive = Date.now() - 2000; // speed things up a little const err = await onceEvent(lease, 'lost'); expect(err.message).to.match(/our lease has expired/); @@ -178,6 +184,29 @@ describe('lease()', () => { expect(kaFired.fired).to.be.false; }); + it('marks leases as failed if etcd does not respond to keepalives in time (#110)', async () => { + await lease.revoke(); + + await proxy.activate(); + const proxiedClient = new Etcd3(getOptions()); + lease = proxiedClient.lease(1); + await lease.grant(); + proxy.pause(TrafficDirection.FromEtcd); + + const failedEvent = watchEmission('keepaliveFailed'); + clock.tick(50000); + await Promise.resolve(); // drain task queues + + expect(failedEvent.fired).to.be.false; + clock.tick(10000); + await Promise.resolve(); // drain task queues + + expect(failedEvent.fired).to.be.true; + lease.release(); + proxiedClient.close(); + await proxy.deactivate(); + }); + it('tears down if the lease gets revoked', async () => { await client.leaseClient.leaseRevoke({ ID: await lease.grant() }); clock.tick(20000); diff --git a/src/test/util.ts b/src/test/util.ts index 81412c9..0fe041b 100644 --- a/src/test/util.ts +++ b/src/test/util.ts @@ -16,6 +16,11 @@ const tlsKey = fs.readFileSync(`${rootPath}/src/test/certs/private/etcd0.localho const etcdSourceAddress = process.env.ETCD_ADDR || '127.0.0.1:2379'; const [etcdSourceHost, etcdSourcePort] = etcdSourceAddress.split(':'); +export const enum TrafficDirection { + ToEtcd, + FromEtcd, +} + export const etcdVersion = process.env.ETCD_VERSION || '3.3.9'; /** @@ -29,6 +34,7 @@ export class Proxy { private server: tls.Server; private host: string; private port: number; + private enabledDataFlows = new Set([TrafficDirection.FromEtcd, TrafficDirection.ToEtcd]); /** * activate creates the proxy server. @@ -54,11 +60,11 @@ export class Proxy { } /** - * pause temporarily shuts down the server, but does not 'deactivate' the + * suspend temporarily shuts down the server, but does not 'deactivate' the * proxy; new connections will still try to hit it. Can be restored with * resume(). */ - public pause() { + public suspend() { this.server.close(); this.connections.forEach(cnx => cnx.end()); this.connections = []; @@ -67,10 +73,24 @@ export class Proxy { /** * Starts up a previously stopped server. */ - public resume() { + public unsuspend() { this.server.listen(this.port, this.host); } + /** + * Disables data flowing in one direction on the connection. + */ + public pause(direction: TrafficDirection) { + this.enabledDataFlows.delete(direction); + } + + /** + * Reenables data flow on the connection. + */ + public resume(direction: TrafficDirection) { + this.enabledDataFlows.add(direction); + } + /** * Destroys a previously-active proxy server. */ @@ -118,17 +138,23 @@ export class Proxy { }; serverCnx.on('data', (data: Buffer) => { - if (!ended) { - clientCnx.write(data); + if (ended || !this.enabledDataFlows.has(TrafficDirection.FromEtcd)) { + return; } + + clientCnx.write(data); }); serverCnx.on('end', end); serverCnx.on('error', end); clientCnx.on('data', (data: Buffer) => { - if (serverConnected && !ended) { + if (ended || !this.enabledDataFlows.has(TrafficDirection.ToEtcd)) { + return; + } + + if (serverConnected) { serverCnx.write(data); - } else if (!ended) { + } else { serverBuffer.push(data); } }); diff --git a/src/test/watch.test.ts b/src/test/watch.test.ts index 06c15bd..dafa3c1 100644 --- a/src/test/watch.test.ts +++ b/src/test/watch.test.ts @@ -71,9 +71,9 @@ describe('watch()', () => { const watcher = await proxiedClient.watch().key('foo1').create(); - proxy.pause(); + proxy.suspend(); await onceEvent(watcher, 'disconnected'); - proxy.resume(); + proxy.unsuspend(); await onceEvent(watcher, 'connected'); await expectWatching(watcher, 'foo1'); @@ -98,9 +98,9 @@ describe('watch()', () => { }), ]); - proxy.pause(); + proxy.suspend(); await onceEvent(watcher, 'disconnected'); - proxy.resume(); + proxy.unsuspend(); await onceEvent(watcher, 'put').then((res: IKeyValue) => { expect(res.key.toString()).to.equal('foo1'); expect(res.value.toString()).to.equal('update 2'); @@ -115,11 +115,11 @@ describe('watch()', () => { const proxiedClient = await createTestClientAndKeys(); const watcher = await proxiedClient.watch().key('foo1').create(); - proxy.pause(); + proxy.suspend(); await onceEvent(watcher, 'disconnected'); const actualRevision = Number(watcher.request.start_revision); watcher.request.start_revision = 999999; - proxy.resume(); + proxy.unsuspend(); await onceEvent(watcher, 'connected'); expect(Number(watcher.request.start_revision)).to.equal(actualRevision); }); @@ -213,10 +213,10 @@ describe('watch()', () => { const proxiedClient = await createTestClientAndKeys(); const watcher = await proxiedClient.watch().key('foo1').create(); - proxy.pause(); + proxy.suspend(); await watcher.cancel(); - proxy.resume(); + proxy.unsuspend(); expect(getWatchers()).to.deep.equal([]); proxiedClient.close(); diff --git a/src/util.ts b/src/util.ts index 22cf7f5..83d536d 100644 --- a/src/util.ts +++ b/src/util.ts @@ -189,6 +189,27 @@ export function onceEvent(emitter: EventEmitter, ...events: string[]): Promise void) { + let timeout: NodeJS.Timeout | undefined; + + const wrapper = () => { + wrapper.cancel(); + timeout = setTimeout(fn, duration); + }; + + wrapper.cancel = () => { + if (timeout) { + clearTimeout(timeout); + timeout = undefined; + } + }; + + return wrapper; +} + /** * PromiseWrap provides promise-like functions that auto-invoke an exec * method when called.