feat: allow passing a set of default CallOptions in new Etcd3()

This commit is contained in:
Connor Peet 2020-09-19 12:38:13 -07:00
Родитель 66b17697c5
Коммит 5f973eb47a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: CF8FD2EA0DBC61BD
21 изменённых файлов: 1334 добавлений и 748 удалений

14
.vscode/launch.json поставляемый
Просмотреть файл

@ -8,17 +8,13 @@
"type": "pwa-node",
"request": "launch",
"name": "Run Tests",
"trace": true,
"skipFiles": ["<node_internals>/**"],
"program": "${workspaceFolder}/node_modules/mocha/bin/mocha",
"preLaunchTask": "tsc: build - tsconfig.json",
"args": [
"--timeout",
"60000",
"--require",
"source-map-support/register",
"--require",
"./lib/test/_setup.js",
"\"lib/test/**/*.test.js\""
"args": ["--timeout", "10000", "--require", "./lib/test/_setup.js", "lib/test/**/*.test.js"],
"outFiles": [
"${workspaceFolder}/**/*.js",
"!**/node_modules/**"
]
}
]

10
.vscode/settings.json поставляемый
Просмотреть файл

@ -1,14 +1,16 @@
{
"typescript.tsdk": "node_modules\\typescript\\lib",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
},
"editor.defaultFormatter": "esbenp.prettier-vscode",
"search.exclude": {
"**/lib": true,
"**/.nyc_output": true,
"**/docs": true,
"**/docs": true
},
"files.exclude": {
"**/.nyc_output": true,
},
"debug.node.autoAttach": "off"
"**/.nyc_output": true
}
}

15
.vscode/tasks.json поставляемый Normal file
Просмотреть файл

@ -0,0 +1,15 @@
{
"version": "2.0.0",
"tasks": [
{
"type": "typescript",
"tsconfig": "tsconfig.json",
"option": "watch",
"problemMatcher": [
"$tsc-watch"
],
"group": "build",
"label": "tsc: watch - tsconfig.json"
}
]
}

Просмотреть файл

