зеркало из
1
0
Форкнуть 0
* initial commit

* update tests and fix bugs of datetime parse and when switch devices

* update Message type in electron

* add return type to eventHubHandler
This commit is contained in:
YingXue 2021-02-18 14:09:24 -08:00 коммит произвёл GitHub
Родитель 0a462ca90b
Коммит ea9e18d17b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 259 добавлений и 70 удалений

Просмотреть файл

@ -9,6 +9,8 @@ export const PLATFORMS = {
export const MESSAGE_CHANNELS = {
DEVICE_SEND_MESSAGE: 'device_sendMessage',
DIRECTORY_GET_DIRECTORIES: 'directory_getDirectories',
EVENTHUB_START_MONITORING: 'eventhub_startMonitoring',
EVENTHUB_STOP_MONITORING: 'eventhub_stopMonitoring',
MODEL_REPOSITORY_GET_DEFINITION: 'model_definition',
SETTING_HIGH_CONTRAST: 'setting_highContrast',
};
@ -16,6 +18,7 @@ export const MESSAGE_CHANNELS = {
export const API_INTERFACES = {
DEVICE: 'api_device',
DIRECTORY: 'api_directory',
EVENTHUB: 'api_eventhub',
MODEL_DEFINITION: 'api_modelDefinition',
SETTINGS: 'api_settings'
};

Просмотреть файл

@ -7,9 +7,11 @@ import { generateSettingsInterface } from './factories/settingsInterfaceFactory'
import { generateDirectoryInterface } from './factories/directoryInterfaceFactory';
import { generateModelRepositoryInterface } from './factories/modelRepositoryInterfaceFactory';
import { generateDeviceInterface } from './factories/deviceInterfaceFactory';
import { generateEventHubInterface } from './factories/eventHubInterfaceFactory';
import { API_INTERFACES } from './constants';
contextBridge.exposeInMainWorld(API_INTERFACES.DEVICE, generateDeviceInterface());
contextBridge.exposeInMainWorld(API_INTERFACES.DIRECTORY, generateDirectoryInterface());
contextBridge.exposeInMainWorld(API_INTERFACES.EVENTHUB, generateEventHubInterface());
contextBridge.exposeInMainWorld(API_INTERFACES.MODEL_DEFINITION, generateModelRepositoryInterface());
contextBridge.exposeInMainWorld(API_INTERFACES.SETTINGS, generateSettingsInterface());

Просмотреть файл

@ -10,6 +10,7 @@ import { onSettingsHighContrast } from './handlers/settingsHandler';
import { onGetInterfaceDefinition } from './handlers/modelRepositoryHandler';
import { onGetDirectories } from './handlers/directoryHandler';
import { onSendMessageToDevice } from './handlers/deviceHandler';
import { onStartMonitoring, onStopMonitoring } from './handlers/eventHubHandler';
import { formatError } from './utils/errorHelper';
import '../dist/server/serverElectron';
@ -31,6 +32,8 @@ class Main {
Main.registerHandler(MESSAGE_CHANNELS.MODEL_REPOSITORY_GET_DEFINITION, onGetInterfaceDefinition);
Main.registerHandler(MESSAGE_CHANNELS.DIRECTORY_GET_DIRECTORIES, onGetDirectories);
Main.registerHandler(MESSAGE_CHANNELS.DEVICE_SEND_MESSAGE, onSendMessageToDevice);
Main.registerHandler(MESSAGE_CHANNELS.EVENTHUB_START_MONITORING, onStartMonitoring);
Main.registerHandler(MESSAGE_CHANNELS.EVENTHUB_STOP_MONITORING, onStopMonitoring);
}
private static setApplicationLock(): void {

Просмотреть файл

@ -0,0 +1,18 @@
/***********************************************************
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License
**********************************************************/
import { MESSAGE_CHANNELS } from '../constants';
import { EventHubInterface, StartEventHubMonitoringParameters, Message } from '../interfaces/eventHubInterface';
import { invokeInMainWorld } from '../utils/invokeHelper';
export const generateEventHubInterface = (): EventHubInterface => {
return {
startEventHubMonitoring: async (params: StartEventHubMonitoringParameters): Promise<Message[]> => {
return invokeInMainWorld<Message[]>(MESSAGE_CHANNELS.EVENTHUB_START_MONITORING, params);
},
stopEventHubMonitoring: async (): Promise<void> => {
return invokeInMainWorld<void>(MESSAGE_CHANNELS.EVENTHUB_STOP_MONITORING);
}
};
};

Просмотреть файл

@ -0,0 +1,142 @@
/***********************************************************
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License
**********************************************************/
import { IpcMainInvokeEvent } from 'electron';
import { EventHubClient, EventPosition, ReceiveHandler } from '@azure/event-hubs';
import { Message, StartEventHubMonitoringParameters } from '../interfaces/eventHubInterface';
let client: EventHubClient = null;
let messages: Message[] = [];
let receivers: ReceiveHandler[] = [];
let connectionString: string = ''; // would equal `${hubConnectionString}` or `${customEventHubConnectionString}/${customEventHubName}`
let deviceId: string = '';
const IOTHUB_CONNECTION_DEVICE_ID = 'iothub-connection-device-id';
export const onStartMonitoring = async (event: IpcMainInvokeEvent, params: StartEventHubMonitoringParameters): Promise<Message[]>=> {
return eventHubProvider(params).then(result => {
return result;
});
}
export const onStopMonitoring = async (): Promise<void> => {
try {
return stopClient();
} catch (error) {
// swallow the error as we set client to null anyways
}
}
const eventHubProvider = async (params: StartEventHubMonitoringParameters) => {
if (needToCreateNewEventHubClient(params))
{
// hub has changed, reinitialize client, receivers and mesages
client = params.customEventHubConnectionString ?
await EventHubClient.createFromConnectionString(params.customEventHubConnectionString, params.customEventHubName) :
await EventHubClient.createFromIotHubConnectionString(params.hubConnectionString);
connectionString = params.customEventHubConnectionString ?
`${params.customEventHubConnectionString}/${params.customEventHubName}` :
params.hubConnectionString;
receivers = [];
messages = [];
}
updateDeviceIdIfNecessary(params);
return listeningToMessages(client, params);
};
const listeningToMessages = async (eventHubClient: EventHubClient, params: StartEventHubMonitoringParameters) => {
if (params.startListeners || !receivers) {
const partitionIds = await client.getPartitionIds();
const hubInfo = await client.getHubRuntimeInformation();
const startTime = params.startTime ? Date.parse(params.startTime) : Date.now();
partitionIds && partitionIds.forEach(async (partitionId: string) => {
const receiveOptions = {
consumerGroup: params.consumerGroup,
enableReceiverRuntimeMetric: true,
eventPosition: EventPosition.fromEnqueuedTime(startTime),
name: `${hubInfo.path}_${partitionId}`,
};
const receiver = eventHubClient.receive(
partitionId,
onMessageReceived,
(err: object) => {},
receiveOptions);
receivers.push(receiver);
});
}
return handleMessages();
};
const handleMessages = () => {
let results: Message[] = [];
messages.forEach(message => {
if (!results.some(result => result.systemProperties?.['x-opt-sequence-number'] === message.systemProperties?.['x-opt-sequence-number'])) {
// if user click stop/start too refrequently, it's possible duplicate receivers are created before the cleanup happens as it's async
// remove duplicate messages before proper cleanup is finished
results.push(message);
}
})
messages = []; // empty the array everytime the result is returned
return results;
}
const stopClient = async () => {
return stopReceivers().then(() => {
return client && client.close().catch(error => {
console.log(`client cleanup error: ${error}`); // swallow the error as we will cleanup anyways
});
}).finally (() => {
client = null;
receivers = [];
});
};
const stopReceivers = async () => {
return Promise.all(
receivers.map(receiver => {
if (receiver && (receiver.isReceiverOpen === undefined || receiver.isReceiverOpen)) {
return stopReceiver(receiver);
} else {
return null;
}
})
);
};
const stopReceiver = async (receiver: ReceiveHandler) => {
receiver.stop().catch((err: object) => {
throw new Error(`receivers cleanup error: ${err}`);
});
}
const needToCreateNewEventHubClient = (parmas: StartEventHubMonitoringParameters): boolean => {
return !client ||
parmas.hubConnectionString && parmas.hubConnectionString !== connectionString ||
parmas.customEventHubConnectionString && `${parmas.customEventHubConnectionString}/${parmas.customEventHubName}` !== connectionString;
}
const updateDeviceIdIfNecessary = (parmas: StartEventHubMonitoringParameters) => {
if( !deviceId || parmas.deviceId !== deviceId)
{
deviceId = parmas.deviceId;
messages = [];
}
}
const onMessageReceived = async (eventData: any) => {
if (eventData && eventData.annotations && eventData.annotations[IOTHUB_CONNECTION_DEVICE_ID] === deviceId) {
const message: Message = {
body: eventData.body,
enqueuedTime: eventData.enqueuedTimeUtc.toString(),
properties: eventData.applicationProperties
};
message.systemProperties = eventData.annotations;
messages.push(message);
}
};

Просмотреть файл

@ -0,0 +1,22 @@
/***********************************************************
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License
**********************************************************/
import { Message as MessageInterface } from '../../src/app/api/models/messages';
export interface StartEventHubMonitoringParameters {
deviceId: string;
consumerGroup: string;
startTime: string;
startListeners: boolean;
customEventHubName?: string;
customEventHubConnectionString?: string;
hubConnectionString?: string;
}
export type Message = MessageInterface;
export interface EventHubInterface {
startEventHubMonitoring(params: StartEventHubMonitoringParameters): Promise<Message[]>;
stopEventHubMonitoring(): Promise<void>;
}

Просмотреть файл

@ -31,11 +31,11 @@ export interface FetchDevicesParameters {
export interface MonitorEventsParameters {
deviceId: string;
consumerGroup: string;
startListeners: boolean;
customEventHubName?: string;
customEventHubConnectionString?: string;
hubConnectionString?: string;
startTime?: Date;
}

Просмотреть файл

@ -610,58 +610,40 @@ describe('deviceTwinService', () => {
customEventHubConnectionString: undefined,
deviceId,
hubConnectionString: undefined,
startListeners: true,
startTime: undefined
};
it ('returns if hubConnectionString is not specified', () => {
expect(DevicesService.monitorEvents(parameters)).toEqual(emptyPromise);
});
it('calls fetch with specified parameters and invokes monitorEvents when response is 200', async () => {
it('calls startEventHubMonitoring with expected parameters', async () => {
jest.spyOn(DataplaneService, 'dataPlaneConnectionHelper').mockResolvedValue({
connectionInfo: getConnectionInfoFromConnectionString(connectionString), connectionString, sasToken});
// tslint:disable
const responseBody = [{'body':{'temp':0},'enqueuedTime':'2019-09-06T17:47:11.334Z','properties':{'iothub-message-schema':'temp'}}];
const response = {
json: () => responseBody,
status: 200
} as any;
// tslint:enable
jest.spyOn(window, 'fetch').mockResolvedValue(response);
const result = await DevicesService.monitorEvents(parameters);
const startEventHubMonitoring = jest.fn();
jest.spyOn(interfaceUtils, 'getEventHubInterface').mockReturnValue({
startEventHubMonitoring,
stopEventHubMonitoring: jest.fn()
});
const eventHubRequestParameters = {
await DevicesService.monitorEvents(parameters);
expect(startEventHubMonitoring).toBeCalledWith({
...parameters,
hubConnectionString: connectionString,
startTime: parameters.startTime && parameters.startTime.toISOString()
};
const serviceRequestParams = {
body: JSON.stringify(eventHubRequestParameters),
cache: 'no-cache',
credentials: 'include',
headers,
method: HTTP_OPERATION_TYPES.Post,
mode: 'cors',
};
expect(fetch).toBeCalledWith(DevicesService.EVENTHUB_MONITOR_ENDPOINT, serviceRequestParams);
expect(result).toEqual(responseBody);
});
});
});
context('stopMonitoringEvents', () => {
it('calls fetch with specified parameters', () => {
DevicesService.stopMonitoringEvents();
const serviceRequestParams = {
body: JSON.stringify({}),
cache: 'no-cache',
credentials: 'include',
headers,
method: HTTP_OPERATION_TYPES.Post,
mode: 'cors',
};
expect(fetch).toBeCalledWith(DevicesService.EVENTHUB_STOP_ENDPOINT, serviceRequestParams);
it('calls stopEventHubMonitoring', async () => {
const stopEventHubMonitoring = jest.fn();
jest.spyOn(interfaceUtils, 'getEventHubInterface').mockReturnValue({
startEventHubMonitoring: jest.fn(),
stopEventHubMonitoring
});
await DevicesService.stopMonitoringEvents();
expect(stopEventHubMonitoring).toBeCalled();
});
});
});

Просмотреть файл

@ -19,18 +19,15 @@ import { CONTROLLER_API_ENDPOINT,
MONITOR,
STOP,
HEADERS,
CLOUD_TO_DEVICE,
HTTP_OPERATION_TYPES,
DataPlaneStatusCode,
HUB_DATA_PLANE_API_VERSION
} from '../../constants/apiConstants';
import { buildQueryString } from '../shared/utils';
import { Message } from '../models/messages';
import { Twin, Device, DataPlaneResponse } from '../models/device';
import { DeviceIdentity } from '../models/deviceIdentity';
import { parseEventHubMessage } from './eventHubMessageHelper';
import { dataPlaneConnectionHelper, dataPlaneResponseHelper, request, DATAPLANE_CONTROLLER_ENDPOINT, DataPlaneRequest } from './dataplaneServiceHelper';
import { getDeviceInterface } from '../shared/interfaceUtils';
import { getDeviceInterface, getEventHubInterface } from '../shared/interfaceUtils';
const EVENTHUB_CONTROLLER_ENDPOINT = `${CONTROLLER_API_ENDPOINT}${EVENTHUB}`;
export const EVENTHUB_MONITOR_ENDPOINT = `${EVENTHUB_CONTROLLER_ENDPOINT}${MONITOR}`;
@ -243,12 +240,14 @@ export const deleteDevices = async (parameters: DeleteDevicesParameters) => {
// tslint:disable-next-line:cyclomatic-complexity
export const monitorEvents = async (parameters: MonitorEventsParameters): Promise<Message[]> => {
const api = getEventHubInterface();
let requestParameters = {
...parameters,
startTime: parameters.startTime && parameters.startTime.toISOString()
};
// if either of the info about custom event hub is not provided, use default hub connection string to connect to event hub
// if no custom event hub info is provided, use default hub connection string to connect to event hub
if (!parameters.customEventHubConnectionString || !parameters.customEventHubName) {
const connectionInfo = await dataPlaneConnectionHelper();
requestParameters = {
@ -257,17 +256,11 @@ export const monitorEvents = async (parameters: MonitorEventsParameters): Promis
};
}
const response = await request(EVENTHUB_MONITOR_ENDPOINT, requestParameters);
if (response.status === DataPlaneStatusCode.SuccessLowerBound) {
const messages = await response.json() as Message[];
return messages && messages.length && messages.length !== 0 && messages.map(message => parseEventHubMessage(message)) || [];
}
else {
const error = await response.json();
throw new Error(error && error.name);
}
const result = await api.startEventHubMonitoring(requestParameters);
return result;
};
export const stopMonitoringEvents = async (): Promise<void> => {
await request(EVENTHUB_STOP_ENDPOINT, {});
const api = getEventHubInterface();
await api.stopEventHubMonitoring();
};

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License
**********************************************************/
import { appConfig, HostMode, H } from '../../../appConfig/appConfig';
import { appConfig, HostMode } from '../../../appConfig/appConfig';
import { HIGH_CONTRAST } from '../../constants/browserStorage';
import { API_INTERFACES } from '../../../../public/constants';
import * as interfaceUtils from './interfaceUtils';
@ -93,3 +93,18 @@ describe('getLocalModelRepositoryInterface', () => {
expect(factory).toHaveBeenLastCalledWith(API_INTERFACES.MODEL_DEFINITION);
});
});
describe('getEventHubInterface', () => {
it('throws exception when mode is not electron', () => {
appConfig.hostMode = HostMode.Browser;
expect(() => interfaceUtils.getEventHubInterface()).toThrowError(interfaceUtils.NOT_AVAILABLE);
});
it('calls expected factory when mode is electron', () => {
appConfig.hostMode = HostMode.Electron;
const factory = jest.spyOn(interfaceUtils, 'getElectronInterface');
interfaceUtils.getEventHubInterface()
expect(factory).toHaveBeenLastCalledWith(API_INTERFACES.EVENTHUB);
});
});

Просмотреть файл

@ -6,6 +6,7 @@ import { SettingsInterface } from '../../../../public/interfaces/settingsInterfa
import { DeviceInterface } from '../../../../public/interfaces/deviceInterface';
import { DirectoryInterface } from '../../../../public/interfaces/directoryInterface';
import { ModelRepositoryInterface } from '../../../../public/interfaces/modelRepositoryInterface';
import { EventHubInterface } from './../../../../public/interfaces/eventHubInterface';
import { API_INTERFACES } from '../../../../public/constants';
import { appConfig, HostMode } from '../../../appConfig/appConfig';
import { HIGH_CONTRAST } from '../../constants/browserStorage';
@ -50,6 +51,14 @@ export const getDirectoryInterface = (): DirectoryInterface => {
return getElectronInterface(API_INTERFACES.DIRECTORY);
};
export const getEventHubInterface = (): EventHubInterface => {
if (appConfig.hostMode !== HostMode.Electron) {
throw new Error(NOT_AVAILABLE);
}
return getElectronInterface(API_INTERFACES.EVENTHUB);
};
export const getElectronInterface = <T>(name: string): T => {
// tslint:disable-next-line: no-any no-string-literal
const api = (window as any)[name];

Просмотреть файл

@ -66,7 +66,7 @@ exports[`deviceEvents deviceEvents in non-pnp context matches snapshot 1`] = `
key="0"
>
<h5>
9:44:58 PM, 10/14/2019
Wed Feb 17 2021 16:06:00 GMT-0800 (Pacific Standard Time)
:
</h5>
<pre>
@ -74,7 +74,7 @@ exports[`deviceEvents deviceEvents in non-pnp context matches snapshot 1`] = `
"body": {
"humid": 123
},
"enqueuedTime": "2019-10-14T21:44:58.397Z",
"enqueuedTime": "Wed Feb 17 2021 16:06:00 GMT-0800 (Pacific Standard Time)",
"properties": {
"iothub-message-schema": "humid"
}

Просмотреть файл

@ -7,11 +7,9 @@ import * as React from 'react';
import { act } from 'react-dom/test-utils';
import { shallow, mount } from 'enzyme';
import { TextField } from 'office-ui-fabric-react/lib/components/TextField';
import { Toggle } from 'office-ui-fabric-react/lib/components/Toggle';
import { CommandBar } from 'office-ui-fabric-react/lib/components/CommandBar';
import { Shimmer } from 'office-ui-fabric-react/lib/components/Shimmer';
import { DeviceEvents } from './deviceEvents';
import { appConfig, HostMode } from '../../../../appConfig/appConfig';
import { DEFAULT_CONSUMER_GROUP } from '../../../constants/apiConstants';
import { startEventsMonitoringAction } from '../actions';
import * as AsyncSagaReducer from '../../../shared/hooks/useAsyncSagaReducer';
@ -36,7 +34,7 @@ describe('deviceEvents', () => {
body: {
humid: 123
},
enqueuedTime: '2019-10-14T21:44:58.397Z',
enqueuedTime: 'Wed Feb 17 2021 16:06:00 GMT-0800 (Pacific Standard Time)',
properties: {
'iothub-message-schema': 'humid'
}
@ -87,6 +85,7 @@ describe('deviceEvents', () => {
expect(startEventsMonitoringSpy.mock.calls[0][0]).toEqual({
consumerGroup: DEFAULT_CONSUMER_GROUP,
deviceId: 'device1',
startListeners: true,
startTime: undefined
});
});
@ -182,7 +181,7 @@ describe('deviceEvents', () => {
body: {
humid: '123' // intentionally set a value which type is double
},
enqueuedTime: '2019-10-14T21:44:58.397Z',
enqueuedTime: 'Wed Feb 17 2021 16:06:00 GMT-0800 (Pacific Standard Time)',
systemProperties: {
'iothub-message-schema': 'humid'
}
@ -206,7 +205,7 @@ describe('deviceEvents', () => {
body: {
'non-matching-key': 0
},
enqueuedTime: '2019-10-14T21:44:58.397Z',
enqueuedTime: 'Wed Feb 17 2021 16:06:00 GMT-0800 (Pacific Standard Time)',
systemProperties: {
'iothub-message-schema': 'humid'
}
@ -231,7 +230,7 @@ describe('deviceEvents', () => {
'humid': 0,
'humid-foo': 'test'
},
enqueuedTime: '2019-10-14T21:44:58.397Z',
enqueuedTime: 'Wed Feb 17 2021 16:06:00 GMT-0800 (Pacific Standard Time)',
systemProperties: {}
}];
jest.spyOn(AsyncSagaReducer, 'useAsyncSagaReducer').mockReturnValue([{
@ -258,7 +257,7 @@ describe('deviceEvents', () => {
body: {
'non-matching-key': 0
},
enqueuedTime: '2019-10-14T21:44:58.397Z'
enqueuedTime: 'Wed Feb 17 2021 16:06:00 GMT-0800 (Pacific Standard Time)'
}];
jest.spyOn(AsyncSagaReducer, 'useAsyncSagaReducer').mockReturnValue([{
payload: events,

Просмотреть файл

@ -41,7 +41,7 @@ import { StartTime } from './startTime';
import './deviceEvents.scss';
const JSON_SPACES = 2;
const LOADING_LOCK = 2000;
const LOADING_LOCK = 8000;
export const DeviceEvents: React.FC = () => {
const { t } = useTranslation();
@ -107,7 +107,7 @@ export const DeviceEvents: React.FC = () => {
if (monitoringData) {
setStartTime(new Date());
setTimeout(() => {
fetchData();
fetchData(false)();
}, LOADING_LOCK);
}
else {
@ -140,7 +140,7 @@ export const DeviceEvents: React.FC = () => {
setShowPnpModeledEvents={setShowPnpModeledEvents}
setShowSimulationPanel={setShowSimulationPanel}
dispatch={dispatch}
fetchData={fetchData}
fetchData={fetchData(true)}
/>
);
};
@ -222,7 +222,7 @@ export const DeviceEvents: React.FC = () => {
};
return (
<article key={index} className="device-events-content">
{<h5>{parseDateTimeString(modifiedEvents.enqueuedTime)}:</h5>}
{<h5>{modifiedEvents.enqueuedTime}:</h5>}
<pre>{JSON.stringify(modifiedEvents, undefined, JSON_SPACES)}</pre>
</article>
);
@ -342,7 +342,7 @@ export const DeviceEvents: React.FC = () => {
return(
<div className="col-sm2">
<Label aria-label={t(ResourceKeys.deviceEvents.columns.timestamp)}>
{enqueuedTime && parseDateTimeString(enqueuedTime)}
{enqueuedTime}
</Label>
</div>
);
@ -475,10 +475,11 @@ export const DeviceEvents: React.FC = () => {
);
};
const fetchData = () => {
const fetchData = (startListeners: boolean) => () => {
let parameters: MonitorEventsParameters = {
consumerGroup,
deviceId,
startListeners,
startTime
};