fix(lease): lease not attaching through put in transactions

This commit is contained in:
Connor Peet 2019-07-03 12:03:36 -07:00
Родитель 56ea496bbe
Коммит 5b72261859
4 изменённых файлов: 71 добавлений и 35 удалений

Просмотреть файл

@ -2,6 +2,7 @@
- **bug**: fixed comparisons breaking in STM when using namespaces (see [#90](https://github.com/mixer/etcd3/issues/90))
- **feat**: allow retrieving lock lease IDs (see [#75](https://github.com/mixer/etcd3/issues/75))
- **bug**: fixed using `lease.put` in transactions not applying the lease to the target key (see [#92](https://github.com/mixer/etcd3/issues/92))
## 0.2.12 2019-07-03

Просмотреть файл

@ -24,7 +24,7 @@ export interface ICompareTarget {
}
export interface IOperation {
op(): RPC.IRequestOp;
op(): Promise<RPC.IRequestOp>;
}
/**
@ -121,8 +121,8 @@ export abstract class RangeBuilder<T> extends PromiseWrap<T> implements IOperati
/**
* Returns the request op for this builder, used in transactions.
*/
public op(): RPC.IRequestOp {
return { request_range: this.namespace.applyToRequest(this.request) };
public op(): Promise<RPC.IRequestOp> {
return Promise.resolve({ request_range: this.namespace.applyToRequest(this.request) });
}
}
@ -416,10 +416,10 @@ export class DeleteBuilder extends PromiseWrap<RPC.IDeleteRangeResponse> {
/**
* Returns the request op for this builder, used in transactions.
*/
public op(): RPC.IRequestOp {
return {
public op(): Promise<RPC.IRequestOp> {
return Promise.resolve({
request_delete_range: this.namespace.applyToRequest(this.request),
};
});
}
/**
@ -435,6 +435,7 @@ export class DeleteBuilder extends PromiseWrap<RPC.IDeleteRangeResponse> {
*/
export class PutBuilder extends PromiseWrap<RPC.IPutResponse> {
private request: RPC.IPutRequest = {};
private leaseFilter?: string | number | Promise<string | number>;
private callOptions: grpc.CallOptions | undefined;
constructor(
@ -458,8 +459,8 @@ export class PutBuilder extends PromiseWrap<RPC.IPutResponse> {
* Sets the lease value to use for storing the key. You usually don't
* need to use this directly, use `client.lease()` instead!
*/
public lease(lease: number | string): this {
this.request.lease = lease;
public lease(lease: number | string | Promise<string | number>): this {
this.leaseFilter = lease;
return this;
}
@ -502,14 +503,16 @@ export class PutBuilder extends PromiseWrap<RPC.IPutResponse> {
/**
* exec runs the put request.
*/
public exec(): Promise<RPC.IPutResponse> {
public async exec(): Promise<RPC.IPutResponse> {
await this.applyLease();
return this.kv.put(this.namespace.applyToRequest(this.request), this.callOptions);
}
/**
* Returns the request op for this builder, used in transactions.
*/
public op(): RPC.IRequestOp {
public async op(): Promise<RPC.IRequestOp> {
await this.applyLease();
return { request_put: this.namespace.applyToRequest(this.request) };
}
@ -519,6 +522,18 @@ export class PutBuilder extends PromiseWrap<RPC.IPutResponse> {
protected createPromise(): Promise<RPC.IPutResponse> {
return this.exec();
}
private async applyLease() {
if (!this.leaseFilter) {
return;
}
if (typeof this.leaseFilter === 'number' || typeof this.leaseFilter === 'string') {
this.request.lease = this.leaseFilter;
}
this.request.lease = await this.leaseFilter;
}
}
/**
@ -544,7 +559,11 @@ export class PutBuilder extends PromiseWrap<RPC.IPutResponse> {
* ```
*/
export class ComparatorBuilder {
private request: RPC.ITxnRequest = {};
private request: {
compare: Promise<RPC.ICompare>[];
success: Promise<RPC.IRequestOp>[];
failure: Promise<RPC.IRequestOp>[];
} = { compare: [], success: [], failure: [] };
private callOptions: grpc.CallOptions | undefined;
constructor(private readonly kv: RPC.KVClient, private readonly namespace: NSApplicator) {}
@ -573,13 +592,14 @@ export class ComparatorBuilder {
value = toBuffer(<string | Buffer>value);
}
this.request.compare = this.request.compare || [];
this.request.compare.push({
key: this.namespace.applyKey(toBuffer(key)),
result: comparator[cmp],
target: RPC.CompareTarget[column],
[compareTarget[column]]: value,
});
this.request.compare.push(
Promise.resolve({
key: this.namespace.applyKey(toBuffer(key)),
result: comparator[cmp],
target: RPC.CompareTarget[column],
[compareTarget[column]]: value,
}),
);
return this;
}
@ -604,20 +624,27 @@ export class ComparatorBuilder {
/**
* Runs the generated transaction and returns its result.
*/
public commit(): Promise<RPC.ITxnResponse> {
return this.kv.txn(this.request, this.callOptions);
public async commit(): Promise<RPC.ITxnResponse> {
return this.kv.txn(
{
compare: await Promise.all(this.request.compare),
failure: await Promise.all(this.request.failure),
success: await Promise.all(this.request.success),
},
this.callOptions,
);
}
/**
* Low-level method to add
*/
public mapOperations(ops: (RPC.IRequestOp | IOperation)[]): RPC.IRequestOp[] {
public mapOperations(ops: (RPC.IRequestOp | IOperation)[]) {
return ops.map(op => {
if (typeof (<IOperation>op).op === 'function') {
return (<IOperation>op).op();
}
return <RPC.IRequestOp>op;
return Promise.resolve(<RPC.IRequestOp>op);
});
}
}

Просмотреть файл

@ -33,19 +33,13 @@ class LeaseClientWrapper implements RPC.ICallable<Host> {
) {}
public exec(service: keyof typeof RPC.Services, method: string, payload: any): Promise<any> {
return this.lease.leaseID
.then(throwIfError)
.then(lease => {
payload.lease = lease;
return this.pool.exec(service, method, payload);
})
.catch(err => {
if (err instanceof EtcdLeaseInvalidError) {
this.lease.emitLoss(err);
}
return this.pool.exec(service, method, payload).catch(err => {
if (err instanceof EtcdLeaseInvalidError) {
this.lease.emitLoss(err);
}
throw err;
});
throw err;
});
}
public markFailed(host: Host): void {
@ -172,7 +166,7 @@ export class Lease extends EventEmitter {
new RPC.KVClient(new LeaseClientWrapper(this.pool, <any>this)),
this.namespace,
key,
);
).lease(this.grant());
}
/**

Просмотреть файл

@ -55,6 +55,20 @@ describe('lease()', () => {
expect(await client.get('leased').buffer()).to.be.null;
});
it('attaches leases through transactions', async () => {
lease = client.lease(100);
await lease.put('leased').value('foo');
const result = await client
.if('foo1', 'Value', '==', 'bar1')
.then(lease.put('leased').value('foo'))
.commit();
expect(result.succeeded).to.equal(true, 'expected to have completed transaction');
expect((await client.get('leased').exec()).kvs[0].lease).to.equal(await lease.grant());
await lease.revoke();
expect(await client.get('leased').buffer()).to.be.null;
});
it('runs immediate keepalives', async () => {
lease = client.lease(100);
expect(await lease.keepaliveOnce()).to.containSubset({