fix: mark leases as lost if the watch connection is alive but etcd is unresponsive

Fixes #110
This commit is contained in:
Connor Peet 2020-06-17 22:34:31 -07:00
Родитель d50fb8a270
Коммит e6eaff250a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: CF8FD2EA0DBC61BD
6 изменённых файлов: 113 добавлений и 23 удалений

Просмотреть файл

@ -8,8 +8,9 @@
Thank you to [@pauliusuza](https://github.com/pauliusuza) for his help updating everything
- **feat**: add `SingleRangeBuilder.exists()` that returns if the given key exists
- **fix**: errors when creating watchers not being handled correctly (see [#114](https://github.com/microsoft/etcd3/issues/114))
- **feat**: allow apply call options to authentication token exchange (see [#111](https://github.com/microsoft/etcd3/issues/111))
- **fix**: errors when creating watchers not being handled correctly (see [#114](https://github.com/microsoft/etcd3/issues/114))
- **fix**: mark leases as lost if the watch connection is alive but etcd is unresponsive (see [#110](https://github.com/microsoft/etcd3/issues/110))
## 0.2.13 2019-07-03

Просмотреть файл

@ -8,7 +8,7 @@ import { PutBuilder } from './builder';
import { ConnectionPool, Host } from './connection-pool';
import { castGrpcError, EtcdError, EtcdLeaseInvalidError, GRPCConnectFailedError } from './errors';
import * as RPC from './rpc';
import { NSApplicator } from './util';
import { NSApplicator, debounce } from './util';
function throwIfError<T>(value: T | Error): T {
if (value instanceof Error) {
@ -293,14 +293,22 @@ export class Lease extends EventEmitter {
return stream.end();
}
const keepaliveTimer = setInterval(() => this.fireKeepAlive(stream), (1000 * this.ttl) / 3);
// this is what the official Go client uses, good enough:
const keepAliveInterval = (1000 * this.ttl) / 3;
const keepaliveTimer = setInterval(() => this.fireKeepAlive(stream), keepAliveInterval);
const keepAliveTimeout = debounce(1000 * this.ttl, () =>
this.handleKeepaliveError(new GRPCConnectFailedError('GRPC watch stream has timed out.')),
);
this.teardown = () => {
this.teardown = () => undefined;
keepAliveTimeout.cancel();
clearInterval(keepaliveTimer);
stream.end();
};
keepAliveTimeout(); // start the debounce
stream
.on('error', err => this.handleKeepaliveError(err))
.on('data', res => {
@ -309,6 +317,7 @@ export class Lease extends EventEmitter {
}
this.lastKeepAlive = Date.now();
keepAliveTimeout();
this.emit('keepaliveSucceeded', res);
});
@ -326,6 +335,10 @@ export class Lease extends EventEmitter {
}
private handleKeepaliveError(err: Error) {
if (this.state === State.Revoked) {
return; // often write-after-end, or something along those lines
}
this.emit('keepaliveFailed', castGrpcError(err));
this.teardown();

Просмотреть файл

@ -5,8 +5,14 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { Etcd3, EtcdLeaseInvalidError, GRPCConnectFailedError, Lease } from '..';
import { onceEvent } from '../util';
import { createTestClientAndKeys, getOptions, proxy, tearDownTestClient } from './util';
import { onceEvent, delay } from '../util';
import {
createTestClientAndKeys,
getOptions,
proxy,
tearDownTestClient,
TrafficDirection,
} from './util';
describe('lease()', () => {
let client: Etcd3;
@ -87,9 +93,9 @@ describe('lease()', () => {
lease = proxiedClient.lease(100);
await lease.grant();
proxy.pause();
proxy.suspend();
await onceEvent(lease, 'keepaliveFailed');
proxy.resume();
proxy.unsuspend();
await onceEvent(lease, 'keepaliveSucceeded');
await lease.revoke();
@ -103,7 +109,7 @@ describe('lease()', () => {
lease = proxiedClient.lease(1);
await lease.grant();
proxy.pause();
proxy.suspend();
(lease as any).lastKeepAlive = Date.now() - 2000; // speed things up a little
const err = await onceEvent(lease, 'lost');
expect(err.message).to.match(/our lease has expired/);
@ -178,6 +184,29 @@ describe('lease()', () => {
expect(kaFired.fired).to.be.false;
});
it('marks leases as failed if etcd does not respond to keepalives in time (#110)', async () => {
await lease.revoke();
await proxy.activate();
const proxiedClient = new Etcd3(getOptions());
lease = proxiedClient.lease(1);
await lease.grant();
proxy.pause(TrafficDirection.FromEtcd);
const failedEvent = watchEmission('keepaliveFailed');
clock.tick(50000);
await Promise.resolve(); // drain task queues
expect(failedEvent.fired).to.be.false;
clock.tick(10000);
await Promise.resolve(); // drain task queues
expect(failedEvent.fired).to.be.true;
lease.release();
proxiedClient.close();
await proxy.deactivate();
});
it('tears down if the lease gets revoked', async () => {
await client.leaseClient.leaseRevoke({ ID: await lease.grant() });
clock.tick(20000);

Просмотреть файл

@ -16,6 +16,11 @@ const tlsKey = fs.readFileSync(`${rootPath}/src/test/certs/private/etcd0.localho
const etcdSourceAddress = process.env.ETCD_ADDR || '127.0.0.1:2379';
const [etcdSourceHost, etcdSourcePort] = etcdSourceAddress.split(':');
export const enum TrafficDirection {
ToEtcd,
FromEtcd,
}
export const etcdVersion = process.env.ETCD_VERSION || '3.3.9';
/**
@ -29,6 +34,7 @@ export class Proxy {
private server: tls.Server;
private host: string;
private port: number;
private enabledDataFlows = new Set([TrafficDirection.FromEtcd, TrafficDirection.ToEtcd]);
/**
* activate creates the proxy server.
@ -54,11 +60,11 @@ export class Proxy {
}
/**
* pause temporarily shuts down the server, but does not 'deactivate' the
* suspend temporarily shuts down the server, but does not 'deactivate' the
* proxy; new connections will still try to hit it. Can be restored with
* resume().
*/
public pause() {
public suspend() {
this.server.close();
this.connections.forEach(cnx => cnx.end());
this.connections = [];
@ -67,10 +73,24 @@ export class Proxy {
/**
* Starts up a previously stopped server.
*/
public resume() {
public unsuspend() {
this.server.listen(this.port, this.host);
}
/**
* Disables data flowing in one direction on the connection.
*/
public pause(direction: TrafficDirection) {
this.enabledDataFlows.delete(direction);
}
/**
* Reenables data flow on the connection.
*/
public resume(direction: TrafficDirection) {
this.enabledDataFlows.add(direction);
}
/**
* Destroys a previously-active proxy server.
*/
@ -118,17 +138,23 @@ export class Proxy {
};
serverCnx.on('data', (data: Buffer) => {
if (!ended) {
clientCnx.write(data);
if (ended || !this.enabledDataFlows.has(TrafficDirection.FromEtcd)) {
return;
}
clientCnx.write(data);
});
serverCnx.on('end', end);
serverCnx.on('error', end);
clientCnx.on('data', (data: Buffer) => {
if (serverConnected && !ended) {
if (ended || !this.enabledDataFlows.has(TrafficDirection.ToEtcd)) {
return;
}
if (serverConnected) {
serverCnx.write(data);
} else if (!ended) {
} else {
serverBuffer.push(data);
}
});

Просмотреть файл

@ -71,9 +71,9 @@ describe('watch()', () => {
const watcher = await proxiedClient.watch().key('foo1').create();
proxy.pause();
proxy.suspend();
await onceEvent(watcher, 'disconnected');
proxy.resume();
proxy.unsuspend();
await onceEvent(watcher, 'connected');
await expectWatching(watcher, 'foo1');
@ -98,9 +98,9 @@ describe('watch()', () => {
}),
]);
proxy.pause();
proxy.suspend();
await onceEvent(watcher, 'disconnected');
proxy.resume();
proxy.unsuspend();
await onceEvent(watcher, 'put').then((res: IKeyValue) => {
expect(res.key.toString()).to.equal('foo1');
expect(res.value.toString()).to.equal('update 2');
@ -115,11 +115,11 @@ describe('watch()', () => {
const proxiedClient = await createTestClientAndKeys();
const watcher = await proxiedClient.watch().key('foo1').create();
proxy.pause();
proxy.suspend();
await onceEvent(watcher, 'disconnected');
const actualRevision = Number(watcher.request.start_revision);
watcher.request.start_revision = 999999;
proxy.resume();
proxy.unsuspend();
await onceEvent(watcher, 'connected');
expect(Number(watcher.request.start_revision)).to.equal(actualRevision);
});
@ -213,10 +213,10 @@ describe('watch()', () => {
const proxiedClient = await createTestClientAndKeys();
const watcher = await proxiedClient.watch().key('foo1').create();
proxy.pause();
proxy.suspend();
await watcher.cancel();
proxy.resume();
proxy.unsuspend();
expect(getWatchers()).to.deep.equal([]);
proxiedClient.close();

Просмотреть файл

@ -189,6 +189,27 @@ export function onceEvent(emitter: EventEmitter, ...events: string[]): Promise<a
});
}
/**
* A trailing-edge debounce function.
*/
export function debounce(duration: number, fn: () => void) {
let timeout: NodeJS.Timeout | undefined;
const wrapper = () => {
wrapper.cancel();
timeout = setTimeout(fn, duration);
};
wrapper.cancel = () => {
if (timeout) {
clearTimeout(timeout);
timeout = undefined;
}
};
return wrapper;
}
/**
* PromiseWrap provides promise-like functions that auto-invoke an exec
* method when called.