1
0
Форкнуть 0

Merge pull request #18 from iot-for-all/custom-transform-adapter

JSON to JSON telemetry transformation adapter
This commit is contained in:
peolivei2 2021-05-13 17:58:31 -07:00 коммит произвёл GitHub
Родитель 2ee04e2892 8ccf088d26
Коммит 90dd5bbeab
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
23 изменённых файлов: 3530 добавлений и 0 удалений

Просмотреть файл

@ -384,6 +384,13 @@ folder we provide an example of a custom adapter written in TypeScript that forw
This adapter uses a client automatically generated with [AutoRest](https://github.com/Azure/autorest) using the Bridge swagger available
under `Docs/swagger.json`. The example also contains the necessary ARM template to deploy it as a sidecar with the Bridge.
For scenarios that require JSON to JSON transformation of telemetry payloads and changes to the main telemetry API
(e.g., exposing different telemetry endpoints, accepting the device Id and API key in different formats), we provide a
parametrized adapter under [Samples/custom-transform-adapter](https://github.com/iot-for-all/iotc-device-bridge/tree/main/Samples/custom-transform-adapter)
that requires no additional code. This adapter can be customized through a simple configuration JSON file. This component might
be useful, for instance, in a scenario where device payloads can't be modified in the source and can only be sent to a fixed URL
(i.e., that doesn't include a variable parameter, such as device Id).
We also provide an example of how to deploy multiple adapters (`Samples/MultipleAdapterDeployment`). In this setup,
each adapter is deployed as a separate container and requests are routed based on the path.

2
Samples/custom-transform-adapter/.gitignore поставляемый Normal file
Просмотреть файл

@ -0,0 +1,2 @@
/bin/
__debug_bin

Просмотреть файл

@ -0,0 +1,12 @@
# golang:1.16-alpine
FROM golang@sha256:49c07aa83790aca732250c2258b5912659df31b6bfa2ab428661bc66833769e1
RUN mkdir /app
ADD . /app
WORKDIR /app
ENV PORT=3000
RUN go build -o main .
EXPOSE $PORT
CMD ["/app/main"]

Просмотреть файл

@ -0,0 +1,209 @@
# JSON to JSON telemetry transformation - parametrized adapter
This adapter can be used to extend the Device Bridge API (both parameters and data format) without the need to write custom code.
The code, written in Go, is deployed as a side-car container alongside the Bridge and is configured through a route definition file.
The configuration can specify a custom [jq](https://stedolan.github.io/jq/) query to transform telemetry messages.
All telemetry messages received by the adapter are transformed and forwarded to the Bridge.
![Demo](assets/demo.gif "demo")
- [JSON to JSON telemetry transformation - parametrized adapter](#json-to-json-telemetry-transformation---parametrized-adapter)
* [Deployment](#deployment)
+ [API surface](#api-surface)
+ [Uploading configuration file](#uploading-configuration-file)
+ [Logs](#logs)
* [Configuration](#configuration)
+ [Route parameters](#route-parameters)
- [`path`](#-path-)
- [`transform`](#-transform-)
- [`deviceIdPathParam`](#-deviceidpathparam-)
- [`deviceIdBodyQuery`](#-deviceidbodyquery-)
- [`authHeader`](#-authheader-)
- [`authQueryParam`](#-authqueryparam-)
+ [Example](#example)
## Deployment
To deploy, build the image in this directory and push to your registry. Then use the template below to deploy the solution. This template is a
simple extension of the one found in the root of this repository (a complete definition of the parameters can be found [here](https://github.com/iot-for-all/iotc-device-bridge#3---deployment-parameters).
This will deploy the main Bridge container as well as the adapter container (the parameter `adapter-image` is the name of the adapter image to pull from the registry).
[![Deploy to Azure](http://azuredeploy.net/deploybutton.png)](https://portal.azure.com/#create/Microsoft.Template/uri/https%3A%2F%2Fraw.githubusercontent.com%2Fiot-for-all%2Fiotc-device-bridge%2Fmain%2FSamples%2Fcustom-transform-adapter%2Fazuredeploy.json)
### API surface
The deployment will forward all requests to `/devices/*` and `/health` directly to the Bridge. All other requests will be routed to the adapter for transformation.
### Uploading configuration file
A storage account is provisioned with every instance of the Device Bridge. This account will be in the same resource group and contains a File Share named `bridge`.
By default, the adapter will look for a `config.json` configuration file in the `bridge` File Share. The example command below uploads a configuration file to the Bridge
storage account. The format of the configuration file is explained in the next sections.
`az storage file upload --account-name <account-name> --path ./config.json --share-name bridge --source <path-to-local-config-file>`
Once the configuration file has been uploaded, restart the Bridge instance so the new configuration can be applied. The container logs will display which routes are being configured.
### Logs
The adapter logs will be published to the same Log Analytics Workspace and the Bridge.
## Configuration
A configuration file must be in JSON format and have the format below. Each entry of the `d2cMessages` array specifies
a route that will receive `POST` requests with telemetry messages.
```
{
"d2cMessages": [
{
// Parameters for route 1
},
{
// Parameters for route 2
},
// Other routes
]
}
```
### Route parameters
The following route configuration parameters are available:
#### `path`
Path filter for requests that will be handled by this route. For instance, `"path": "/message"` will handle all `POST` requests made to `/message`.
A path definition can have parameters. For instance `"path": "/telemetry/{id}"` defined a path parameter `id` and will handle any requests that
start with `/telemetry/`.
#### `transform`
[jq](https://stedolan.github.io/jq/) query that defines how request bodies received by this route will be transformed before being forwarded to
the Bridge. Transformations take the request body as input and must output a JSON object that meets the the Device Bridge
[telemetry body format](https://github.com/iot-for-all/iotc-device-bridge#device-to-cloud-messages).
For instance, the transformation `"transform": "{ data: . }"` will convert the following request body:
```json
{
"temperature": 21
}
```
into:
```json
{
"data": {
"temperature": 21
}
}
```
Which is a valid telemetry body format for the Bridge.
> NOTE: if a transform is not specified, the route will pass the request body to the Device Bridge _as is_.
#### `transformFile`
Similar to `tranform`, but specifies the path to the file that contains the jq query. The query file must placed in the same location as
the `config.json`, in the `bridge` File Share of the Storage Account provisioned with the Bridge.
#### `deviceIdPathParam`
Specifies the name of the path parameter the will contain the device Id. For instance, if we have a route with `"path": "/telemetry/{id}"`
and a `"deviceIdPathParam": "id"`, a `POST` request to `/telemetry/my-device` will result in the telemetry being sent on behalf of device `my-device`.
#### `deviceIdBodyQuery`
Defines a jq query that will be used to pick the device Id from the request body. This query must generate a string as output.
For instance, the query `"deviceIdBodyQuery": ".device.id"` will pick the `my-device` Id from the following request body:
```json
{
"device": {
"id": "my-device"
}
}
```
#### `authHeader`
Specifies the name of the custom header that will contain the API key used to authenticate with the Device Bridge (specified during deployment).
#### `authQueryParam`
Name of the query parameter that contains the Device Bridge API key for authentication.
### Example
The following example demonstrates the configuration parameters above and how they affect the behavior of each route:
```json
{
"d2cMessages": [{
"path": "/model1/telemetry",
"transform": "{ data: .reports | map( { (.name | tostring): .value } ) | add, properties: { seq_id: .originator.seq_id | tostring }, componentName: .originator.board_model, creationTimeUtc: .originator.time }",
"deviceIdBodyQuery": ".originator.hw_serial | tostring",
"authQueryParam": "key"
},
{
"path": "/{device_id}/telemetry",
"deviceIdPathParam": "device_id",
"authHeader": "Api-Key"
}]
}
```
The first route will convert the following HTTP request:
```json
POST /mode1/telemetry?key=<my-api-key>
{
"reports": [
{
"name": "temperature",
"value": 21
},
{
"name": "humidity",
"value": 34.2
}
],
"originator": {
"hw_serial": 218009,
"board_model": "Smart_Sensor",
"seq_id": 117340065,
"time": "2019-09-22T12:42:31Z"
}
}
```
Into the following telemetry request to the Device Bridge:
```json
x-api-key: <my-api-key>
POST /devices/218009/messages/events
{
"data": {
"temperature": 21,
"humidity": 34.2
},
"properties": {
"seq_id": "117340065"
},
"componentName": "Smart_Sensor",
"creationTimeUtc": "2019-09-22T12:42:31Z"
}
```
The second route, will convert the following request:
```json
Api-Key: <my-api-key>
POST /my-device-1/telemetry
{
"data": {
"speed": 14.2
}
}
```
Into the following request to the Device Bridge (since the transform is not defined, the route passes the body _as is_):
```json
x-api-key: <my-api-key>
POST /devices/my-device-1/messages/events
{
"data": {
"speed": 14.2
}
}
```

Двоичные данные
Samples/custom-transform-adapter/assets/demo.gif Normal file

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 92 MiB

Просмотреть файл

@ -0,0 +1,575 @@
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"bridge-name": {
"type": "string",
"metadata": {
"description": "The name of the device bridge. Also will be used in the url."
}
},
"iotc-dps-sas-key": {
"type": "securestring",
"metadata": {
"description": "DPS sas key for provisioning devices and sending data. Retrieved from iot central."
}
},
"iotc-id-scope": {
"type": "string",
"metadata": {
"description": "ID Scope provisioning devices and sending data. Retrieved from iot central."
}
},
"api-key": {
"type": "securestring",
"metadata": {
"description": "Api key used to validate requests to IoTC device bridge."
}
},
"sql-username": {
"type": "string",
"metadata": {
"description": "Username for the sql server provisioned."
}
},
"sql-password": {
"type": "securestring",
"metadata": {
"description": "Password for the sql server provisioned."
}
},
"log-analytics-workspace-id": {
"type": "string",
"metadata": {
"description": "Log Analytics workspace id for log storage."
}
},
"log-analytics-workspace-key": {
"type": "securestring",
"metadata": {
"description": "Log Analytics workspace key for log storage."
}
},
"bridge-image": {
"type": "string",
"metadata": {
"description": "Docker image to be deployed for the core and setup modules."
}
},
"adapter-image": {
"type": "string",
"metadata": {
"description": "Docker image to be deployed for the adapter module."
}
},
"acr-server": {
"type": "string",
"metadata": {
"description": "Private ACR server to pull the image from."
}
},
"acr-username": {
"type": "string",
"metadata": {
"description": "Username of the private ACR."
}
},
"acr-password": {
"type": "securestring",
"metadata": {
"description": "Password of the private ACR."
}
}
},
"variables": {
"setupContainerGroupsName": "[concat('iotc-container-groups-setup-', uniqueString(resourceGroup().id))]",
"containerGroupsName": "[concat('iotc-container-groups-', uniqueString(resourceGroup().id))]",
"bridgeContainerName": "[concat('iotc-bridge-container-', uniqueString(resourceGroup().id))]",
"keyvaultName": "[concat('iotc-kv-', uniqueString(resourceGroup().id))]",
"databaseName": "[concat('iotc-db-', uniqueString(resourceGroup().id))]",
"sqlServerName": "[concat('iotc-sql-', uniqueString(resourceGroup().id))]",
"storageAccountName": "[concat('iotcsa', uniqueString(resourceGroup().id))]",
"bridge-url": "[concat(parameters('bridge-name'), '.', resourceGroup().location, '.azurecontainer.io')]",
"reverse-proxy-bridge-routes": "reverse_proxy /devices/* localhost:5001",
"reverse-proxy-health-route": "reverse_proxy /health localhost:5001",
"reverse-proxy-adapter-catch-all": "reverse_proxy * localhost:3000",
"caddy-config": "[concat('printf \"', variables('bridge-url'), '\n{\n', variables('reverse-proxy-bridge-routes'), '\n', variables('reverse-proxy-health-route'), '\n', variables('reverse-proxy-adapter-catch-all'), '\n}\n\"')]"
},
"resources": [
{
"type": "Microsoft.Storage/storageAccounts",
"apiVersion": "2019-06-01",
"name": "[variables('storageAccountName')]",
"location": "[resourceGroup().location]",
"kind": "StorageV2",
"sku": {
"name": "Standard_LRS",
"tier": "Standard"
},
"resources": [
{
"type": "fileServices/shares",
"apiVersion": "2019-06-01",
"name": "/default/bridge",
"dependsOn": [
"[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName'))]"
]
}
]
},
{
"type": "Microsoft.Sql/servers",
"apiVersion": "2019-06-01-preview",
"name": "[variables('sqlServerName')]",
"location": "[resourceGroup().location]",
"tags": {},
"kind": "v12.0",
"identity": {
"type": "SystemAssigned"
},
"properties": {
"administratorLogin": "[parameters('sql-username')]",
"administratorLoginPassword": "[parameters('sql-password')]",
"version": "12.0",
"publicNetworkAccess": "Enabled"
},
"resources": [
{
"type": "firewallrules",
"name": "AllowAllWindowsAzureIps",
"apiVersion": "2019-06-01-preview",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.Sql/servers', concat(variables('sqlServerName')))]"
],
"properties": {
"endIpAddress": "0.0.0.0",
"startIpAddress": "0.0.0.0"
}
},
{
"type": "databases",
"apiVersion": "2019-06-01-preview",
"name": "[variables('databaseName')]",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.Sql/servers', variables('sqlServerName'))]"
],
"sku": {
"name": "Basic",
"tier": "Basic",
"capacity": 5
},
"kind": "v12.0,user",
"properties": {
"collation": "SQL_Latin1_General_CP1_CI_AS",
"maxSizeBytes": 2147483648,
"catalogCollation": "SQL_Latin1_General_CP1_CI_AS",
"zoneRedundant": false,
"readScale": "Disabled",
"storageAccountType": "GRS"
}
}
]
},
{
"type": "Microsoft.ContainerInstance/containerGroups",
"apiVersion": "2019-12-01",
"name": "[variables('setupContainerGroupsName')]",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.Sql/servers/databases', variables('sqlServerName'), variables('databaseName'))]"
],
"identity": {
"type": "SystemAssigned"
},
"properties": {
"sku": "Standard",
"containers": [
{
"name": "bridge-setup",
"properties": {
"image": "[parameters('bridge-image')]",
"environmentVariables": [
{
"name": "KV_URL",
"value": "[concat('https://', variables('keyvaultName'), '.vault.azure.net/')]"
}
],
"command": [
"dotnet",
"DeviceBridge.dll",
"--setup"
],
"resources": {
"requests": {
"memoryInGB": 1,
"cpu": 1
}
}
}
}
],
"restartPolicy": "OnFailure",
"imageRegistryCredentials": [
{
"server": "[parameters('acr-server')]",
"username": "[parameters('acr-username')]",
"password": "[parameters('acr-password')]"
}
],
"diagnostics" : {
"logAnalytics": {
"workspaceId": "[parameters('log-analytics-workspace-id')]",
"workspaceKey": "[parameters('log-analytics-workspace-key')]"
}
},
"osType": "Linux"
}
},
{
"type": "Microsoft.ContainerInstance/containerGroups",
"apiVersion": "2019-12-01",
"name": "[variables('containerGroupsName')]",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName'))]",
"[resourceId('Microsoft.Sql/servers/databases', variables('sqlServerName'), variables('databaseName'))]"
],
"identity": {
"type": "SystemAssigned"
},
"properties": {
"sku": "Standard",
"containers": [
{
"name": "adapter",
"properties": {
"image": "[parameters('adapter-image')]",
"ports": [
{
"port": 3000
}
],
"environmentVariables": [
{
"name": "PORT",
"value": "3000"
},
{
"name": "BRIDGE_URL",
"value": "http://localhost:5001"
},
{
"name": "CONFIG_PATH",
"value": "/adapter-config"
},
{
"name": "CONFIG_FILE",
"value": "config.json"
}
],
"resources": {
"requests": {
"memoryInGB": 0.5,
"cpu": 0.5
}
},
"volumeMounts": [
{
"name": "adapter-config",
"mountPath": "/adapter-config"
}
]
}
},
{
"name": "[variables('bridgeContainerName')]",
"properties": {
"image": "[parameters('bridge-image')]",
"ports": [
{
"port": 5001
}
],
"environmentVariables": [
{
"name": "MAX_POOL_SIZE",
"value": "50"
},
{
"name": "DEVICE_CONNECTION_BATCH_SIZE",
"value": "150"
},
{
"name": "DEVICE_CONNECTION_BATCH_INTERVAL_MS",
"value": "1000"
},
{
"name": "KV_URL",
"value": "[concat('https://', variables('keyvaultName'), '.vault.azure.net/')]"
},
{
"name": "PORT",
"value": "5001"
}
],
"resources": {
"requests": {
"memoryInGB": 1.5,
"cpu": 0.8
}
}
}
},
{
"name": "caddy-ssl-server",
"properties": {
"image": "caddy@sha256:d0b43ebda8fd47409cec98d5f3c3b4c60bfc6bca35338313c002dc64c2283055",
"command": [
"/bin/sh",
"-c",
"[concat(variables('caddy-config'), ' | caddy run --config - --adapter caddyfile 2>> /tmp/caddy.log')]"
],
"ports": [
{
"protocol": "TCP",
"port": 443
},
{
"protocol": "TCP",
"port": 80
}
],
"environmentVariables": [],
"resources": {
"requests": {
"memoryInGB": 0.5,
"cpu": 0.2
}
},
"volumeMounts": [
{
"name": "data",
"mountPath": "/data"
},
{
"name": "config",
"mountPath": "/config"
}
]
}
}
],
"volumes": [
{
"name": "data",
"azureFile": {
"shareName": "bridge",
"storageAccountName": "[variables('storageAccountName')]",
"storageAccountKey": "[listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName')), '2019-06-01').keys[0].value]"
}
},
{
"name": "config",
"azureFile": {
"shareName": "bridge",
"storageAccountName": "[variables('storageAccountName')]",
"storageAccountKey": "[listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName')), '2019-06-01').keys[0].value]"
}
},
{
"name": "adapter-config",
"azureFile": {
"shareName": "bridge",
"storageAccountName": "[variables('storageAccountName')]",
"storageAccountKey": "[listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName')), '2019-06-01').keys[0].value]"
}
}
],
"initContainers": [
],
"restartPolicy": "Always",
"imageRegistryCredentials": [
{
"server": "[parameters('acr-server')]",
"username": "[parameters('acr-username')]",
"password": "[parameters('acr-password')]"
}
],
"diagnostics" : {
"logAnalytics": {
"workspaceId": "[parameters('log-analytics-workspace-id')]",
"workspaceKey": "[parameters('log-analytics-workspace-key')]"
}
},
"ipAddress": {
"ports": [
{
"protocol": "TCP",
"port": 443
}
],
"type": "Public",
"dnsNameLabel": "[parameters('bridge-name')]"
},
"osType": "Linux"
}
},
{
"type": "Microsoft.KeyVault/vaults",
"apiVersion": "2016-10-01",
"name": "[variables('keyvaultName')]",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.ContainerInstance/containerGroups', variables('containerGroupsName'))]",
"[resourceId('Microsoft.ContainerInstance/containerGroups', variables('setupContainerGroupsName'))]"
],
"properties": {
"sku": {
"family": "A",
"name": "Standard"
},
"tenantId": "[subscription().tenantId]",
"enabledForDeployment": true,
"enabledForDiskEncryption": true,
"enabledForTemplateDeployment": true,
"accessPolicies": [
{
"tenantId": "[subscription().tenantid]",
"objectId": "[reference(resourceId('Microsoft.ContainerInstance/containerGroups', variables('containerGroupsName')),'2019-12-01', 'full').identity.principalId]",
"permissions": {
"keys": [],
"secrets": [
"get",
"list"
],
"certificates": []
}
},
{
"tenantId": "[subscription().tenantid]",
"objectId": "[reference(resourceId('Microsoft.ContainerInstance/containerGroups', variables('setupContainerGroupsName')),'2019-12-01', 'full').identity.principalId]",
"permissions": {
"keys": [],
"secrets": [
"get",
"list",
"set"
],
"certificates": []
}
}
]
},
"resources": [
{
"type": "secrets",
"apiVersion": "2016-10-01",
"name": "apiKey",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.KeyVault/vaults', variables('keyvaultName'))]"
],
"properties": {
"value": "[parameters('api-key')]",
"attributes": {
"enabled": true
}
}
},
{
"type": "secrets",
"apiVersion": "2016-10-01",
"name": "iotc-sas-key",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.KeyVault/vaults', variables('keyvaultName'))]"
],
"properties": {
"value": "[parameters('iotc-dps-sas-key')]",
"attributes": {
"enabled": true
}
}
},
{
"type": "secrets",
"apiVersion": "2016-10-01",
"name": "iotc-id-scope",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.KeyVault/vaults', variables('keyvaultName'))]"
],
"properties": {
"value": "[parameters('iotc-id-scope')]",
"attributes": {
"enabled": true
}
}
},
{
"type": "secrets",
"apiVersion": "2016-10-01",
"name": "sql-username",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.KeyVault/vaults', variables('keyvaultName'))]"
],
"properties": {
"value": "[parameters('sql-username')]",
"attributes": {
"enabled": true
}
}
},
{
"type": "secrets",
"apiVersion": "2016-10-01",
"name": "sql-password",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.KeyVault/vaults', variables('keyvaultName'))]"
],
"properties": {
"value": "[parameters('sql-password')]",
"attributes": {
"enabled": true
}
}
},
{
"type": "secrets",
"apiVersion": "2016-10-01",
"name": "sql-server",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.KeyVault/vaults', variables('keyvaultName'))]",
"[resourceId('Microsoft.Sql/servers', concat(variables('sqlServerName')))]"
],
"properties": {
"value": "[reference(variables('sqlServerName')).fullyQualifiedDomainName]",
"attributes": {
"enabled": true
}
}
},
{
"type": "secrets",
"apiVersion": "2016-10-01",
"name": "sql-database",
"location": "[resourceGroup().location]",
"dependsOn": [
"[resourceId('Microsoft.KeyVault/vaults', variables('keyvaultName'))]"
],
"properties": {
"value": "[variables('databaseName')]",
"attributes": {
"enabled": true
}
}
}
]
}
]
}

Просмотреть файл

@ -0,0 +1,116 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
package main
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"path/filepath"
)
// Config represents an adapter configuration (with routes, transforms, etc.)
type Config struct {
D2CMessages []D2CMessage
}
type D2CMessage struct {
Path string // Path filter for requests that will be routed to this transform
Transform string // jq query to tranform the request body
DeviceIdPathParam string // Path parameter containing device Id
DeviceIdBodyQuery string // jq query to pick the device Id from the request body
AuthHeader string // Header containing auth key
AuthQueryParam string // Query parameter containing auth key
}
// ConfigRaw represents the input config file, before processing.
type ConfigRaw struct {
D2CMessages []D2CMessageRaw `json:"d2cMessages"`
}
type D2CMessageRaw struct {
Path string `json:"path"`
Transform string `json:"transform"`
TransformFile string `json:"transformFile"`
DeviceIdPathParam string `json:"deviceIdPathParam"`
DeviceIdBodyQuery string `json:"deviceIdBodyQuery"`
AuthHeader string `json:"authHeader"`
AuthQueryParam string `json:"authQueryParam"`
}
// LoadConfig loads, parses, and validates an adapter config from a file.
func LoadConfig(configPath string, configFileName string) (*Config, error) {
if configPath == "" {
return nil, errors.New("transform-adapter: missing config path")
}
if configFileName == "" {
return nil, errors.New("transform-adapter: missing config file")
}
configFile, err := ioutil.ReadFile(filepath.Join(configPath, configFileName))
if err != nil {
return nil, err
}
var configRaw ConfigRaw
if err = json.Unmarshal(configFile, &configRaw); err != nil {
return nil, err
}
if err := validate(&configRaw); err != nil {
return nil, err
}
config := Config{D2CMessages: make([]D2CMessage, len(configRaw.D2CMessages))}
// Generate processed config
for i, message := range configRaw.D2CMessages {
// Resolve transform files
if message.TransformFile != "" {
transformFileContent, err := ioutil.ReadFile(filepath.Join(configPath, message.TransformFile))
if err != nil {
return nil, err
}
message.Transform = string(transformFileContent)
}
config.D2CMessages[i] = D2CMessage{
Path: message.Path,
Transform: message.Transform,
DeviceIdPathParam: message.DeviceIdPathParam,
DeviceIdBodyQuery: message.DeviceIdBodyQuery,
AuthHeader: message.AuthHeader,
AuthQueryParam: message.AuthQueryParam,
}
}
return &config, nil
}
func validate(config *ConfigRaw) error {
for _, message := range config.D2CMessages {
if message.Path == "" {
return errors.New("transform-adapter: path missing in D2C message definition")
}
if message.Transform != "" && message.TransformFile != "" {
return fmt.Errorf("transform-adapter: either transform or transformFile may be defined, not both, in D2C message definition %s", message.Path)
}
if (message.AuthHeader == "" && message.AuthQueryParam == "") || (message.AuthHeader != "" && message.AuthQueryParam != "") {
return fmt.Errorf("transform-adapter: either authHeader or authQueryParam must be defined in D2C message definition %s", message.Path)
}
if (message.DeviceIdPathParam == "" && message.DeviceIdBodyQuery == "") || (message.DeviceIdPathParam != "" && message.DeviceIdBodyQuery != "") {
return fmt.Errorf("transform-adapter: either deviceIdPathParam or deviceIdBodyQuery must be defined in D2C message definition %s", message.Path)
}
}
return nil
}

Просмотреть файл

@ -0,0 +1,2 @@
{
"d2cMessages": [{

Просмотреть файл

@ -0,0 +1,18 @@
{
"d2cMessages": [{
"path": "/{id}/cde",
"deviceIdPathParam": "id",
"authHeader": "key"
}, {
"path": "/message",
"transform": "{ data: .dd, properties, componentName, creationTimeUtc }",
"DeviceIdBodyQuery": ".Device.Id",
"authQueryParam": "apk"
},
{
"path": "/telemetry/{deviceId}",
"deviceIdPathParam": "deviceId",
"authHeader": "api-key",
"transformFile": "./transform_mock.jq"
}]
}

Просмотреть файл

@ -0,0 +1,52 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
package main
import (
"fmt"
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func TestLoadConfigFileNotFound(t *testing.T) {
_, err := LoadConfig(".", "config_not_found.json")
assert.EqualError(t, err, "open config_not_found.json: no such file or directory")
}
func TestLoadConfigMalformedJSON(t *testing.T) {
_, err := LoadConfig(".", "config_malformed_mock.json")
assert.EqualError(t, err, "unexpected end of JSON input")
}
func ExampleLoadConfigSuccess() {
currentPath, _ := os.Getwd()
result, _ := LoadConfig(currentPath, "config_mock.json")
fmt.Println(result)
// Output: &{[{/{id}/cde id key } {/message { data: .dd, properties, componentName, creationTimeUtc } .Device.Id apk} {/telemetry/{deviceId} {
// data: .obj
// | map( { (.name | tostring): .value } )
// | add
// } deviceId api-key }]}
}
func TestValidatePathMissing(t *testing.T) {
err := validate(&ConfigRaw{D2CMessages: []D2CMessageRaw{{Transform: "."}}})
assert.EqualError(t, err, "transform-adapter: path missing in D2C message definition")
}
func TestValidateMultipleTransforms(t *testing.T) {
err := validate(&ConfigRaw{D2CMessages: []D2CMessageRaw{{Path: "/", Transform: ".", TransformFile: "./transform.jq"}}})
assert.EqualError(t, err, "transform-adapter: either transform or transformFile may be defined, not both, in D2C message definition /")
}
func TestValidateDeviceIdParamMissing(t *testing.T) {
err := validate(&ConfigRaw{D2CMessages: []D2CMessageRaw{{Path: "/", AuthHeader: "key"}}})
assert.EqualError(t, err, "transform-adapter: either deviceIdPathParam or deviceIdBodyQuery must be defined in D2C message definition /")
}
func TestValidateAuthMissing(t *testing.T) {
err := validate(&ConfigRaw{D2CMessages: []D2CMessageRaw{{Path: "/"}}})
assert.EqualError(t, err, "transform-adapter: either authHeader or authQueryParam must be defined in D2C message definition /")
}

Просмотреть файл

@ -0,0 +1,16 @@
module github.com/iot-for-all/iotc-device-bridge/custom-transform-adapter
go 1.16
require (
github.com/Azure/go-autorest/autorest v0.11.18
github.com/Azure/go-autorest/autorest/date v0.3.0
github.com/Azure/go-autorest/autorest/validation v0.3.1
github.com/Azure/go-autorest/tracing v0.6.0
github.com/google/uuid v1.2.0 // indirect
github.com/gorilla/mux v1.8.0
github.com/itchyny/gojq v0.12.2
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.3.0
)

Просмотреть файл

@ -0,0 +1,58 @@
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA=
github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac=
github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/itchyny/go-flags v1.5.0/go.mod h1:lenkYuCobuxLBAd/HGFE4LRoW8D3B6iXRQfWYJ+MNbA=
github.com/itchyny/gojq v0.12.2 h1:TxhFjk1w7Vnb0SwQPeG4FxTC98O4Es+x/mPaD5Azgfs=
github.com/itchyny/gojq v0.12.2/go.mod h1:mi4PdXSlFllHyByM68JKUrbiArtEdEnNEmjbwxcQKAg=
github.com/itchyny/timefmt-go v0.1.2 h1:q0Xa4P5it6K6D7ISsbLAMwx1PnWlixDcJL6/sFs93Hs=
github.com/itchyny/timefmt-go v0.1.2/go.mod h1:0osSSCQSASBJMsIZnhAaF1C2fCBTJZXrnj37mG8/c+A=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b h1:kHlr0tATeLRMEiZJu5CknOw/E8V6h69sXXQFGoPtjcc=
golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

Просмотреть файл

@ -0,0 +1,35 @@
package bridgeapi
// Code generated by Microsoft (R) AutoRest Code Generator.
// Changes may cause incorrect behavior and will be lost if the code is regenerated.
import (
"context"
"github.com/Azure/go-autorest/autorest"
"github.com/iot-for-all/iotc-device-bridge/custom-transform-adapter/lib/bridge"
)
// BaseClientAPI contains the set of methods on the BaseClient type.
type BaseClientAPI interface {
CreateOrUpdateC2DMessageSubscription(ctx context.Context, deviceID string, body *bridge.SubscriptionCreateOrUpdateBody) (result bridge.DeviceSubscriptionWithStatus, err error)
CreateOrUpdateConnectionStatusSubscription(ctx context.Context, deviceID string, body *bridge.SubscriptionCreateOrUpdateBody) (result bridge.DeviceSubscription, err error)
CreateOrUpdateDesiredPropertiesSubscription(ctx context.Context, deviceID string, body *bridge.SubscriptionCreateOrUpdateBody) (result bridge.DeviceSubscriptionWithStatus, err error)
CreateOrUpdateMethodsSubscription(ctx context.Context, deviceID string, body *bridge.SubscriptionCreateOrUpdateBody) (result bridge.DeviceSubscriptionWithStatus, err error)
DeleteC2DMessageSubscription(ctx context.Context, deviceID string) (result autorest.Response, err error)
DeleteConnectionStatusSubscription(ctx context.Context, deviceID string) (result autorest.Response, err error)
DeleteDesiredPropertiesSubscription(ctx context.Context, deviceID string) (result autorest.Response, err error)
DeleteMethodsSubscription(ctx context.Context, deviceID string) (result autorest.Response, err error)
GetC2DMessageSubscription(ctx context.Context, deviceID string) (result bridge.DeviceSubscriptionWithStatus, err error)
GetConnectionStatusSubscription(ctx context.Context, deviceID string) (result bridge.DeviceSubscription, err error)
GetCurrentConnectionStatus(ctx context.Context, deviceID string) (result bridge.DeviceStatusResponseBody, err error)
GetDesiredPropertiesSubscription(ctx context.Context, deviceID string) (result bridge.DeviceSubscriptionWithStatus, err error)
GetMethodsSubscription(ctx context.Context, deviceID string) (result bridge.DeviceSubscriptionWithStatus, err error)
GetTwin(ctx context.Context, deviceID string) (result bridge.GetTwinOKResponse, err error)
Register(ctx context.Context, deviceID string, body *bridge.RegistrationBody) (result autorest.Response, err error)
Resync(ctx context.Context, deviceID string) (result autorest.Response, err error)
SendMessage(ctx context.Context, deviceID string, body *bridge.MessageBody) (result autorest.Response, err error)
UpdateReportedProperties(ctx context.Context, deviceID string, body *bridge.ReportedPropertiesPatch) (result autorest.Response, err error)
}
var _ BaseClientAPI = (*bridge.BaseClient)(nil)

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,107 @@
package bridge
// Code generated by Microsoft (R) AutoRest Code Generator.
// Changes may cause incorrect behavior and will be lost if the code is regenerated.
import (
"encoding/json"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/date"
)
// The package's fully qualified name.
const fqdn = "lib/bridge"
// DeviceStatusResponseBody ...
type DeviceStatusResponseBody struct {
autorest.Response `json:"-"`
Status *string `json:"status,omitempty"`
Reason *string `json:"reason,omitempty"`
}
// DeviceSubscription ...
type DeviceSubscription struct {
autorest.Response `json:"-"`
DeviceID *string `json:"deviceId,omitempty"`
SubscriptionType *string `json:"subscriptionType,omitempty"`
CallbackURL *string `json:"callbackUrl,omitempty"`
CreatedAt *date.Time `json:"createdAt,omitempty"`
}
// DeviceSubscriptionWithStatus ...
type DeviceSubscriptionWithStatus struct {
autorest.Response `json:"-"`
DeviceID *string `json:"deviceId,omitempty"`
SubscriptionType *string `json:"subscriptionType,omitempty"`
CallbackURL *string `json:"callbackUrl,omitempty"`
CreatedAt *date.Time `json:"createdAt,omitempty"`
Status *string `json:"status,omitempty"`
}
// GetTwinOKResponse ...
type GetTwinOKResponse struct {
autorest.Response `json:"-"`
Twin *GetTwinOKResponseTwin `json:"twin,omitempty"`
}
// GetTwinOKResponseTwin ...
type GetTwinOKResponseTwin struct {
Properties *GetTwinOKResponseTwinProperties `json:"properties,omitempty"`
}
// GetTwinOKResponseTwinProperties ...
type GetTwinOKResponseTwinProperties struct {
Desired interface{} `json:"desired,omitempty"`
Reported interface{} `json:"reported,omitempty"`
}
// MessageBody ...
type MessageBody struct {
Data map[string]interface{} `json:"data"`
Properties map[string]*string `json:"properties"`
ComponentName *string `json:"componentName,omitempty"`
CreationTimeUtc *date.Time `json:"creationTimeUtc,omitempty"`
}
// MarshalJSON is the custom marshaler for MessageBody.
func (mb MessageBody)MarshalJSON() ([]byte, error){
objectMap := make(map[string]interface{})
if(mb.Data != nil) {
objectMap["data"] = mb.Data
}
if(mb.Properties != nil) {
objectMap["properties"] = mb.Properties
}
if(mb.ComponentName != nil) {
objectMap["componentName"] = mb.ComponentName
}
if(mb.CreationTimeUtc != nil) {
objectMap["creationTimeUtc"] = mb.CreationTimeUtc
}
return json.Marshal(objectMap)
}
// RegistrationBody ...
type RegistrationBody struct {
ModelID *string `json:"modelId,omitempty"`
}
// ReportedPropertiesPatch ...
type ReportedPropertiesPatch struct {
Patch map[string]interface{} `json:"patch"`
}
// MarshalJSON is the custom marshaler for ReportedPropertiesPatch.
func (rpp ReportedPropertiesPatch)MarshalJSON() ([]byte, error){
objectMap := make(map[string]interface{})
if(rpp.Patch != nil) {
objectMap["patch"] = rpp.Patch
}
return json.Marshal(objectMap)
}
// SubscriptionCreateOrUpdateBody ...
type SubscriptionCreateOrUpdateBody struct {
CallbackURL *string `json:"callbackUrl,omitempty"`
}

Просмотреть файл

@ -0,0 +1,16 @@
package bridge
// Code generated by Microsoft (R) AutoRest Code Generator.
// Changes may cause incorrect behavior and will be lost if the code is regenerated.
// UserAgent returns the UserAgent string to use when sending http.Requests.
func UserAgent() string {
return "Azure-SDK-For-Go/" + Version() + " bridge/1.0"
}
// Version returns the semantic version (see http://semver.org) of the client.
func Version() string {
return "0.0.0"
}

Просмотреть файл

@ -0,0 +1,33 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
package main
import (
"os"
log "github.com/sirupsen/logrus"
)
func init() {
log.SetFormatter(&log.JSONFormatter{})
}
func main() {
bridgeUrl := os.Getenv("BRIDGE_URL")
configPath := os.Getenv("CONFIG_PATH")
configFileName := os.Getenv("CONFIG_FILE")
config, err := LoadConfig(configPath, configFileName)
if err != nil {
log.WithField("error", err).Panicf("unable to load config: %s", err)
}
adapter, err := NewAdapter(config, bridgeUrl)
if err != nil {
log.WithField("error", err).Panicf("unable to build adapter server: %s", err)
}
log.Fatal(adapter.ListenAndServe(os.Getenv("PORT")))
}

Просмотреть файл

@ -0,0 +1,14 @@
package main
import (
"io/ioutil"
"os"
"testing"
log "github.com/sirupsen/logrus"
)
func TestMain(m *testing.M) {
log.SetOutput(ioutil.Discard)
os.Exit(m.Run())
}

Просмотреть файл

@ -0,0 +1,299 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
package main
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/http"
"strconv"
"time"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/date"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/iot-for-all/iotc-device-bridge/custom-transform-adapter/lib/bridge"
"github.com/mitchellh/mapstructure"
log "github.com/sirupsen/logrus"
)
const maxBodySize = 1024 * 1024 // 1 MiB
type BridgeClient interface {
SetAuthorizer(autorest.Authorizer)
SetRetryAttempts(int)
SendMessage(context.Context, string, *bridge.MessageBody) (autorest.Response, error)
GetBaseURI() string
}
type BridgeClientAutorest struct {
bridge.BaseClient
}
func (client *BridgeClientAutorest) SetAuthorizer(authorizer autorest.Authorizer) {
client.Authorizer = authorizer
}
func (client *BridgeClientAutorest) SetRetryAttempts(retryAttempts int) {
client.RetryAttempts = retryAttempts
}
func (client *BridgeClientAutorest) SendMessage(ctx context.Context, deviceID string, body *bridge.MessageBody) (autorest.Response, error) {
return client.BaseClient.SendMessage(ctx, deviceID, body)
}
func (client *BridgeClientAutorest) GetBaseURI() string {
return client.BaseURI
}
type Adapter struct {
GetBridgeClient func() BridgeClient
Router *mux.Router
Engine *TransformEngine
}
// AugmentedD2CMessage represents a D2C message route definition augmented to include the Id of the cached transform queries.
type AugmentedD2CMessage struct {
D2CMessage
TransformId string
DeviceIdBodyQueryId string
}
// NewAdapter builds a transform adapter for a given configuration.
func NewAdapter(config *Config, bridgeEndpoint string) (*Adapter, error) {
log.Infof("Initializing adapter for Bridge %s", bridgeEndpoint)
if bridgeEndpoint == "" {
return nil, errors.New("transform-adapter: missing Bridge URL")
}
adapter := Adapter{
Engine: NewTransformEngine(),
Router: mux.NewRouter(),
GetBridgeClient: func() BridgeClient {
return &BridgeClientAutorest{bridge.NewWithBaseURI(bridgeEndpoint)}
},
}
for _, message := range config.D2CMessages {
log.Infof("Initializing route %s", message.Path)
augmentedMessage := AugmentedD2CMessage{D2CMessage: message}
// Initialize cache for request body transform.
if message.Transform != "" {
augmentedMessage.TransformId = uuid.New().String()
if err := adapter.Engine.AddTransform(augmentedMessage.TransformId, message.Transform); err != nil {
return nil, fmt.Errorf("transform-adapter: failed to add request body transform for route %s: %s", message.Path, err)
}
} else {
log.Warnf("Empty transform. Route %s will be set as pass-through", message.Path)
}
// Initialize cache for device Id transform.
if message.DeviceIdBodyQuery != "" {
augmentedMessage.DeviceIdBodyQueryId = uuid.New().String()
if err := adapter.Engine.AddTransform(augmentedMessage.DeviceIdBodyQueryId, message.DeviceIdBodyQuery); err != nil {
return nil, fmt.Errorf("transform-adapter: failed to add device Id query transform for route %s: %s", message.Path, err)
}
}
handler := adapter.buildD2CMessageHandler(augmentedMessage)
adapter.Router.HandleFunc(message.Path, withLogging(handler)).Methods("POST")
}
return &adapter, nil
}
func (adapter *Adapter) ListenAndServe(port string) error {
portInt, err := strconv.Atoi(port)
if err != nil {
return fmt.Errorf("invalid port: %s", err)
}
log.Infof("Server listening on port %d", portInt)
return http.ListenAndServe(fmt.Sprintf(":%d", portInt), adapter.Router)
}
// buildD2CMessageHandler builds the HTTP handler for a given C2D route definition.
func (adapter *Adapter) buildD2CMessageHandler(message AugmentedD2CMessage) func(*log.Entry, http.ResponseWriter, *http.Request) {
return func(logger *log.Entry, w http.ResponseWriter, r *http.Request) {
var jsonBody map[string]interface{}
if err := decodeJsonBody(w, r, &jsonBody); err != nil {
respondError(logger, w, http.StatusBadRequest, fmt.Errorf("failed to decode JSON body: %w", err))
return
}
// Execute body transformation if one was provided. If not, the route is pass-through.
var transformedPayload interface{}
if message.TransformId != "" {
var err error
transformedPayload, err = adapter.Engine.Execute(message.TransformId, jsonBody)
if err != nil {
respondError(logger, w, http.StatusBadRequest, fmt.Errorf("payload transformation failed: %w", err))
return
}
} else {
transformedPayload = jsonBody
}
if err := decodeDateTimeField(&transformedPayload, "creationTimeUtc"); err != nil {
respondError(logger, w, http.StatusBadRequest, fmt.Errorf("failed to parse \"creationTimeUtc\": %w", err))
return
}
// Convert transformation output to Autorest typed input.
var bridgePayload bridge.MessageBody
if err := mapstructure.Decode(transformedPayload, &bridgePayload); err != nil {
respondError(logger, w, http.StatusBadRequest, fmt.Errorf("failed to transform payload to expected Device Bridge format: %w", err))
return
}
bridgeClient := adapter.GetBridgeClient()
// Extracts the API key from the query parameter or header.
var apiKey string
if message.AuthQueryParam != "" {
values, ok := r.URL.Query()[message.AuthQueryParam]
if !ok || len(values) < 1 {
respondError(logger, w, http.StatusBadRequest, fmt.Errorf("expected auth query parameter \"%s\" to be defined", message.AuthQueryParam))
return
}
apiKey = values[0]
} else if message.AuthHeader != "" {
apiKey = r.Header.Get(message.AuthHeader)
} else {
respondError(logger, w, http.StatusBadRequest, errors.New("no auth method specified"))
return
}
bridgeClient.SetAuthorizer(autorest.NewAPIKeyAuthorizerWithHeaders(map[string]interface{}{
"x-api-key": apiKey,
}))
var deviceId string
switch {
case message.DeviceIdBodyQueryId != "":
queriedDeviceId, err := adapter.Engine.Execute(message.DeviceIdBodyQueryId, jsonBody)
if err != nil {
respondError(logger, w, http.StatusBadRequest, fmt.Errorf("device Id body query failed: %w", err))
return
}
var ok bool
if deviceId, ok = queriedDeviceId.(string); !ok || deviceId == "" {
respondError(logger, w, http.StatusBadRequest, errors.New("expected result from device Id body query to be string"))
return
}
case message.DeviceIdPathParam != "":
var ok bool
if deviceId, ok = mux.Vars(r)[message.DeviceIdPathParam]; !ok {
respondError(logger, w, http.StatusBadRequest, fmt.Errorf("expected device Id in \"%s\" path parameter", message.DeviceIdPathParam))
return
}
default:
respondError(logger, w, http.StatusBadRequest, errors.New("no device Id specified"))
return
}
bridgeClient.SetRetryAttempts(1) // Don't retry (the Bridge already has internal retries)
if bridgeResponse, err := bridgeClient.SendMessage(r.Context(), deviceId, &bridgePayload); err != nil {
// We return the Bridge status code if we have it
responseStatusCode := http.StatusInternalServerError
if bridgeResponse != (autorest.Response{}) {
responseStatusCode = bridgeResponse.StatusCode
}
respondError(logger, w, responseStatusCode, fmt.Errorf("call to Device Bridge failed: %w", err))
return
}
w.WriteHeader(http.StatusOK)
}
}
// LoggingResponseWriter is an HTTP response writer extended to capture the response status.
type LoggingResponseWriter struct {
http.ResponseWriter
ResponseStatus int
}
func (r *LoggingResponseWriter) WriteHeader(status int) {
r.ResponseStatus = status
r.ResponseWriter.WriteHeader(status)
}
// withLogging wraps a request handler, logging the request, response, and injecting a logger with request context.
func withLogging(handler func(*log.Entry, http.ResponseWriter, *http.Request)) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
logger := log.WithField("request_id", makeShortId())
logger.Infof("HTTP request. Path %s", r.URL.Path)
loggingResponseWriter := &LoggingResponseWriter{ResponseWriter: w}
handler(logger, loggingResponseWriter, r)
duration := time.Since(startTime).String()
logger.Infof("HTTP response. Path %s, status %d, duration %s", r.URL.Path, loggingResponseWriter.ResponseStatus, duration)
}
}
func decodeJsonBody(w http.ResponseWriter, r *http.Request, output *map[string]interface{}) error {
r.Body = http.MaxBytesReader(w, r.Body, maxBodySize)
decoder := json.NewDecoder(r.Body)
return decoder.Decode(output)
}
func respondError(logger *log.Entry, w http.ResponseWriter, statusCode int, err error) {
logger.Error(err.Error())
respondJson(logger, w, statusCode, map[string]string{"error": err.Error()})
}
func respondJson(logger *log.Entry, w http.ResponseWriter, statusCode int, payload interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
if err := json.NewEncoder(w).Encode(payload); err != nil {
log.WithField("error", err).Errorf("Failed to encode JSON response: %s", err)
}
}
// makeShortId returns a random 8-character string.
func makeShortId() string {
randBytes := make([]byte, 4)
rand.Read(randBytes)
return hex.EncodeToString(randBytes)
}
// decodeDateTimeField converts the specified string field of a JSON map into a date time value, using the Autorest date.Time type.
// The value is decoded in place. Ignores if field is not present in the map.
func decodeDateTimeField(json *interface{}, field string) error {
jsonMap, ok := (*json).(map[string]interface{})
if !ok {
return nil
}
fieldRaw, ok := jsonMap[field]
if !ok || fieldRaw == nil {
return nil
}
fieldStr, ok := fieldRaw.(string)
if !ok {
return fmt.Errorf("if provided, field \"%s\" must be a timestamp string", field)
}
dateTime, err := time.Parse(time.RFC3339, fieldStr)
if err != nil {
return err
}
jsonMap[field] = date.Time{dateTime}
return nil
}

Просмотреть файл

@ -0,0 +1,457 @@
package main
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/Azure/go-autorest/autorest"
"github.com/iot-for-all/iotc-device-bridge/custom-transform-adapter/lib/bridge"
"github.com/stretchr/testify/assert"
)
type BridgeClientMock struct {
LastSendMessageBody *bridge.MessageBody
LastSendMessageDeviceId string
LastAuthorizer autorest.Authorizer
}
func (client *BridgeClientMock) SetAuthorizer(authorizer autorest.Authorizer) {
client.LastAuthorizer = authorizer
}
func (client *BridgeClientMock) SetRetryAttempts(retryAttempts int) {
}
func (client *BridgeClientMock) SendMessage(ctx context.Context, deviceID string, body *bridge.MessageBody) (autorest.Response, error) {
client.LastSendMessageBody = body
client.LastSendMessageDeviceId = deviceID
return autorest.Response{}, nil
}
func (client *BridgeClientMock) GetBaseURI() string {
return "test"
}
var mockBridgeClient = BridgeClientMock{}
var mockGetBridgeClient = func() BridgeClient {
return &mockBridgeClient
}
type BridgeWithBrokenSend struct {
BridgeClientMock
err error
respose autorest.Response
}
func (client *BridgeWithBrokenSend) SendMessage(ctx context.Context, deviceID string, body *bridge.MessageBody) (autorest.Response, error) {
return client.respose, client.err
}
func TestNewAdapterFromConfigBridgeUrl(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
},
}}, "localhost:1000")
assert.Equal(t, "localhost:1000", adapter.GetBridgeClient().GetBaseURI())
}
func TestMalformedJsonBody(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte("not a JSON")
req, _ := http.NewRequest("POST", "/test_device/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 400, recorder.Code)
assert.Contains(t, recorder.Body.String(), "failed to decode JSON body")
}
func TestBasicTransform(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
Transform: "{ data: .telemetry }",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ "telemetry": {"temperature": 21} }`)
req, _ := http.NewRequest("POST", "/test_device/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test_device", mockBridgeClient.LastSendMessageDeviceId)
assert.Equal(t, float64(21), mockBridgeClient.LastSendMessageBody.Data["temperature"])
}
func TestBadTransform(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
Transform: "{(.a): 1}",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ "a": 1 }`)
req, _ := http.NewRequest("POST", "/test_device/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 400, recorder.Code)
assert.Contains(t, recorder.Body.String(), "payload transformation failed")
}
func TestPassthroughTransform(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ "data": {"temperature": 30} }`)
req, _ := http.NewRequest("POST", "/test_device_passthrough/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test_device_passthrough", mockBridgeClient.LastSendMessageDeviceId)
assert.Equal(t, float64(30), mockBridgeClient.LastSendMessageBody.Data["temperature"])
}
func TestCreationTimeUtc(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
Transform: "{ data: .telemetry, creationTimeUtc: .time }",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ "telemetry": {"temperature": 22}, "time": "2031-09-22T12:42:31Z" }`)
req, _ := http.NewRequest("POST", "/test_device_time/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test_device_time", mockBridgeClient.LastSendMessageDeviceId)
assert.Equal(t, float64(22), mockBridgeClient.LastSendMessageBody.Data["temperature"])
assert.Equal(t, "2031-09-22T12:42:31Z", mockBridgeClient.LastSendMessageBody.CreationTimeUtc.String())
}
func TestBadCreationTimeUtc(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
Transform: "{ data: .telemetry, creationTimeUtc: .time }",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ "telemetry": {"temperature": 22}, "time": "abc" }`)
req, _ := http.NewRequest("POST", "/test_device_time/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 400, recorder.Code)
assert.Contains(t, recorder.Body.String(), "failed to parse \\\"creationTimeUtc\\\"")
}
func TestBadOutputPayload(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
Transform: "{ data: .telemetry }",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ "telemetry": "bad data" }`)
req, _ := http.NewRequest("POST", "/test_device_payload/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 400, recorder.Code)
assert.Contains(t, recorder.Body.String(), "failed to transform payload to expected Device Bridge format")
}
func ExampleAuthQueryParam() {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthQueryParam: "key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ }`)
req, _ := http.NewRequest("POST", "/test_device/message?key=my_key", bytes.NewBuffer(jsonBody))
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
fmt.Println(recorder.Code, mockBridgeClient.LastAuthorizer)
// Output: 200 &{map[x-api-key:my_key] map[]}
}
func TestBadAuthQueryParam(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthQueryParam: "key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ }`)
req, _ := http.NewRequest("POST", "/test_device/message", bytes.NewBuffer(jsonBody))
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 400, recorder.Code)
assert.Contains(t, recorder.Body.String(), "expected auth query parameter \\\"key\\\" to be defined")
}
func ExampleAuthHeader() {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ }`)
req, _ := http.NewRequest("POST", "/test_device/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "my_header_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
fmt.Println(recorder.Code, mockBridgeClient.LastAuthorizer)
// Output: 200 &{map[x-api-key:my_header_key] map[]}
}
func TestNoAuth(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ }`)
req, _ := http.NewRequest("POST", "/test_device/message", bytes.NewBuffer(jsonBody))
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 400, recorder.Code)
assert.Contains(t, recorder.Body.String(), "no auth method specified")
}
func TestDeviceIdBodyQuery(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/message",
DeviceIdBodyQuery: ".device.id",
AuthHeader: "key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ "device": { "id": "body_id" }}`)
req, _ := http.NewRequest("POST", "/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "body_id", mockBridgeClient.LastSendMessageDeviceId)
}
func TestDeviceIdBodyQueryFieldMissing(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/message",
DeviceIdBodyQuery: "{(.device): 1}",
AuthHeader: "key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ "device": {}}`)
req, _ := http.NewRequest("POST", "/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 400, recorder.Code)
assert.Contains(t, recorder.Body.String(), "device Id body query failed")
}
func TestDeviceIdBodyQueryBadFormat(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/message",
DeviceIdBodyQuery: ".device.id",
AuthHeader: "key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ "device": { "id": 123 }}`)
req, _ := http.NewRequest("POST", "/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 400, recorder.Code)
assert.Contains(t, recorder.Body.String(), "expected result from device Id body query to be string")
}
func TestDeviceIdPathParameterMissing(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{another_id}/message",
AuthHeader: "key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ }`)
req, _ := http.NewRequest("POST", "/test_device/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 400, recorder.Code)
assert.Contains(t, recorder.Body.String(), "no device Id specified")
}
func TestBridgeStatusCode(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
},
}}, "localhost:1000")
var brokenBridgeClient = BridgeWithBrokenSend{err: errors.New("bad request"), respose: autorest.Response{&http.Response{StatusCode: 401}}}
var mockGetBrokenBridgeClient = func() BridgeClient {
return &brokenBridgeClient
}
adapter.GetBridgeClient = mockGetBrokenBridgeClient
var jsonBody = []byte(`{ }`)
req, _ := http.NewRequest("POST", "/test_device/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 401, recorder.Code)
assert.Contains(t, recorder.Body.String(), "call to Device Bridge failed: bad request")
}
func TestBridgeEmptyStatusCodeReturns500(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
},
}}, "localhost:1000")
var brokenBridgeClient = BridgeWithBrokenSend{err: errors.New("bad request"), respose: autorest.Response{}}
var mockGetBrokenBridgeClient = func() BridgeClient {
return &brokenBridgeClient
}
adapter.GetBridgeClient = mockGetBrokenBridgeClient
var jsonBody = []byte(`{ }`)
req, _ := http.NewRequest("POST", "/test_device/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 500, recorder.Code)
assert.Contains(t, recorder.Body.String(), "call to Device Bridge failed: bad request")
}
func Test404(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ }`)
req, _ := http.NewRequest("POST", "/anotherpath", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 404, recorder.Code)
}
func TestMultipleRoutes(t *testing.T) {
adapter, _ := NewAdapter(&Config{D2CMessages: []D2CMessage{
{
Path: "/{id}/message",
DeviceIdPathParam: "id",
AuthHeader: "key",
Transform: "{ data: .telemetry }",
},
{
Path: "/another_message",
DeviceIdBodyQuery: ".device.id",
AuthHeader: "another_key",
},
}}, "localhost:1000")
adapter.GetBridgeClient = mockGetBridgeClient
var jsonBody = []byte(`{ "telemetry": {"temperature": 21} }`)
req, _ := http.NewRequest("POST", "/test_device/message", bytes.NewBuffer(jsonBody))
req.Header.Add("key", "test_key")
recorder := httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test_device", mockBridgeClient.LastSendMessageDeviceId)
assert.Equal(t, float64(21), mockBridgeClient.LastSendMessageBody.Data["temperature"])
jsonBody = []byte(`{ "device": { "id": "body_id" }, "data": {"humidity": 30}}`)
req, _ = http.NewRequest("POST", "/another_message", bytes.NewBuffer(jsonBody))
req.Header.Add("another_key", "test_key")
recorder = httptest.NewRecorder()
adapter.Router.ServeHTTP(recorder, req)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "body_id", mockBridgeClient.LastSendMessageDeviceId)
assert.Equal(t, float64(30), mockBridgeClient.LastSendMessageBody.Data["humidity"])
}

Просмотреть файл

@ -0,0 +1,61 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
package main
import (
"fmt"
"github.com/itchyny/gojq"
)
// TransformEngine keeps a set of pre-compiled jq queries ready for execution
type TransformEngine struct {
transforms map[string]*gojq.Code
}
func NewTransformEngine() *TransformEngine {
return &TransformEngine{make(map[string]*gojq.Code)}
}
// AddTransform saves a query, identified by Id, for later execution
func (engine *TransformEngine) AddTransform(id string, query string) error {
parsed, err := gojq.Parse(query)
if err != nil {
return err
}
compiled, err := gojq.Compile(parsed)
if err != nil {
return err
}
engine.transforms[id] = compiled
return nil
}
// Execute executes the transformation identified by Id over the given input.
//
// Thread safe.
func (engine *TransformEngine) Execute(id string, input map[string]interface{}) (interface{}, error) {
compiled, ok := engine.transforms[id]
if !ok {
return nil, fmt.Errorf("transform-adapter: transformation for id %s not found", id)
}
iter := compiled.Run(input)
result, ok := iter.Next()
if !ok {
return nil, fmt.Errorf("transform-adapter: transform id %s generated empty result", id)
}
if err, ok := result.(error); ok {
return nil, fmt.Errorf("transform-adapter: transform id %s failed: %s", id, err)
}
if _, ok := iter.Next(); ok {
return nil, fmt.Errorf("transform-adapter: transform id %s generated multiple results", id)
}
return result, nil
}

Просмотреть файл

@ -0,0 +1,5 @@
{
data: .obj
| map( { (.name | tostring): .value } )
| add
}

Просмотреть файл

@ -0,0 +1,58 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
package main
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestTransformEngineAddTransform(t *testing.T) {
engine := NewTransformEngine()
assert.NoError(t, engine.AddTransform("pass-through", "."))
assert.NoError(t, engine.AddTransform("sample", ". | {a, b}"))
assert.Error(t, engine.AddTransform("invalid", ".{a, b}"))
}
// Simple query that maps { "a": 1 } to { "b": 1}.
func ExampleTransformEngineExecuteSample() {
engine := NewTransformEngine()
engine.AddTransform("sample", "{ b: .a}")
result, _ := engine.Execute("sample", map[string]interface{}{"a": 1})
fmt.Println(result)
// Output: map[b:1]
}
// Simple multi-line query that maps { "a": 1 } to { "b": 1}.
func ExampleTransformEngineExecuteMultilineQuery() {
engine := NewTransformEngine()
engine.AddTransform("sample", `{
b: .a
}`)
result, _ := engine.Execute("sample", map[string]interface{}{"a": 1})
fmt.Println(result)
// Output: map[b:1]
}
func TestTransformEngineExecuteNotFound(t *testing.T) {
engine := NewTransformEngine()
assert.NoError(t, engine.AddTransform("a-transform", "."))
_, err := engine.Execute("another-transform", map[string]interface{}{})
assert.EqualError(t, err, "transform-adapter: transformation for id another-transform not found")
}
func TestTransformEngineExecuteFail(t *testing.T) {
engine := NewTransformEngine()
assert.NoError(t, engine.AddTransform("bad-transform", "{(.a): 1}"))
_, err := engine.Execute("bad-transform", map[string]interface{}{"a": 1})
assert.EqualError(t, err, "transform-adapter: transform id bad-transform failed: expected a string for object key but got: number (1)")
}
func TestTransformEngineExecuteMultipleResults(t *testing.T) {
engine := NewTransformEngine()
assert.NoError(t, engine.AddTransform("multiple-results", "{a: 1},{b: 2}"))
_, err := engine.Execute("multiple-results", map[string]interface{}{})
assert.EqualError(t, err, "transform-adapter: transform id multiple-results generated multiple results")
}