diff --git a/changelog.md b/changelog.md index 3c02c4e..37ba480 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,7 @@ - **bug**: fixed comparisons breaking in STM when using namespaces (see [#90](https://github.com/mixer/etcd3/issues/90)) - **feat**: allow retrieving lock lease IDs (see [#75](https://github.com/mixer/etcd3/issues/75)) + - **bug**: fixed using `lease.put` in transactions not applying the lease to the target key (see [#92](https://github.com/mixer/etcd3/issues/92)) ## 0.2.12 2019-07-03 diff --git a/src/builder.ts b/src/builder.ts index a26d091..a2278c4 100644 --- a/src/builder.ts +++ b/src/builder.ts @@ -24,7 +24,7 @@ export interface ICompareTarget { } export interface IOperation { - op(): RPC.IRequestOp; + op(): Promise; } /** @@ -121,8 +121,8 @@ export abstract class RangeBuilder extends PromiseWrap implements IOperati /** * Returns the request op for this builder, used in transactions. */ - public op(): RPC.IRequestOp { - return { request_range: this.namespace.applyToRequest(this.request) }; + public op(): Promise { + return Promise.resolve({ request_range: this.namespace.applyToRequest(this.request) }); } } @@ -416,10 +416,10 @@ export class DeleteBuilder extends PromiseWrap { /** * Returns the request op for this builder, used in transactions. */ - public op(): RPC.IRequestOp { - return { + public op(): Promise { + return Promise.resolve({ request_delete_range: this.namespace.applyToRequest(this.request), - }; + }); } /** @@ -435,6 +435,7 @@ export class DeleteBuilder extends PromiseWrap { */ export class PutBuilder extends PromiseWrap { private request: RPC.IPutRequest = {}; + private leaseFilter?: string | number | Promise; private callOptions: grpc.CallOptions | undefined; constructor( @@ -458,8 +459,8 @@ export class PutBuilder extends PromiseWrap { * Sets the lease value to use for storing the key. You usually don't * need to use this directly, use `client.lease()` instead! */ - public lease(lease: number | string): this { - this.request.lease = lease; + public lease(lease: number | string | Promise): this { + this.leaseFilter = lease; return this; } @@ -502,14 +503,16 @@ export class PutBuilder extends PromiseWrap { /** * exec runs the put request. */ - public exec(): Promise { + public async exec(): Promise { + await this.applyLease(); return this.kv.put(this.namespace.applyToRequest(this.request), this.callOptions); } /** * Returns the request op for this builder, used in transactions. */ - public op(): RPC.IRequestOp { + public async op(): Promise { + await this.applyLease(); return { request_put: this.namespace.applyToRequest(this.request) }; } @@ -519,6 +522,18 @@ export class PutBuilder extends PromiseWrap { protected createPromise(): Promise { return this.exec(); } + + private async applyLease() { + if (!this.leaseFilter) { + return; + } + + if (typeof this.leaseFilter === 'number' || typeof this.leaseFilter === 'string') { + this.request.lease = this.leaseFilter; + } + + this.request.lease = await this.leaseFilter; + } } /** @@ -544,7 +559,11 @@ export class PutBuilder extends PromiseWrap { * ``` */ export class ComparatorBuilder { - private request: RPC.ITxnRequest = {}; + private request: { + compare: Promise[]; + success: Promise[]; + failure: Promise[]; + } = { compare: [], success: [], failure: [] }; private callOptions: grpc.CallOptions | undefined; constructor(private readonly kv: RPC.KVClient, private readonly namespace: NSApplicator) {} @@ -573,13 +592,14 @@ export class ComparatorBuilder { value = toBuffer(value); } - this.request.compare = this.request.compare || []; - this.request.compare.push({ - key: this.namespace.applyKey(toBuffer(key)), - result: comparator[cmp], - target: RPC.CompareTarget[column], - [compareTarget[column]]: value, - }); + this.request.compare.push( + Promise.resolve({ + key: this.namespace.applyKey(toBuffer(key)), + result: comparator[cmp], + target: RPC.CompareTarget[column], + [compareTarget[column]]: value, + }), + ); return this; } @@ -604,20 +624,27 @@ export class ComparatorBuilder { /** * Runs the generated transaction and returns its result. */ - public commit(): Promise { - return this.kv.txn(this.request, this.callOptions); + public async commit(): Promise { + return this.kv.txn( + { + compare: await Promise.all(this.request.compare), + failure: await Promise.all(this.request.failure), + success: await Promise.all(this.request.success), + }, + this.callOptions, + ); } /** * Low-level method to add */ - public mapOperations(ops: (RPC.IRequestOp | IOperation)[]): RPC.IRequestOp[] { + public mapOperations(ops: (RPC.IRequestOp | IOperation)[]) { return ops.map(op => { if (typeof (op).op === 'function') { return (op).op(); } - return op; + return Promise.resolve(op); }); } } diff --git a/src/lease.ts b/src/lease.ts index 37533b0..53b388c 100644 --- a/src/lease.ts +++ b/src/lease.ts @@ -33,19 +33,13 @@ class LeaseClientWrapper implements RPC.ICallable { ) {} public exec(service: keyof typeof RPC.Services, method: string, payload: any): Promise { - return this.lease.leaseID - .then(throwIfError) - .then(lease => { - payload.lease = lease; - return this.pool.exec(service, method, payload); - }) - .catch(err => { - if (err instanceof EtcdLeaseInvalidError) { - this.lease.emitLoss(err); - } + return this.pool.exec(service, method, payload).catch(err => { + if (err instanceof EtcdLeaseInvalidError) { + this.lease.emitLoss(err); + } - throw err; - }); + throw err; + }); } public markFailed(host: Host): void { @@ -172,7 +166,7 @@ export class Lease extends EventEmitter { new RPC.KVClient(new LeaseClientWrapper(this.pool, this)), this.namespace, key, - ); + ).lease(this.grant()); } /** diff --git a/test/lease.test.ts b/test/lease.test.ts index c76d6e9..f4d47c9 100644 --- a/test/lease.test.ts +++ b/test/lease.test.ts @@ -55,6 +55,20 @@ describe('lease()', () => { expect(await client.get('leased').buffer()).to.be.null; }); + it('attaches leases through transactions', async () => { + lease = client.lease(100); + await lease.put('leased').value('foo'); + + const result = await client + .if('foo1', 'Value', '==', 'bar1') + .then(lease.put('leased').value('foo')) + .commit(); + expect(result.succeeded).to.equal(true, 'expected to have completed transaction'); + expect((await client.get('leased').exec()).kvs[0].lease).to.equal(await lease.grant()); + await lease.revoke(); + expect(await client.get('leased').buffer()).to.be.null; + }); + it('runs immediate keepalives', async () => { lease = client.lease(100); expect(await lease.keepaliveOnce()).to.containSubset({