@ -13,6 +13,7 @@
* create the output ourselves since it's pretty simple (~100 lines of code).
*/
const prettier = require('prettier');
const pbjs = require('protobufjs');
const fs = require('fs');
const _ = require('lodash');
@ -66,14 +67,26 @@ class MessageCollection {
const messages = new MessageCollection();
let result = '';
function emit(string) {
if (string) {
process.stdout.write(string.replace(/\n\n+/g, '\n\n'));
result += string;
}
return emit;
}
function writeOut() {
fs.writeFileSync(
`${__dirname}/../src/rpc.ts`,
prettier.format(result, {
...require('../package.json').prettier,
parser: 'typescript',
}),
);
}
function template(name, params) {
if (!templates[name]) {
templates[name] = _.template(fs.readFileSync(`${__dirname}/template/${name}.tmpl`, 'utf8'));
@ -89,7 +102,7 @@ function template(name, params) {
emit(
templates[name](params)
.replace(/^\-\- *\n/gm, '')
.replace(/^\-\-/gm, '')
.replace(/^\-\-/gm, ''),
);
}
@ -169,7 +182,7 @@ function getCommentPrefixing(substring, from = 0, indentation = 1) {
}
function generateMethodCalls(node, name) {
services[name] = `${name}Client`;
const service = (services[name] = { cls: `${name}Client`, methods: new Map() });
template('class-header', { name });
_.forOwn(node.methods, (method, mname) => {
@ -183,8 +196,12 @@ function generateMethodCalls(node, name) {
res,
responseTsType: res.empty ? 'void' : formatType(method.responseType),
service: name,
responseStream: method.responseStream,
requestStream: method.requestStream,
};
service.methods.set(method, params);
if (method.responseStream && !method.requestStream) {
template('response-stream-method', params);
} else if (method.responseStream && method.requestStream) {
@ -199,6 +216,18 @@ function generateMethodCalls(node, name) {
emit('}\n\n');
}
function generateCallContext() {
emit('export type CallContext = \n');
for (const service of Object.values(services)) {
for (const params of service.methods.values()) {
template('call-context', params);
}
}
emit(';\n');
}
function generateInterface(node, name) {
const message = messages.find(name);
template('interface', { name, node, message });
@ -276,6 +305,7 @@ function codeGen(ast) {
});
template('service-map', { services });
generateCallContext();
}
new pbjs.Root()
@ -284,5 +314,6 @@ new pbjs.Root()
prepareForGeneration(ast.nested);
template('rpc-prefix');
codeGen(ast.nested);
writeOut();
})
.catch(err => console.error(err.stack));

Просмотреть файл

@ -0,0 +1 @@
| { service: '<%= service %>', method: '<%= _.lowerFirst(name) %>', isStream: <%= !!(responseStream || requestStream) %>, <%= req.empty || requestStream ? '' : `params: ${requestTsType}` %> }

Просмотреть файл

@ -1,2 +1,2 @@
export class <%= name %>Client {
constructor(private client: ICallable<unknown>) {}
constructor(private readonly client: ICallable<unknown>) {}

Просмотреть файл

@ -2,7 +2,13 @@
public <%= _.lowerFirst(name) %>(options?: grpc.CallOptions): Promise<IDuplexStream<<%= requestTsType %>, <%= responseTsType %>>> {
return this.client.withConnection('<%= service %>', ({ resource, client, metadata }) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const stream = (<any> client).<%= _.lowerFirst(name) %>(metadata, options);
const resolved = resolveCallOptions(options, this.client.callOptionsFactory, {
service: '<%= service %>',
method: '<%= _.lowerFirst(name) %>',
isStream: true,
});
const stream = (<any> client).<%= _.lowerFirst(name) %>(metadata, resolved);
stream.on('error', (err: Error) => stream.writable && this.client.markFailed(resource, err));
return stream;
});

Просмотреть файл

@ -1,8 +1,13 @@
--<%= getCommentPrefixing(`rpc ${name}(`) %>
public <%= _.lowerFirst(name) %>(<%= req.empty ? '' : `req: ${requestTsType}, ` %>options?: grpc.CallOptions): Promise<IResponseStream<<%= responseTsType %>>> {
return this.client.withConnection('<%= service %>', ({ resource, client, metadata }) => {
const resolved = resolveCallOptions(options, this.client.callOptionsFactory, {
service: '<%= service %>',
method: '<%= _.lowerFirst(name) %>', <%= req.empty ? '' : 'params: req, ' %> isStream: true,
});
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const stream = (<any> client).<%= _.lowerFirst(name) %>(metadata, options, <%= req.empty ? '{}' : 'req' %>);
const stream = (<any> client).<%= _.lowerFirst(name) %>(metadata, resolved, <%= req.empty ? '{}' : 'req' %>);
stream.on('error', (err: Error) => this.client.markFailed(resource, err));
return stream;
});

Просмотреть файл

@ -7,6 +7,8 @@
/* eslint-disable @typescript-eslint/no-empty-interface */
import * as grpc from '@grpc/grpc-js';
import type { CallOptionsFactory } from './options';
import { resolveCallOptions } from './util';
export interface ICallable<T> {
exec<T>(
@ -22,6 +24,8 @@ export interface ICallable<T> {
): Promise<R>;
markFailed(resource: T, error: Error): void;
readonly callOptionsFactory: CallOptionsFactory | undefined;
}
export interface IResponseStream<T> {

Просмотреть файл

@ -1,5 +1,5 @@
export const Services = {
--<% _.forOwn(services, (clientName, serviceName) => { %>
<%= serviceName %>: <%= clientName %>,
<%= serviceName %>: <%= clientName.cls %>,
--<% }) %>
};

Просмотреть файл

@ -2,6 +2,32 @@
- **fix:** update version of cockatiel to fix incompatible TypeScript types (see [#128](https://github.com/microsoft/etcd3/issues/128))
- **fix:** don't include the deadline in inherited lease call options (see [#131](https://github.com/microsoft/etcd3/issues/131))
- **feat:** allow passing a set of default CallOptions in new Etcd3() (see [#133](https://github.com/microsoft/etcd3/issues/133))
When constructing `Etcd3`, you can now pass `defaultCallOptions`. This can be an object, or a function which will be called for each etcd method call and should return an object. As a function, it will be called with a context object, which looks like:
```js
{
service: 'KV', // etcd service name
method: 'range', // etcd method name
isStream: false, // whether the call create a stream
params: { ... }, // arguments given to the call
}
```
For example, this will set a 10 second timeout on all calls which are not streams:
```js
const etcd3 = new Etcd3({
defaultCallOptions: context => context.isStream ? {} : Date.now() + 10000,
});
```
The default options are shallow merged with any call-specific options. For example this will always result in a 5 second timeout, regardless of what the `defaultCallOptions` contains:
```js
etcd3.get('foo').options({ deadline: Date.now() + 5000 })
```
## 1.0.1 2020-06-21

1700
package-lock.json сгенерированный

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -10,7 +10,7 @@
"test:cover": "nyc mocha",
"test:lint": "eslint \"src/**/*.ts\"",
"watch": "tsc --watch",
"build:proto": "node ./bin/update-proto ./proto && node bin/generate-methods.js ./proto/rpc.proto > src/rpc.ts && npm run fmt",
"build:proto": "node ./bin/update-proto ./proto && node bin/generate-methods.js",
"build:doc": "rimraf docs && typedoc --theme minimal --mode library --exclude \"src/test/*\" --excludePrivate --stripInternal --out ./docs ./src/index.ts && node bin/tame-typedoc",
"build:ts": "tsc",
"fmt": "prettier --write \"src/**/*.{ts,js}\" && npm run -s test:lint -- --fix",
@ -68,7 +68,7 @@
"rimraf": "^3.0.2",
"sinon": "^9.0.3",
"ts-node": "^9.0.0",
"typedoc": "^0.19.1",
"typedoc": " 0.17.0-3",
"typescript": "^4.0.3"
},
"dependencies": {

Просмотреть файл

@ -1,6 +1,6 @@
# etcd3 [![Run Tests](https://github.com/microsoft/etcd3/workflows/Run%20Tests/badge.svg)](https://github.com/microsoft/etcd3/actions?query=workflow%3A%22Run+Tests%22)
etcd3 aims to be (with its first stable release) a high-quality, production-ready client for the Protocol Buffer-based etcdv3 API. It includes:
etcd3 aims is a high-quality, production-ready client for the Protocol Buffer-based etcdv3 API. It includes:
- [load balancing](https://microsoft.github.io/etcd3/interfaces/ioptions.html)
- [fault handling and reconnections](https://microsoft.github.io/etcd3/interfaces/ioptions.html#faulthandling)

Просмотреть файл

@ -1,26 +1,26 @@
/*---------------------------------------------------------
* Copyright (C) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------*/
import { loadSync } from '@grpc/proto-loader';
import * as grpc from '@grpc/grpc-js';
import { ChannelOptions } from '@grpc/grpc-js/build/src/channel-options';
import { loadSync } from '@grpc/proto-loader';
import {
isBrokenCircuitError,
Policy,
IPolicy,
ConsecutiveBreaker,
IDefaultPolicyContext,
IPolicy,
isBrokenCircuitError,
Policy,
} from 'cockatiel';
import {
castGrpcError,
EtcdInvalidAuthTokenError,
ClientRuntimeError,
ClientClosedError,
ClientRuntimeError,
EtcdInvalidAuthTokenError,
isRecoverableError,
} from './errors';
import { IOptions } from './options';
import { ICallable, Services } from './rpc';
import { CallContext, ICallable, Services } from './rpc';
import { resolveCallOptions } from './util';
const packageDefinition = loadSync(`${__dirname}/../proto/rpc.proto`, {
keepCase: true,
@ -108,11 +108,22 @@ class Authenticator {
const meta = new grpc.Metadata();
const host = removeProtocolPrefix(hosts[index]);
const context: CallContext = {
method: 'authenticate',
params: { name: auth.username, password: auth.password },
service: 'Auth',
isStream: false,
};
return this.getCredentialsFromHost(
host,
auth.username,
auth.password,
auth.callOptions,
resolveCallOptions(
resolveCallOptions(undefined, auth.callOptions, context),
resolveCallOptions(undefined, this.options.defaultCallOptions, context),
context,
),
this.credentials,
)
.then(token => {
@ -199,7 +210,7 @@ export class Host {
// workaround: https://github.com/grpc/grpc-node/issues/1487
const state = service.getChannel().getConnectivityState(false);
if (state === grpc.connectivityState.CONNECTING) {
service.waitForReady(Date.now() + 10_0000, () => service.close());
service.waitForReady(Date.now() + 10_00, () => service.close());
} else {
service.close();
}
@ -230,6 +241,8 @@ export class ConnectionPool implements ICallable<Host> {
*/
public static deterministicOrder = false;
public readonly callOptionsFactory = this.options.defaultCallOptions;
private readonly hosts: Host[];
private readonly globalPolicy: IPolicy<IDefaultPolicyContext> =
this.options.faultHandling?.global ?? Policy.handleWhen(isRecoverableError).retry().attempts(3);
@ -295,8 +308,15 @@ export class ConnectionPool implements ICallable<Host> {
this.withConnection(
serviceName,
async ({ client, metadata }) => {
const resolvedOpts = resolveCallOptions(options, this.callOptionsFactory, {
service: serviceName,
method,
params: payload,
isStream: false,
} as CallContext);
try {
return await runServiceCall(client, metadata, options, method, payload);
return await runServiceCall(client, metadata, resolvedOpts, method, payload);
} catch (err) {
if (err instanceof EtcdInvalidAuthTokenError) {
this.authenticator.invalidateMetadata();

Просмотреть файл

@ -27,6 +27,8 @@ function leaseExpired(lease: RPC.ILeaseKeepAliveResponse) {
* put requests before executing them.
*/
class LeaseClientWrapper implements RPC.ICallable<Host> {
public readonly callOptionsFactory = this.pool.callOptionsFactory;
constructor(
private pool: ConnectionPool,
private readonly lease: {

Просмотреть файл

@ -1,9 +1,13 @@
/*---------------------------------------------------------
* Copyright (C) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------*/
import { ChannelOptions } from '@grpc/grpc-js/build/src/channel-options';
import { CallOptions } from '@grpc/grpc-js';
import { IPolicy, IBackoff, IDefaultPolicyContext } from 'cockatiel';
import { CallContext } from './rpc';
export type CallOptionsFactory = CallOptions | ((context: CallContext) => CallOptions);
/**
* IOptions are passed into the client constructor to configure how the client
@ -60,9 +64,40 @@ export interface IOptions {
/**
* Call options to use for the password-to-token exchange.
*/
callOptions?: CallOptions;
callOptions?: CallOptionsFactory;
};
/**
* Default call options used for all requests. This can be an object, or a
* function which will be called for each etcd method call. As a function,
* it will be called with a context object, which looks like:
* ```js
* {
* service: 'KV', // etcd service name
* method: 'range', // etcd method name
* isStream: false, // whether the call create a stream
* params: { ... }, // arguments given to the call
* }
* ```
*
* For example, this will set a 10 second timeout on all calls which are not streams:
*
* ```js
* const etcd3 = new Etcd3({
* defaultCallOptions: context => context.isStream ? {} : Date.now() + 10000,
* });
* ```
*
* The default options are shallow merged with any call-specific options.
* For example this will always result in a 5 second timeout, regardless of
* what the `defaultCallOptions` contains:
*
* ```js
* etcd3.get('foo').options({ deadline: Date.now() + 5000 })
* ```
*/
defaultCallOptions?: CallOptionsFactory;
/**
* A list of hosts to connect to. Hosts should include the `https?://` prefix.
*/

Просмотреть файл

@ -7,6 +7,8 @@
/* eslint-disable @typescript-eslint/no-empty-interface */
import * as grpc from '@grpc/grpc-js';
import type { CallOptionsFactory } from './options';
import { resolveCallOptions } from './util';
export interface ICallable<T> {
exec<T>(
@ -22,6 +24,8 @@ export interface ICallable<T> {
): Promise<R>;
markFailed(resource: T, error: Error): void;
readonly callOptionsFactory: CallOptionsFactory | undefined;
}
export interface IResponseStream<T> {
@ -39,7 +43,7 @@ export interface IRequestStream<T> {
export interface IDuplexStream<T, R> extends IRequestStream<T>, IResponseStream<R> {}
export class KVClient {
constructor(private client: ICallable<unknown>) {}
constructor(private readonly client: ICallable<unknown>) {}
/**
* Range gets the keys in the range from the key-value store.
*/
@ -88,7 +92,7 @@ export class KVClient {
}
export class WatchClient {
constructor(private client: ICallable<unknown>) {}
constructor(private readonly client: ICallable<unknown>) {}
/**
* Watch watches for events happening or that have happened. Both input and output
* are streams; the input stream is for creating and canceling watchers and the output
@ -99,7 +103,13 @@ export class WatchClient {
public watch(options?: grpc.CallOptions): Promise<IDuplexStream<IWatchRequest, IWatchResponse>> {
return this.client.withConnection('Watch', ({ resource, client, metadata }) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const stream = (<any>client).watch(metadata, options);
const resolved = resolveCallOptions(options, this.client.callOptionsFactory, {
service: 'Watch',
method: 'watch',
isStream: true,
});
const stream = (<any>client).watch(metadata, resolved);
stream.on('error', (err: Error) => stream.writable && this.client.markFailed(resource, err));
return stream;
});
@ -107,7 +117,7 @@ export class WatchClient {
}
export class LeaseClient {
constructor(private client: ICallable<unknown>) {}
constructor(private readonly client: ICallable<unknown>) {}
/**
* LeaseGrant creates a lease which expires if the server does not receive a keepAlive
* within a given time to live period. All keys attached to the lease will be expired and
@ -137,7 +147,13 @@ export class LeaseClient {
): Promise<IDuplexStream<ILeaseKeepAliveRequest, ILeaseKeepAliveResponse>> {
return this.client.withConnection('Lease', ({ resource, client, metadata }) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const stream = (<any>client).leaseKeepAlive(metadata, options);
const resolved = resolveCallOptions(options, this.client.callOptionsFactory, {
service: 'Lease',
method: 'leaseKeepAlive',
isStream: true,
});
const stream = (<any>client).leaseKeepAlive(metadata, resolved);
stream.on('error', (err: Error) => stream.writable && this.client.markFailed(resource, err));
return stream;
});
@ -160,7 +176,7 @@ export class LeaseClient {
}
export class ClusterClient {
constructor(private client: ICallable<unknown>) {}
constructor(private readonly client: ICallable<unknown>) {}
/**
* MemberAdd adds a member into the cluster.
*/
@ -209,7 +225,7 @@ export class ClusterClient {
}
export class MaintenanceClient {
constructor(private client: ICallable<unknown>) {}
constructor(private readonly client: ICallable<unknown>) {}
/**
* Alarm activates, deactivates, and queries alarms regarding cluster health.
*/
@ -251,8 +267,14 @@ export class MaintenanceClient {
*/
public snapshot(options?: grpc.CallOptions): Promise<IResponseStream<ISnapshotResponse>> {
return this.client.withConnection('Maintenance', ({ resource, client, metadata }) => {
const resolved = resolveCallOptions(options, this.client.callOptionsFactory, {
service: 'Maintenance',
method: 'snapshot',
isStream: true,
});
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const stream = (<any>client).snapshot(metadata, options, {});
const stream = (<any>client).snapshot(metadata, resolved, {});
stream.on('error', (err: Error) => this.client.markFailed(resource, err));
return stream;
});
@ -278,7 +300,7 @@ export class MaintenanceClient {
}
export class AuthClient {
constructor(private client: ICallable<unknown>) {}
constructor(private readonly client: ICallable<unknown>) {}
/**
* AuthEnable enables authentication.
*/
@ -1405,3 +1427,70 @@ export const Services = {
Maintenance: MaintenanceClient,
Auth: AuthClient,
};
export type CallContext =
| { service: 'KV'; method: 'range'; isStream: false; params: IRangeRequest }
| { service: 'KV'; method: 'put'; isStream: false; params: IPutRequest }
| { service: 'KV'; method: 'deleteRange'; isStream: false; params: IDeleteRangeRequest }
| { service: 'KV'; method: 'txn'; isStream: false; params: ITxnRequest }
| { service: 'KV'; method: 'compact'; isStream: false; params: ICompactionRequest }
| { service: 'Watch'; method: 'watch'; isStream: true }
| { service: 'Lease'; method: 'leaseGrant'; isStream: false; params: ILeaseGrantRequest }
| { service: 'Lease'; method: 'leaseRevoke'; isStream: false; params: ILeaseRevokeRequest }
| { service: 'Lease'; method: 'leaseKeepAlive'; isStream: true }
| {
service: 'Lease';
method: 'leaseTimeToLive';
isStream: false;
params: ILeaseTimeToLiveRequest;
}
| { service: 'Lease'; method: 'leaseLeases'; isStream: false }
| { service: 'Cluster'; method: 'memberAdd'; isStream: false; params: IMemberAddRequest }
| { service: 'Cluster'; method: 'memberRemove'; isStream: false; params: IMemberRemoveRequest }
| { service: 'Cluster'; method: 'memberUpdate'; isStream: false; params: IMemberUpdateRequest }
| { service: 'Cluster'; method: 'memberList'; isStream: false; params: IMemberListRequest }
| { service: 'Cluster'; method: 'memberPromote'; isStream: false; params: IMemberPromoteRequest }
| { service: 'Maintenance'; method: 'alarm'; isStream: false; params: IAlarmRequest }
| { service: 'Maintenance'; method: 'status'; isStream: false }
| { service: 'Maintenance'; method: 'defragment'; isStream: false }
| { service: 'Maintenance'; method: 'hash'; isStream: false }
| { service: 'Maintenance'; method: 'hashKV'; isStream: false; params: IHashKVRequest }
| { service: 'Maintenance'; method: 'snapshot'; isStream: true }
| { service: 'Maintenance'; method: 'moveLeader'; isStream: false; params: IMoveLeaderRequest }
| { service: 'Maintenance'; method: 'downgrade'; isStream: false; params: IDowngradeRequest }
| { service: 'Auth'; method: 'authEnable'; isStream: false }
| { service: 'Auth'; method: 'authDisable'; isStream: false }
| { service: 'Auth'; method: 'authStatus'; isStream: false }
| { service: 'Auth'; method: 'authenticate'; isStream: false; params: IAuthenticateRequest }
| { service: 'Auth'; method: 'userAdd'; isStream: false; params: IAuthUserAddRequest }
| { service: 'Auth'; method: 'userGet'; isStream: false; params: IAuthUserGetRequest }
| { service: 'Auth'; method: 'userList'; isStream: false }
| { service: 'Auth'; method: 'userDelete'; isStream: false; params: IAuthUserDeleteRequest }
| {
service: 'Auth';
method: 'userChangePassword';
isStream: false;
params: IAuthUserChangePasswordRequest;
}
| { service: 'Auth'; method: 'userGrantRole'; isStream: false; params: IAuthUserGrantRoleRequest }
| {
service: 'Auth';
method: 'userRevokeRole';
isStream: false;
params: IAuthUserRevokeRoleRequest;
}
| { service: 'Auth'; method: 'roleAdd'; isStream: false; params: IAuthRoleAddRequest }
| { service: 'Auth'; method: 'roleGet'; isStream: false; params: IAuthRoleGetRequest }
| { service: 'Auth'; method: 'roleList'; isStream: false }
| { service: 'Auth'; method: 'roleDelete'; isStream: false; params: IAuthRoleDeleteRequest }
| {
service: 'Auth';
method: 'roleGrantPermission';
isStream: false;
params: IAuthRoleGrantPermissionRequest;
}
| {
service: 'Auth';
method: 'roleRevokePermission';
isStream: false;
params: IAuthRoleRevokePermissionRequest;
};

Просмотреть файл

@ -1,6 +1,11 @@
/*---------------------------------------------------------
* Copyright (C) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------*/
// Uncomment for verbose GRPC traces:
// process.env.GRPC_VERBOSITY = 'DEBUG';
// process.env.GRPC_TRACE = 'all';
import * as chai from 'chai';
import { ConnectionPool } from '../connection-pool';

Просмотреть файл

@ -2,12 +2,12 @@
* Copyright (C) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------*/
import { expect } from 'chai';
import { Policy } from 'cockatiel';
import { stub } from 'sinon';
import { IOptions, KVClient } from '..';
import { ConnectionPool } from '../connection-pool';
import { GRPCDeadlineExceededError, GRPCUnavailableError } from '../errors';
import { getHost, getOptions } from './util';
import { Policy } from 'cockatiel';
import { GRPCUnavailableError } from '../errors';
function getOptionsWithBadHost(options: Partial<IOptions> = {}): IOptions {
return getOptions({
@ -38,6 +38,20 @@ describe('connection pool', () => {
await kv.deleteRange({ key });
});
it('applies call options', async () => {
const optsStub = stub()
.onFirstCall()
.returns({ deadline: new Date(0) })
.onSecondCall()
.returns({ deadline: new Date(Date.now() + 30_000) });
const pool = new ConnectionPool({ ...getOptions(), defaultCallOptions: optsStub });
const kv = new KVClient(pool);
await expect(kv.range({ key })).to.be.rejectedWith(GRPCDeadlineExceededError);
expect(await kv.range({ key })).be.ok;
});
it('rejects instantiating with a mix of secure and unsecure hosts', () => {
expect(
() =>

Просмотреть файл

@ -1,9 +1,12 @@
/*---------------------------------------------------------
* Copyright (C) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------*/
import { EventEmitter } from 'events';
import { CallOptions } from '@grpc/grpc-js';
import { EventEmitter } from 'events';
import { ClientRuntimeError } from './errors';
import { CallOptionsFactory } from './options';
import { CallContext, Services } from './rpc';
export const zeroKey = Buffer.from([0]);
export const emptyKey = Buffer.from([]);
@ -237,3 +240,29 @@ export abstract class PromiseWrap<T> implements PromiseLike<T> {
*/
protected abstract createPromise(): Promise<T>;
}
export interface ICallContext {
service: keyof typeof Services;
method: string;
params: unknown;
}
/**
* Applies the defaultOptions or defaultOptions factory to the given
* call-specific options.
*/
export const resolveCallOptions = (
callOptions: CallOptions | undefined,
defaultOptions: undefined | CallOptionsFactory,
context: CallContext,
): CallOptions | undefined => {
if (defaultOptions === undefined) {
return callOptions;
}
if (typeof defaultOptions === 'function') {
defaultOptions = defaultOptions(context);
}
return callOptions ? { ...defaultOptions, ...callOptions } : defaultOptions;
};