refactor: solidify observer side of elections

Previously observation was done "magically" by listening to an event.
This made it hard to monitor status and lifecycle. This moves the logic
into a subclass. Additionally, revisions were not being maintained
between different queries introducing the possibility for race
conditions. The inner event loop is functionally identical to the
one in the etcd3 official client.
This commit is contained in:
Connor Peet 2020-11-28 14:50:05 -08:00
Родитель cead70a0d0
Коммит a7ae3d67ec
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: CF8FD2EA0DBC61BD
6 изменённых файлов: 326 добавлений и 148 удалений

Просмотреть файл

@ -66,6 +66,7 @@
"prettier": "^2.1.2",
"protobufjs": "^6.10.1",
"rimraf": "^3.0.2",
"rxjs": "^6.6.3",
"sinon": "^9.0.3",
"ts-node": "^9.0.0",
"typedoc": " 0.17.0-3",

Просмотреть файл

@ -1,10 +1,16 @@
/*---------------------------------------------------------
* Copyright (C) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------*/
import BigNumber from 'bignumber.js';
import { EventEmitter } from 'events';
import { EtcdNoLeaderError, EtcdNotLeaderError } from './errors';
import { Lease } from './lease';
import { ClientRuntimeError, EtcdNotLeaderError } from './errors';
import { Lease, LeaseState } from './lease';
import { Namespace } from './namespace';
import { IKeyValue } from './rpc';
export interface Election { // tslint:disable-line interface-name
export interface Election {
// tslint:disable-line interface-name
/**
* fired after leader elected
*/
@ -15,7 +21,163 @@ export interface Election { // tslint:disable-line interface-name
* - recreate lease fail after lease lost
*/
on(event: 'error', listener: (error: any) => void): this;
on(event: string|symbol, listener: Function): this;
on(event: string | symbol, listener: Function): this;
}
const UnsetCurrent = Symbol('unset');
/**
* Object returned from election.observer() that exposees information about
* the current election.
*/
export class ElectionObserver extends EventEmitter {
/**
* Gets whether the election has any leader.
*/
public get hasLeader() {
return !!this.current;
}
private running = true;
private runLoop: Promise<void>;
private disposer?: () => void;
private current: IKeyValue | typeof UnsetCurrent | undefined = UnsetCurrent;
constructor(private readonly namespace: Namespace) {
super();
this.runLoop = this.loop().catch(err => {
this.emit('error', err);
});
}
/**
* change is fired when the elected value changes. It can be fired with
* undefined if there's no longer a leader.
*/
public on(event: 'change', handler: (value: string | undefined) => void): this;
/**
* error is fired if the underlying election watcher experiences an error.
*/
public on(event: 'error', handler: (value: string | undefined) => void): this;
/**
* Implements EventEmitter.on(...).
*/
public on(event: string, handler: (...args: any[]) => void): this {
return super.on(event, handler);
}
/**
* Closes the election observer.
*/
public async cancel() {
this.running = false;
this.disposer?.();
await this.runLoop;
}
/**
* Returns the currently-elected leader value (passed to `campaign()` or
* `proclaim()`), or undefined if there's no elected leader.
*/
public leader(encoding?: BufferEncoding): string | undefined;
/**
* Returns the currently-elected leader value (passed to `campaign()` or
* `proclaim()`), or undefined if there's no elected leader.
*/
public leader(encoding: 'buffer'): Buffer | undefined;
public leader(encoding: BufferEncoding | 'buffer' = 'utf-8') {
const leader = this.current;
if (!leader || leader === UnsetCurrent) {
return undefined;
}
return encoding === 'buffer' ? leader.value : leader.value.toString(encoding);
}
private setLeader(kv: IKeyValue | undefined) {
const prev = this.current;
this.current = kv;
if (prev === UnsetCurrent) {
this.emit('change', undefined);
} else if (kv === undefined) {
if (prev !== undefined) {
this.emit('change', undefined);
}
} else if (!prev || !kv.value.equals(prev.value)) {
this.emit('change', kv.value.toString());
}
}
private async loop() {
// @see https://github.com/etcd-io/etcd/blob/28d1af294e4394df1ed967a4ac4fbaf437be3463/client/v3/concurrency/election.go#L177
while (this.running) {
const allKeys = await this.namespace.getAll().sort('Create', 'Ascend').limit(1).exec();
let leader: IKeyValue | undefined = allKeys.kvs[0];
let revision = allKeys.header.revision;
if (!this.running) {
return; // if closed when doing async work
}
if (!leader) {
this.setLeader(undefined);
const watcher = this.namespace
.watch()
.startRevision(allKeys.header.revision)
.prefix('')
.ignore('put')
.watcher();
await new Promise<void>((resolve, reject) => {
watcher.on('data', data => {
let done = false;
for (const event of data.events) {
if (event.type === 'Put') {
leader = event.kv;
revision = event.kv.mod_revision;
done = true;
}
}
if (done) {
resolve();
}
});
watcher.on('error', reject);
this.disposer = resolve;
}).finally(() => watcher.cancel());
if (!this.running) {
return; // if closed when doing async work
}
}
if (!leader) {
throw new ClientRuntimeError('unreachable lack of election leader');
}
this.setLeader(leader);
const watcher = this.namespace
.watch()
.startRevision(new BigNumber(revision).plus(1).toString())
.key(leader.key)
.watcher();
await new Promise<void>((resolve, reject) => {
watcher!.on('put', kv => this.setLeader(kv));
watcher!.on('delete', () => resolve());
watcher!.on('error', reject);
this.disposer = () => {
resolve();
return watcher.cancel();
};
}).finally(() => watcher.cancel());
}
}
}
/**
@ -31,49 +193,60 @@ export interface Election { // tslint:disable-line interface-name
* await election.campaign(id)
*/
export class Election extends EventEmitter {
/**
* Prefix used in the namespace for election-based operations.
*/
public static readonly prefix = 'election';
private readonly namespace: Namespace;
private lease: Lease | null = null;
private lease?: Lease;
private leaseId = '';
private _leaderKey = '';
private _leaderRevision = '';
private _isCampaigning = false;
private _isObserving = false;
public get leaderKey(): string { return this._leaderKey; }
public get leaderRevision(): string { return this._leaderRevision; }
public get isReady(): boolean { return this.leaseId.length > 0; }
public get isCampaigning(): boolean { return this._isCampaigning; }
public get isObserving(): boolean { return this._isObserving; }
public get leaderKey(): string {
return this._leaderKey;
}
public get leaderRevision(): string {
return this._leaderRevision;
}
public get isReady(): boolean {
return this.lease?.state === LeaseState.Alive;
}
public get isCampaigning(): boolean {
return this._isCampaigning;
}
constructor(public readonly parent: Namespace,
public readonly name: string,
public readonly ttl: number = 60) {
constructor(
public readonly parent: Namespace,
public readonly name: string,
public readonly ttl: number = 60,
) {
super();
this.namespace = parent.namespace(this.getPrefix());
this.on('newListener', (event: string) => this.onNewListener(event));
}
public async initialize() {
if (!this.lease) {
this.lease = this.namespace.lease(this.ttl);
this.lease.on('lost', () => this.onLeaseLost());
this.leaseId = await this.lease.grant();
}
}
/**
* Puts the value as eligible for election. Multiple sessions can participate
* in the election for the same prefix, but only one can be the leader at a
* time.
*
* A common pattern in cluster-based applications is to campaign the hostname
* or IP of the current server, and allow the leader server to be elected
* among them.
*
* This will block until the node is elected.
*/
public async campaign(value: string) {
await this.initialize();
const leaseId = await this.acquireLease();
const result = await this.namespace
.if(this.leaseId, 'Create', '==', 0)
.then(this.namespace.put(this.leaseId).value(value).lease(this.leaseId))
.else(this.namespace.get(this.leaseId))
.if(leaseId, 'Create', '==', 0)
.then(this.namespace.put(leaseId).value(value).lease(leaseId))
.else(this.namespace.get(leaseId))
.commit();
this._leaderKey = `${this.getPrefix()}${this.leaseId}`;
this._leaderKey = `${this.getPrefix()}${leaseId}`;
this._leaderRevision = result.header.revision;
this._isCampaigning = true;
@ -103,9 +276,10 @@ export class Election extends EventEmitter {
throw new EtcdNotLeaderError();
}
const leaseId = await this.lease!.grant();
const r = await this.namespace
.if(this.leaseId, 'Create', '==', this._leaderRevision)
.then(this.namespace.put(this.leaseId).value(value).lease(this.leaseId))
.if(leaseId, 'Create', '==', this._leaderRevision)
.then(this.namespace.put(leaseId).value(value).lease(leaseId))
.commit();
if (!r.succeeded) {
@ -119,9 +293,10 @@ export class Election extends EventEmitter {
return;
}
const leaseId = await this.lease!.grant();
const r = await this.namespace
.if(this.leaseId, 'Create', '==', this._leaderRevision)
.then(this.namespace.delete().key(this.leaseId))
.if(leaseId, 'Create', '==', this._leaderRevision)
.then(this.namespace.delete().key(leaseId))
.commit();
if (!r.succeeded) {
@ -131,8 +306,7 @@ export class Election extends EventEmitter {
// If fail, revoke lease for performing resigning
await this.lease.revoke();
this.lease = this.namespace.lease(this.ttl);
this.lease.on('lost', () => this.onLeaseLost());
this.leaseId = '';
this.lease.on('lost', err => this.onLeaseLost(err));
}
this._leaderKey = '';
@ -140,18 +314,44 @@ export class Election extends EventEmitter {
this._isCampaigning = false;
}
public async getLeader() {
const result = await this.namespace.getAll().sort('Create', 'Ascend').keys();
if (result.length === 0) {
throw new EtcdNoLeaderError();
/**
* Returns the currently-elected leader value (passed to `campaign()` or
* `proclaim()`), or undefined if there's no elected leader.
*/
public async leader(encoding?: BufferEncoding): Promise<string | undefined>;
/**
* Returns the currently-elected leader value (passed to `campaign()` or
* `proclaim()`), or undefined if there's no elected leader.
*/
public async leader(encoding: 'buffer'): Promise<Buffer | undefined>;
public async leader(encoding: BufferEncoding | 'buffer' = 'utf-8') {
const result = await this.namespace.getAll().sort('Create', 'Ascend').limit(1).exec();
const leader = result.kvs[0];
if (leader === undefined) {
return undefined;
}
return `${this.getPrefix()}${result[0]}`;
return encoding === 'buffer' ? leader.value : leader.value.toString();
}
public getPrefix() {
return `${Election.prefix}/${this.name}/`;
}
/**
* Creates the lease for a campaign, if it does not exist, and returns the
* lease ID once available.
*/
private acquireLease() {
if (!this.lease) {
this.lease = this.namespace.lease(this.ttl);
this.lease.on('lost', err => this.onLeaseLost(err));
}
return this.lease.grant();
}
private async waitForElected() {
// find last create before this
const lastRevision = new BigNumber(this.leaderRevision).minus(1).toString();
@ -159,88 +359,46 @@ export class Election extends EventEmitter {
.getAll()
.maxCreateRevision(lastRevision)
.sort('Create', 'Descend')
.keys();
.exec();
// no one before this, elected
if (result.length === 0) {
if (result.kvs.length === 0) {
return;
}
// wait all keys created ealier are deleted
await waitForDeletes(this.namespace, result);
await waitForDeletes(
this.namespace,
result.kvs.map(k => k.key),
result.header.revision,
);
}
private async observe() {
if (this._isObserving) {
return;
}
try {
this._isObserving = true;
// looking for current leader
let leaderKey = '';
const result = await this.namespace.getAll().sort('Create', 'Ascend').keys();
if (result.length === 0) {
// if not found, wait for leader
const watcher = await this.parent.watch().prefix(this.getPrefix()).create();
try {
leaderKey = await new Promise<string>((resolve, reject) => {
watcher.on('put', kv => resolve(kv.key.toString()));
watcher.on('error', reject);
});
} finally {
await watcher.cancel();
}
} else {
leaderKey = `${this.getPrefix()}${result[0]}`;
}
// emit current leader
this.emit('leader', leaderKey);
// wait for delete event
await waitForDelete(this.parent, leaderKey);
} finally {
this._isObserving = false;
}
// only keep watch if listened
if (this.listenerCount('leader') > 0) {
this.tryObserve();
}
}
private tryObserve(): void {
this.observe().catch(error => {
this.emit('error', error);
this.tryObserve();
/**
* Creates an observer for the election, which emits events when results
* change. The observer must be closed using `observer.cancel()` when
* you're finished with it.
*/
public async observe() {
const observer = new ElectionObserver(this.namespace);
return new Promise<ElectionObserver>((resolve, reject) => {
observer.once('change', () => resolve(observer));
observer.once('error', reject);
});
}
private shouldObserve(event: string|symbol): boolean {
return event === 'leader';
}
private onLeaseLost() {
private onLeaseLost(error: Error) {
if (this.lease) {
this.lease.removeAllListeners();
this.lease = null;
this.leaseId = '';
this.lease = undefined;
}
this.initialize().catch(error => this.emit('error', error));
}
private onNewListener(event: string) {
if (this.shouldObserve(event)) {
this.tryObserve();
}
this.emit('error', error);
}
}
async function waitForDelete(namespace: Namespace, key: string) {
const watcher = await namespace.watch().key(key).create();
async function waitForDelete(namespace: Namespace, key: Buffer, rev: string) {
const watcher = await namespace.watch().key(key).startRevision(rev).create();
const deleteOrError = new Promise((resolve, reject) => {
// waiting for deleting of that key
watcher.once('delete', resolve);
@ -254,21 +412,21 @@ async function waitForDelete(namespace: Namespace, key: string) {
}
}
async function waitForDeletes(namespace: Namespace, keys: string[]) {
async function waitForDeletes(namespace: Namespace, keys: Buffer[], rev: string) {
if (keys.length === 0) {
return;
}
if (keys.length === 1) {
return waitForDelete(namespace, keys[0]);
return waitForDelete(namespace, keys[0], rev);
}
const tasks = keys.map(key => async () => {
const keyExisted = await namespace.get(key).string() !== null;
const keyExisted = (await namespace.get(key).string()) !== null;
if (!keyExisted) {
return;
}
await waitForDelete(namespace, key);
await waitForDelete(namespace, key, rev);
});
let task = tasks.shift();

Просмотреть файл

@ -56,7 +56,8 @@ class LeaseClientWrapper implements RPC.ICallable<Host> {
}
}
const enum State {
export enum LeaseState {
Pending,
Alive,
Revoked,
}
@ -103,12 +104,16 @@ export interface ILeaseOptions extends grpc.CallOptions {
*/
export class Lease extends EventEmitter {
private leaseID: Promise<string | Error>;
private state = State.Alive;
private innerState = LeaseState.Pending;
private client = new RPC.LeaseClient(this.pool);
private lastKeepAlive: number;
private defaultOptions: grpc.CallOptions;
public get state() {
return this.innerState;
}
constructor(
private readonly pool: ConnectionPool,
private readonly namespace: NSApplicator,
@ -121,13 +126,13 @@ export class Lease extends EventEmitter {
this.defaultOptions = rest;
if (!ttl || ttl < 1) {
throw new Error(`The TTL in an etcd lease must be at least 1 second. Got: ${ttl}`);
throw new RangeError(`The TTL in an etcd lease must be at least 1 second. Got: ${ttl}`);
}
this.leaseID = this.client
.leaseGrant({ TTL: ttl }, options)
.then(res => {
this.state = State.Alive;
this.innerState = LeaseState.Alive;
this.lastKeepAlive = Date.now();
if (autoKeepAlive !== false) {
this.keepalive();
@ -227,7 +232,7 @@ export class Lease extends EventEmitter {
* Returns whether etcd has told us that this lease revoked.
*/
public revoked(): boolean {
return this.state === State.Revoked;
return this.innerState === LeaseState.Revoked;
}
/**
@ -283,7 +288,7 @@ export class Lease extends EventEmitter {
* Tears down resources associated with the lease.
*/
private close() {
this.state = State.Revoked;
this.innerState = LeaseState.Revoked;
this.teardown();
}
@ -315,7 +320,7 @@ export class Lease extends EventEmitter {
this.client
.leaseKeepAlive()
.then(stream => {
if (this.state !== State.Alive) {
if (this.innerState === LeaseState.Revoked) {
return stream.end();
}
@ -361,7 +366,7 @@ export class Lease extends EventEmitter {
}
private handleKeepaliveError(err: Error) {
if (this.state === State.Revoked) {
if (this.innerState === LeaseState.Revoked) {
return; // often write-after-end, or something along those lines
}

Просмотреть файл

@ -6,4 +6,5 @@ CMD etcd \
--advertise-client-urls 'https://0.0.0.0:2379' \
--listen-client-urls 'https://0.0.0.0:2379' \
--cert-file /root/etcd0.localhost.crt \
--key-file /root/etcd0.localhost.key
--key-file /root/etcd0.localhost.key \
--debug

Просмотреть файл

@ -1,6 +1,9 @@
import { expect } from 'chai';
import { fromEvent } from 'rxjs';
import { take } from 'rxjs/operators';
import * as sinon from 'sinon';
import { Election, Etcd3 } from '../';
import { delay } from '../util';
import { getOptions, tearDownTestClient } from './util';
const sleep = (t: number) => new Promise(resolve => setTimeout(resolve, t));
@ -130,46 +133,58 @@ describe('election', () => {
});
describe('observe', () => {
it('should emit leader event', async () => {
it('emits when existing leader resigns and other in queue', async () => {
const client2 = new Etcd3(getOptions());
const election2 = new Election(client2, 'test-election', 1);
let currentLeaderKey = '';
election.on('leader', leaderKey => (currentLeaderKey = leaderKey));
const observer = await election.observe();
const changeEvent = fromEvent(observer, 'change');
expect(election.isObserving).to.be.true;
// looking for current leader and emit it
await sleep(30);
expect(currentLeaderKey).to.equal(election.leaderKey);
expect(observer.leader()).to.equal('candidate');
const waitElection2 = election2.campaign('candidate2');
while ((await client2.getAll().prefix('election').keys()).length < 2) {
await delay(5);
}
await election.resign();
const [newLeader] = await Promise.all([
changeEvent.pipe(take(1)).toPromise(),
election.resign(),
waitElection2,
]);
await waitElection2;
// waiting for watcher
await sleep(30);
expect(currentLeaderKey).to.equal(election2.leaderKey);
expect(newLeader).to.equal('candidate2');
await observer.cancel();
});
it('should wait for leader', async () => {
it('emits when leader steps down', async () => {
const observer = await election.observe();
expect(observer.leader()).to.equal('candidate');
const changeEvent = fromEvent(observer, 'change');
const [newLeader] = await Promise.all([
changeEvent.pipe(take(1)).toPromise(),
election.resign(),
]);
expect(newLeader).to.be.undefined;
});
it('emits when leader is newly elected', async () => {
await election.resign();
let currentLeaderKey = '';
election.on('leader', leaderKey => (currentLeaderKey = leaderKey));
const observer = await election.observe();
const changeEvent = fromEvent(observer, 'change');
// waiting for watcher created
await sleep(30);
expect(observer.leader()).to.be.undefined;
await election.campaign('candidate');
const [, newLeader] = await Promise.all([
election.campaign('candidate'),
changeEvent.pipe(take(1)).toPromise(),
]);
await sleep(30);
expect(currentLeaderKey).to.equal(election.leaderKey);
expect(newLeader).to.equal('candidate');
await observer.cancel();
});
});
});

Просмотреть файл

@ -2,8 +2,8 @@
* Copyright (C) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------*/
import BigNumber from 'bignumber.js';
import { IBackoff } from 'cockatiel';
import { EventEmitter } from 'events';
import {
castGrpcErrorMessage,
ClientRuntimeError,
@ -13,7 +13,6 @@ import {
import { Rangable, Range } from './range';
import * as RPC from './rpc';
import { NSApplicator, onceEvent, toBuffer } from './util';
import { IBackoff } from 'cockatiel';
const enum State {
Idle,
@ -307,7 +306,7 @@ export class WatchManager {
*/
private handleUpdateResponse(watcher: Watcher, res: RPC.IWatchResponse) {
try {
watcher.emit('data', res);
watcher.emit('data', res);
} catch (e) {
// throw any user errors in a new microtask so they don't get handled
// as a stream error.
@ -434,7 +433,7 @@ export class WatchBuilder {
* Watch starting from a specific revision.
*/
public startRevision(revision: string): this {
this.request.start_revision = revision;
this.request.start_revision = Number(revision);
return this;
}
@ -546,7 +545,6 @@ export class Watcher extends EventEmitter {
* Implements EventEmitter.on(...).
*/
public on(event: string, handler: (...args: any[]) => void): this {
// tslint:disable-line
return super.on(event, handler);
}