remove extra 'thief' object layer (#115)

This commit is contained in:
Bert Kleewein 2021-08-10 10:53:33 -07:00 коммит произвёл GitHub
Родитель 96850b6d95
Коммит ae706d3b50
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 242 добавлений и 358 удалений

Просмотреть файл

@ -18,8 +18,6 @@ groups:
- name: Fields
desc: "Names of fields inside telemetry messages and twins"
values:
- name: THIEF
value: "thief"
- name: PROPERTIES
value: "properties"
- name: REPORTED
@ -64,7 +62,7 @@ groups:
- name: REQUESTED_SERVICE_POOL
value: "requestedServicePool"
- name: ReportedProperties
desc: "properties.reported.thief"
desc: "properties.reported"
values:
- name: SYSTEM_PROPERTIES
value: "systemProperties"
@ -85,19 +83,19 @@ groups:
- name: EXIT_REASON
value: "exitReason"
- name: Pairing
desc: "properties.reported.thief.pairing"
desc: "properties.reported.pairing"
values:
- name: SERVICE_INSTANCE_ID
value: "serviceInstanceId"
- name: RUN_ID
value: "runId"
- name: TestContent
desc: "properties.reported.thief.testContent"
desc: "properties.reported.testContent"
values:
- name: REPORTED_PROPERTY_TEST
value: "reportedPropertyTest"
- name: ReportedPropertyTest
desc: "properties.reported.thief.testContent.reportedPropertytest"
desc: "properties.reported.testContent.reportedPropertytest"
values:
- name: ADD_OPERATION_ID
value: "addOperationId"
@ -107,21 +105,21 @@ groups:
value: "prop_e2e"
- name: Desired
desc: "properties.desired.thief"
desc: "properties.desired"
values:
- name: PAIRING
value: "pairing"
- name: TEST_CONTENT
value: "testContent"
- name: Pairing
desc: "propeties.desired.thief.pairing"
desc: "propeties.desired.pairing"
values:
- name: SERVICE_INSTANCE_ID
value: "serviceInstanceId"
- name: RUN_ID
value: "runId"
- name: TestContent
desc: "propeties.desired.thief.testContent"
desc: "propeties.desired.testContent"
values:
- name: TWIN_GUID
value: "twinGuid"

Просмотреть файл

@ -18,7 +18,6 @@ class Fields(object):
Names of fields inside telemetry messages and twins
"""
THIEF = "thief"
PROPERTIES = "properties"
REPORTED = "reported"
DESIRED = "desired"
@ -54,9 +53,9 @@ class Fields(object):
DESIRED_PROPERTIES = "desiredProperties"
REQUESTED_SERVICE_POOL = "requestedServicePool"
# -------------------------
# properties.reported.thief
# -------------------------
# -------------------
# properties.reported
# -------------------
SYSTEM_PROPERTIES = "systemProperties"
SESSION_METRICS = "sessionMetrics"
TEST_METRICS = "testMetrics"
@ -67,39 +66,39 @@ class Fields(object):
TEST_CONTENT = "testContent"
EXIT_REASON = "exitReason"
# ---------------------------------
# properties.reported.thief.pairing
# ---------------------------------
# ---------------------------
# properties.reported.pairing
# ---------------------------
SERVICE_INSTANCE_ID = "serviceInstanceId"
RUN_ID = "runId"
# -------------------------------------
# properties.reported.thief.testContent
# -------------------------------------
# -------------------------------
# properties.reported.testContent
# -------------------------------
REPORTED_PROPERTY_TEST = "reportedPropertyTest"
# ----------------------------------------------------------
# properties.reported.thief.testContent.reportedPropertytest
# ----------------------------------------------------------
# ----------------------------------------------------
# properties.reported.testContent.reportedPropertytest
# ----------------------------------------------------
ADD_OPERATION_ID = "addOperationId"
REMOVE_OPERATION_ID = "removeOperationId"
E2E_PROPERTY = "prop_e2e"
# ------------------------
# properties.desired.thief
# ------------------------
# ------------------
# properties.desired
# ------------------
PAIRING = "pairing"
TEST_CONTENT = "testContent"
# -------------------------------
# propeties.desired.thief.pairing
# -------------------------------
# -------------------------
# propeties.desired.pairing
# -------------------------
SERVICE_INSTANCE_ID = "serviceInstanceId"
RUN_ID = "runId"
# -----------------------------------
# propeties.desired.thief.testContent
# -----------------------------------
# -----------------------------
# propeties.desired.testContent
# -----------------------------
TWIN_GUID = "twinGuid"
# -----------------------------------

Просмотреть файл

@ -427,16 +427,17 @@ class DeviceApp(object):
"""
# First remove all old props (in case of schema change). Also clears out old results.
props = {Fields.THIEF: None}
props = {}
for key in self.client.get_twin()["reported"]:
if not key.startswith("$"):
props[key] = None
self.client.patch_twin_reported_properties(props)
props = {
Fields.THIEF: {
Fields.SYSTEM_PROPERTIES: self.get_system_properties(),
Fields.SESSION_METRICS: self.get_session_metrics(),
Fields.TEST_METRICS: self.get_test_metrics(),
Fields.CONFIG: self.config,
}
Fields.SYSTEM_PROPERTIES: self.get_system_properties(),
Fields.SESSION_METRICS: self.get_session_metrics(),
Fields.TEST_METRICS: self.get_test_metrics(),
Fields.CONFIG: self.config,
}
self.client.patch_twin_reported_properties(props)
@ -476,8 +477,8 @@ class DeviceApp(object):
# Note: we're changing the dictionary that the user passed in.
# This isn't the best idea, but it works and it saves us from deep copies
if self.service_instance_id:
payload[Fields.THIEF][Fields.SERVICE_INSTANCE_ID] = self.service_instance_id
payload[Fields.THIEF][Fields.RUN_ID] = run_id
payload[Fields.SERVICE_INSTANCE_ID] = self.service_instance_id
payload[Fields.RUN_ID] = run_id
# This function only creates the message. The caller needs to queue it up for sending.
msg = Message(json.dumps(payload))
@ -500,11 +501,9 @@ class DeviceApp(object):
]:
operation_ticket = self.operation_ticket_list.make_event_based_operation_ticket()
pairing_payload = {
Fields.THIEF: {
Fields.CMD: Commands.PAIR_WITH_SERVICE_APP,
Fields.OPERATION_ID: operation_ticket.id,
Fields.REQUESTED_SERVICE_POOL: requested_service_pool,
}
Fields.CMD: Commands.PAIR_WITH_SERVICE_APP,
Fields.OPERATION_ID: operation_ticket.id,
Fields.REQUESTED_SERVICE_POOL: requested_service_pool,
}
msg = self.create_message_from_dict(pairing_payload)
self.client.send_message(msg)
@ -515,13 +514,12 @@ class DeviceApp(object):
timeout=self.config[Settings.PAIRING_REQUEST_SEND_INTERVAL_IN_SECONDS]
)
if operation_ticket.event.isSet():
msg = json.loads(operation_ticket.result_message.data.decode())
body = json.loads(operation_ticket.result_message.data.decode())
logger.info("Received pairing response: {}".format(pprint.pformat(msg)))
logger.info("Received pairing response: {}".format(pprint.pformat(body)))
event_logger.info(Events.RECEIVED_PAIRING_RESPONSE)
thief = msg[Fields.THIEF]
self.service_instance_id = thief[Fields.SERVICE_INSTANCE_ID]
self.service_instance_id = body[Fields.SERVICE_INSTANCE_ID]
event_logger.info(Events.PAIRING_COMPLETE)
return
@ -579,13 +577,11 @@ class DeviceApp(object):
operation_ticket = self.operation_ticket_list.make_event_based_operation_ticket()
payload = {
Fields.THIEF: {
Fields.CMD: Commands.SEND_OPERATION_RESPONSE,
Fields.OPERATION_ID: operation_ticket.id,
Fields.SESSION_METRICS: self.get_session_metrics(),
Fields.TEST_METRICS: self.get_test_metrics(),
Fields.SYSTEM_HEALTH_METRICS: self.get_system_health_telemetry(),
}
Fields.CMD: Commands.SEND_OPERATION_RESPONSE,
Fields.OPERATION_ID: operation_ticket.id,
Fields.SESSION_METRICS: self.get_session_metrics(),
Fields.TEST_METRICS: self.get_test_metrics(),
Fields.SYSTEM_HEALTH_METRICS: self.get_system_health_telemetry(),
}
msg = self.create_message_from_dict(payload)
@ -634,11 +630,9 @@ class DeviceApp(object):
to run in it's own thread. Returns when the PUBACK returns.
"""
payload = {
Fields.THIEF: {
Fields.CMD: Commands.SEND_OPERATION_RESPONSE,
Fields.OPERATION_ID: operation_id,
"extraPayload": self.make_random_payload(),
}
Fields.CMD: Commands.SEND_OPERATION_RESPONSE,
Fields.OPERATION_ID: operation_id,
"extraPayload": self.make_random_payload(),
}
msg = self.create_message_from_dict(payload)
@ -703,7 +697,7 @@ class DeviceApp(object):
callback for desired property patch reception
"""
# props that have the pairing structure go to `incoming_pairing_message_queue`
if patch.get(Fields.THIEF, {}).get(Fields.PAIRING, {}):
if patch.get(Fields.PAIRING, {}):
self.incoming_pairing_message_queue.put(patch)
# other props go into incoming_deisred_property_patch_queue
@ -714,12 +708,10 @@ class DeviceApp(object):
"""
Callback for receiving c2d messages.
"""
obj = json.loads(msg.data.decode())
thief = obj.get(Fields.THIEF)
body = json.loads(msg.data.decode())
if thief and thief[Fields.RUN_ID] == run_id:
cmd = thief[Fields.CMD]
if cmd in [
if body and body[Fields.RUN_ID] == run_id:
if body[Fields.CMD] in [
Commands.PAIR_RESPONSE,
Commands.OPERATION_RESPONSE,
Commands.METHOD_RESPONSE,
@ -728,10 +720,10 @@ class DeviceApp(object):
self.incoming_executor.submit(self.handle_operation_response, msg)
else:
logger.warning("Unknown command received: {}".format(obj))
logger.warning("Unknown command received: {}".format(body))
else:
logger.warning("C2D received, but it's not for us: {}".format(obj))
logger.warning("C2D received, but it's not for us: {}".format(body))
def handle_operation_response(self, msg):
"""
@ -740,15 +732,15 @@ class DeviceApp(object):
the service has responded.
"""
thief = json.loads(msg.data.decode())[Fields.THIEF]
body = json.loads(msg.data.decode())
operation_ids = thief.get(Fields.OPERATION_IDS, [])
operation_ids = body.get(Fields.OPERATION_IDS, [])
if not operation_ids:
operation_ids = [
thief.get(Fields.OPERATION_ID),
body.get(Fields.OPERATION_ID),
]
logger.info("Received {} message with {}".format(thief[Fields.CMD], operation_ids))
logger.info("Received {} message with {}".format(body[Fields.CMD], operation_ids))
for operation_id in operation_ids:
operation_ticket = self.operation_ticket_list.get(operation_id)
@ -789,7 +781,7 @@ class DeviceApp(object):
"""
test function to send a single reported property and then clear it after the
server verifies it. It does this by setting properties inside
`properties/reported/thief/testContent/reportedPropertyTest`. Each property has
`properties/reported/testContent/reportedPropertyTest`. Each property has
a `addOperationId` value and a `removeOperationId` value. When the service sees the
property added, it sends the `addOperationId` to the device. When the service sees the
property removed, it sends the `removeOperationId` to the device. This way the device
@ -808,11 +800,7 @@ class DeviceApp(object):
metrics[Fields.SESSION_METRICS] = self.get_session_metrics()
def make_reported_prop(property_name, val):
return {
Fields.THIEF: {
Fields.TEST_CONTENT: {Fields.REPORTED_PROPERTY_TEST: {property_name: val}}
}
}
return {Fields.TEST_CONTENT: {Fields.REPORTED_PROPERTY_TEST: {property_name: val}}}
add_operation = self.operation_ticket_list.make_event_based_operation_ticket()
remove_operation = self.operation_ticket_list.make_event_based_operation_ticket()
@ -827,7 +815,7 @@ class DeviceApp(object):
logger.info("Adding test property {}".format(prop_name))
props = make_reported_prop(prop_name, prop_value)
props[Fields.THIEF].update(metrics)
props.update(metrics)
self.client.patch_twin_reported_properties(props)
if add_operation.event.wait(timeout=self.config[Settings.OPERATION_TIMEOUT_IN_SECONDS]):
@ -856,12 +844,8 @@ class DeviceApp(object):
twin_guid = str(uuid.uuid4())
payload_set_desired_props = {
Fields.THIEF: {
Fields.CMD: Commands.SET_DESIRED_PROPS,
Fields.DESIRED_PROPERTIES: {
Fields.THIEF: {Fields.TEST_CONTENT: {Fields.TWIN_GUID: twin_guid}}
},
}
Fields.CMD: Commands.SET_DESIRED_PROPS,
Fields.DESIRED_PROPERTIES: {Fields.TEST_CONTENT: {Fields.TWIN_GUID: twin_guid}},
}
msg = self.create_message_from_dict(payload_set_desired_props)
logger.info("Sending message to update desired getTwin property to {}".format(twin_guid))
@ -879,8 +863,7 @@ class DeviceApp(object):
except queue.Empty:
break
else:
thief = patch.get(Fields.THIEF, {})
test_content = thief.get(Fields.TEST_CONTENT, {})
test_content = patch.get(Fields.TEST_CONTENT, {})
actual_twin_guid = test_content.get(Fields.TWIN_GUID)
if actual_twin_guid == twin_guid:
logger.info("received expected patch: {} succeeded".format(twin_guid))
@ -907,8 +890,7 @@ class DeviceApp(object):
logger.info("Got twin: {}".format(twin))
desired = twin.get(Fields.DESIRED, {})
thief = desired.get(Fields.THIEF, {})
test_content = thief.get(Fields.TEST_CONTENT, {})
test_content = desired.get(Fields.TEST_CONTENT, {})
actual_twin_guid = test_content.get(Fields.TWIN_GUID)
if actual_twin_guid == twin_guid:
@ -964,12 +946,10 @@ class DeviceApp(object):
payload = None
command_payload = {
Fields.THIEF: {
Fields.CMD: Commands.INVOKE_METHOD,
Fields.OPERATION_ID: operation_ticket.id,
Fields.METHOD_NAME: method.method_name,
Fields.METHOD_INVOKE_PAYLOAD: payload,
}
Fields.CMD: Commands.INVOKE_METHOD,
Fields.OPERATION_ID: operation_ticket.id,
Fields.METHOD_NAME: method.method_name,
Fields.METHOD_INVOKE_PAYLOAD: payload,
}
msg = self.create_message_from_dict(command_payload)
logger.info("sending method invoke with guid={}".format(operation_ticket.id))
@ -981,21 +961,21 @@ class DeviceApp(object):
operation_ticket.id, operation_ticket.result_message
)
)
thief = json.loads(operation_ticket.result_message.data.decode())[Fields.THIEF]
body = json.loads(operation_ticket.result_message.data.decode())
fail = False
if thief[Fields.METHOD_RESPONSE_STATUS_CODE] != method.expected_status_code:
if body[Fields.METHOD_RESPONSE_STATUS_CODE] != method.expected_status_code:
logger.error(
"Unexpected method status: id={}, received {} expected {}".format(
operation_ticket.id,
thief[Fields.METHOD_RESPONSE_STATUS_CODE],
body[Fields.METHOD_RESPONSE_STATUS_CODE],
method.expected_status_code,
)
)
fail = True
if thief[Fields.METHOD_RESPONSE_PAYLOAD] != payload:
if body[Fields.METHOD_RESPONSE_PAYLOAD] != payload:
logger.error(
"Unexpected payload: id={}, received {} expected {}".format(
operation_ticket.id, thief[Fields.METHOD_RESPONSE_PAYLOAD], payload
operation_ticket.id, body[Fields.METHOD_RESPONSE_PAYLOAD], payload
)
)
fail = True
@ -1016,11 +996,9 @@ class DeviceApp(object):
sent_test_payload = self.make_random_payload()
command_payload = {
Fields.THIEF: {
Fields.CMD: Commands.SEND_C2D,
Fields.OPERATION_ID: operation_ticket.id,
Fields.TEST_C2D_PAYLOAD: sent_test_payload,
}
Fields.CMD: Commands.SEND_C2D,
Fields.OPERATION_ID: operation_ticket.id,
Fields.TEST_C2D_PAYLOAD: sent_test_payload,
}
logger.info("Requesting C2D for operationId {}".format(operation_ticket.id))
@ -1030,11 +1008,11 @@ class DeviceApp(object):
logger.info("C2D received for operationId {}".format(operation_ticket.id))
self.metrics.receive_c2d_count_received.increment()
thief = json.loads(operation_ticket.result_message.data.decode())[Fields.THIEF]
if thief[Fields.TEST_C2D_PAYLOAD] != sent_test_payload:
body = json.loads(operation_ticket.result_message.data.decode())
if body[Fields.TEST_C2D_PAYLOAD] != sent_test_payload:
logger.warning(
"C2D payload for operationId {} does not match. Expected={}, Received={}".format(
operation_ticket.id, sent_test_payload, thief[Fields.TEST_C2D_PAYLOAD]
operation_ticket.id, sent_test_payload, body[Fields.TEST_C2D_PAYLOAD]
)
)
raise ThiefFatalException("C2D payload mismatch")
@ -1160,11 +1138,9 @@ class DeviceApp(object):
# Update one last time. This is required because the service app relies
# on RUN_STATE to know when we're dead
props = {
Fields.THIEF: {
Fields.SESSION_METRICS: self.get_session_metrics(),
Fields.TEST_METRICS: self.get_test_metrics(),
Fields.SYSTEM_HEALTH_METRICS: self.get_system_health_telemetry(),
}
Fields.SESSION_METRICS: self.get_session_metrics(),
Fields.TEST_METRICS: self.get_test_metrics(),
Fields.SYSTEM_HEALTH_METRICS: self.get_system_health_telemetry(),
}
logger.info("Results: {}".format(pprint.pformat(props)))
self.client.patch_twin_reported_properties(props)

Просмотреть файл

@ -19,25 +19,6 @@ PAIRING_REQUEST_TIMEOUT_INTERVAL_IN_SECONDS = 180
PAIRING_REQUEST_SEND_INTERVAL_IN_SECONDS = 10
def create_message_from_dict(payload, service_instance_id, run_id):
"""
helper function to create a message from a dict object
"""
# Note: we're changing the dictionary that the user passed in.
# This isn't the best idea, but it works and it saves us from deep copies
if service_instance_id:
payload[Fields.THIEF][Fields.SERVICE_INSTANCE_ID] = service_instance_id
payload[Fields.THIEF][Fields.RUN_ID] = run_id
# This function only creates the message. The caller needs to queue it up for sending.
msg = Message(json.dumps(payload))
msg.content_type = Const.JSON_CONTENT_TYPE
msg.content_encoding = Const.JSON_CONTENT_ENCODING
return msg
@pytest.fixture(scope="class")
def client_kwargs():
return {}
@ -60,22 +41,22 @@ async def connected_client(brand_new_client):
@pytest.fixture(scope="class")
async def c2d_waiter(event_loop, connected_client, operation_ticket_list, run_id):
async def handle_c2d(msg):
thief = json.loads(msg.data.decode()).get(Fields.THIEF, {})
logger.info("Received {}".format(thief))
body = json.loads(msg.data.decode())
logger.info("Received {}".format(body))
if not thief:
logger.warning("No thief object in payload")
if not body:
logger.warning("No payload")
return
if thief.get(Fields.RUN_ID) != run_id:
if body.get(Fields.RUN_ID) != run_id:
logger.warning(
"run_id does not match: expected={}, received={}".format(
run_id, thief.get(Fields.RUN_ID)
run_id, body.get(Fields.RUN_ID)
)
)
return
cmd = thief.get(Fields.CMD)
cmd = body.get(Fields.CMD)
if cmd not in [
Commands.PAIR_RESPONSE,
Commands.OPERATION_RESPONSE,
@ -85,13 +66,13 @@ async def c2d_waiter(event_loop, connected_client, operation_ticket_list, run_id
logger.warning("unknown cmd: {}".format(cmd))
return
operation_ids = thief.get(Fields.OPERATION_IDS, [])
operation_ids = body.get(Fields.OPERATION_IDS, [])
if not operation_ids:
operation_ids = [
thief.get(Fields.OPERATION_ID),
body.get(Fields.OPERATION_ID),
]
logger.info("Received {} message with {}".format(thief[Fields.CMD], operation_ids))
logger.info("Received {} message with {}".format(body[Fields.CMD], operation_ids))
for operation_id in operation_ids:
operation_ticket = operation_ticket_list.get(operation_id)
@ -116,12 +97,10 @@ async def paired_client(
)
pairing_payload = {
Fields.THIEF: {
Fields.CMD: Commands.PAIR_WITH_SERVICE_APP,
Fields.OPERATION_ID: operation_ticket.id,
Fields.REQUESTED_SERVICE_POOL: requested_service_pool,
Fields.RUN_ID: run_id,
}
Fields.CMD: Commands.PAIR_WITH_SERVICE_APP,
Fields.OPERATION_ID: operation_ticket.id,
Fields.REQUESTED_SERVICE_POOL: requested_service_pool,
Fields.RUN_ID: run_id,
}
msg = Message(json.dumps(pairing_payload))
@ -139,9 +118,9 @@ async def paired_client(
pass
else:
logger.info("pairing response received")
msg = json.loads(operation_ticket.result_message.data.decode())
body = json.loads(operation_ticket.result_message.data.decode())
return collections.namedtuple("ConnectedClient", "client service_instance_id")(
connected_client, msg[Fields.THIEF][Fields.SERVICE_INSTANCE_ID]
connected_client, body[Fields.SERVICE_INSTANCE_ID]
)
raise Exception("Service app did not respond in time")

Просмотреть файл

@ -8,7 +8,6 @@ import collections
import json
from thief_constants import Fields, Const
from client_fixtures import (
create_message_from_dict,
client_kwargs,
brand_new_client,
connected_client,
@ -30,27 +29,23 @@ logger.setLevel(level=logging.INFO)
@pytest.fixture(scope="class")
def message_factory(run_id, service_instance_id, operation_ticket_factory): # noqa: F811
def wrapper_function(original_payload, cmd=None):
def wrapper_function(original_body, cmd=None):
operation_ticket = operation_ticket_factory()
payload = copy.deepcopy(original_payload)
if Fields.THIEF not in payload:
payload[Fields.THIEF] = {}
body = copy.deepcopy(original_body)
thief = payload[Fields.THIEF]
thief[Fields.OPERATION_ID] = operation_ticket.id
thief[Fields.SERVICE_INSTANCE_ID] = service_instance_id
thief[Fields.RUN_ID] = run_id
body[Fields.OPERATION_ID] = operation_ticket.id
body[Fields.SERVICE_INSTANCE_ID] = service_instance_id
body[Fields.RUN_ID] = run_id
if cmd:
thief[Fields.CMD] = cmd
body[Fields.CMD] = cmd
message = Message(json.dumps(payload))
message = Message(json.dumps(body))
message.content_type = Const.JSON_CONTENT_TYPE
message.content_encoding = Const.JSON_CONTENT_ENCODING
return collections.namedtuple("WrappedMessage", "message operation_ticket payload")(
message, operation_ticket, payload
return collections.namedtuple("WrappedMessage", "message operation_ticket body")(
message, operation_ticket, body
)
return wrapper_function
@ -60,18 +55,16 @@ def message_factory(run_id, service_instance_id, operation_ticket_factory): # n
def reported_props_factory(run_id, service_instance_id, random_dict_factory): # noqa: F811
def factory_function(operation_ticket):
return {
Fields.THIEF: {
Fields.RUN_ID: run_id,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.TEST_CONTENT: {
Fields.REPORTED_PROPERTY_TEST: {
Fields.E2E_PROPERTY: {
Fields.ADD_OPERATION_ID: operation_ticket.id,
Fields.RANDOM_CONTENT: random_dict_factory(),
}
Fields.RUN_ID: run_id,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.TEST_CONTENT: {
Fields.REPORTED_PROPERTY_TEST: {
Fields.E2E_PROPERTY: {
Fields.ADD_OPERATION_ID: operation_ticket.id,
Fields.RANDOM_CONTENT: random_dict_factory(),
}
},
}
}
},
}
return factory_function

Просмотреть файл

@ -16,10 +16,8 @@ def service_app(client, message_factory):
await client.send_message(
message_factory(
{
Fields.THIEF: {
Fields.CMD: Commands.SET_DESIRED_PROPS,
Fields.DESIRED_PROPERTIES: desired_props,
}
Fields.CMD: Commands.SET_DESIRED_PROPS,
Fields.DESIRED_PROPERTIES: desired_props,
}
).message
)
@ -28,18 +26,16 @@ def service_app(client, message_factory):
# invoke the method call
invoke = message_factory(
{
Fields.THIEF: {
Fields.CMD: Commands.INVOKE_METHOD,
Fields.METHOD_NAME: method_name,
Fields.METHOD_INVOKE_PAYLOAD: method_payload,
}
Fields.CMD: Commands.INVOKE_METHOD,
Fields.METHOD_NAME: method_name,
Fields.METHOD_INVOKE_PAYLOAD: method_payload,
}
)
await client.send_message(invoke.message)
# wait for the response to come back via the service API call
await invoke.operation_ticket.event.wait()
method_response = json.loads(invoke.operation_ticket.result_message.data)[Fields.THIEF]
method_response = json.loads(invoke.operation_ticket.result_message.data)
return method_response
@ -50,25 +46,21 @@ def make_desired_property_patch(component_name, property_name, property_value):
logger.info("Setting {} to {}".format(property_name, property_value))
if component_name:
return {
Fields.THIEF: {
Fields.CMD: Commands.UPDATE_PNP_PROPERTIES,
Fields.PNP_PROPERTIES_UPDATE_PATCH: [
{
"op": "add",
"path": "/{}".format(component_name),
"value": {property_name: property_value, "$metadata": {}},
}
],
}
Fields.CMD: Commands.UPDATE_PNP_PROPERTIES,
Fields.PNP_PROPERTIES_UPDATE_PATCH: [
{
"op": "add",
"path": "/{}".format(component_name),
"value": {property_name: property_value, "$metadata": {}},
}
],
}
else:
return {
Fields.THIEF: {
Fields.CMD: Commands.UPDATE_PNP_PROPERTIES,
Fields.PNP_PROPERTIES_UPDATE_PATCH: [
{"op": "add", "path": "/{}".format(property_name), "value": property_value}
],
}
Fields.CMD: Commands.UPDATE_PNP_PROPERTIES,
Fields.PNP_PROPERTIES_UPDATE_PATCH: [
{"op": "add", "path": "/{}".format(property_name), "value": property_value}
],
}
@ -78,19 +70,17 @@ def pnp_service_app(client, message_factory):
async def invoke_pnp_command(self, pnp_command_name, pnp_component_name, request_payload):
invoke = message_factory(
{
Fields.THIEF: {
Fields.CMD: Commands.INVOKE_PNP_COMMAND,
Fields.COMMAND_NAME: pnp_command_name,
Fields.COMMAND_COMPONENT_NAME: pnp_component_name,
Fields.COMMAND_INVOKE_PAYLOAD: request_payload,
}
Fields.CMD: Commands.INVOKE_PNP_COMMAND,
Fields.COMMAND_NAME: pnp_command_name,
Fields.COMMAND_COMPONENT_NAME: pnp_component_name,
Fields.COMMAND_INVOKE_PAYLOAD: request_payload,
}
)
await client.send_message(invoke.message)
# wait for the response to come back via the service API call
await invoke.operation_ticket.event.wait()
command_response = json.loads(invoke.operation_ticket.result_message.data)[Fields.THIEF]
command_response = json.loads(invoke.operation_ticket.result_message.data)
return command_response
@ -99,10 +89,8 @@ def pnp_service_app(client, message_factory):
await client.send_message(msg.message)
await msg.operation_ticket.event.wait()
return (
json.loads(msg.operation_ticket.result_message.data)
.get(Fields.THIEF, {})
.get(Fields.PNP_PROPERTIES_CONTENTS, {})
return json.loads(msg.operation_ticket.result_message.data).get(
Fields.PNP_PROPERTIES_CONTENTS, {}
)
async def update_pnp_properties(

Просмотреть файл

@ -101,6 +101,6 @@ class TestPnpCommands(object):
assert not actual_request.payload
# and make sure the response came back successfully
# There is currently no way to check the command response status code.
# Currently no way to check the command response status code.
# assert command_response[Fields.COMMAND_RESPONSE_STATUS_CODE] == command_response_status
assert command_response[Fields.COMMAND_RESPONSE_PAYLOAD] == response_payload

Просмотреть файл

@ -53,8 +53,8 @@ class TestPnpConnect(object):
await msg.operation_ticket.event.wait()
assert (
json.loads(msg.operation_ticket.result_message.data)[Fields.THIEF][
Fields.PNP_PROPERTIES_CONTENTS
]["$metadata"]["$model"]
json.loads(msg.operation_ticket.result_message.data)[Fields.PNP_PROPERTIES_CONTENTS][
"$metadata"
]["$model"]
== pnp_model_id
)

Просмотреть файл

@ -25,20 +25,18 @@ class TestPnpTelemetry(object):
async def test_send_pnp_telemetry(self, client, message_factory, pnp_model_id):
telemetry = message_factory(
{
Fields.THIEF: {
Fields.CMD: Commands.SEND_OPERATION_RESPONSE,
Fields.FLAGS: [Flags.RETURN_EVENTHUB_MESSAGE_CONTENTS],
},
}
Fields.CMD: Commands.SEND_OPERATION_RESPONSE,
Fields.FLAGS: [Flags.RETURN_EVENTHUB_MESSAGE_CONTENTS],
},
)
await client.send_telemetry(telemetry.payload)
await client.send_telemetry(telemetry.body)
await telemetry.operation_ticket.event.wait()
response = json.loads(telemetry.operation_ticket.result_message.data)
logger.info(pprint.pformat(response))
eventhub_message_contents = response[Fields.THIEF][Fields.EVENTHUB_MESSAGE_CONTENTS]
assert eventhub_message_contents[Fields.EVENTHUB_MESSAGE_BODY] == telemetry.payload
eventhub_message_contents = response[Fields.EVENTHUB_MESSAGE_CONTENTS]
assert eventhub_message_contents[Fields.EVENTHUB_MESSAGE_BODY] == telemetry.body
system_props = eventhub_message_contents[Fields.EVENTHUB_SYSTEM_PROPERTIES]
assert system_props[Fields.EVENTHUB_SYSPROP_DT_DATASCHEMA] == pnp_model_id

Просмотреть файл

@ -15,12 +15,7 @@ pytestmark = pytest.mark.asyncio
@pytest.fixture
def test_message(message_factory):
return message_factory(
{
Fields.THIEF: {
Fields.CMD: Commands.SEND_OPERATION_RESPONSE,
Fields.FLAGS: [Flags.RESPOND_IMMEDIATELY],
}
}
{Fields.CMD: Commands.SEND_OPERATION_RESPONSE, Fields.FLAGS: [Flags.RESPOND_IMMEDIATELY]}
)

Просмотреть файл

@ -15,14 +15,12 @@ pytestmark = pytest.mark.asyncio
def validate_patch_reported(twin, patch):
twin_prop = (
twin.get(Fields.REPORTED, {})
.get(Fields.THIEF, {})
.get(Fields.TEST_CONTENT, {})
.get(Fields.REPORTED_PROPERTY_TEST, {})
.get(Fields.E2E_PROPERTY, {})
)
patch_prop = (
patch.get(Fields.THIEF, {})
.get(Fields.TEST_CONTENT, {})
patch.get(Fields.TEST_CONTENT, {})
.get(Fields.REPORTED_PROPERTY_TEST, {})
.get(Fields.E2E_PROPERTY, {})
)
@ -34,11 +32,7 @@ def validate_patch_reported(twin, patch):
async def clean_reported_properties(client):
await client.patch_twin_reported_properties(
{
Fields.THIEF: {
Fields.TEST_CONTENT: {Fields.REPORTED_PROPERTY_TEST: {Fields.E2E_PROPERTY: None}}
}
}
{Fields.TEST_CONTENT: {Fields.REPORTED_PROPERTY_TEST: {Fields.E2E_PROPERTY: None}}}
)
@ -141,18 +135,15 @@ class TestDesiredProperties(object):
client.on_twin_desired_properties_patch_received = handle_on_patch_received
await service_app.set_desired_props({Fields.THIEF: {Fields.RANDOM_CONTENT: random_dict}})
await service_app.set_desired_props({Fields.RANDOM_CONTENT: random_dict})
await asyncio.wait_for(received.wait(), 10)
logger.info("got it")
assert received_patch.get(Fields.THIEF, {}).get(Fields.RANDOM_CONTENT, {}) == random_dict
assert received_patch.get(Fields.RANDOM_CONTENT, {}) == random_dict
twin = await client.get_twin()
assert (
twin.get(Fields.DESIRED, {}).get(Fields.THIEF, {}).get(Fields.RANDOM_CONTENT, {})
== random_dict
)
assert twin.get(Fields.DESIRED, {}).get(Fields.RANDOM_CONTENT, {}) == random_dict
# TODO: etag tests, version tests

Просмотреть файл

@ -189,9 +189,8 @@ class ServiceApp(object):
def handle_method_invoke(self, device_data, event):
body = event.body_as_json()
thief = body.get(Fields.THIEF, {})
method_guid = thief.get(Fields.OPERATION_ID)
method_name = thief.get(Fields.METHOD_NAME)
method_guid = body.get(Fields.OPERATION_ID)
method_name = body.get(Fields.METHOD_NAME)
logger.info(
"received method invoke method={}, guid={}".format(method_name, method_guid),
@ -200,11 +199,11 @@ class ServiceApp(object):
request = CloudToDeviceMethod(
method_name=method_name,
payload=thief.get(Fields.METHOD_INVOKE_PAYLOAD, None),
response_timeout_in_seconds=thief.get(
payload=body.get(Fields.METHOD_INVOKE_PAYLOAD, None),
response_timeout_in_seconds=body.get(
Fields.METHOD_INVOKE_RESPONSE_TIMEOUT_IN_SECONDS, None
),
connect_timeout_in_seconds=thief.get(
connect_timeout_in_seconds=body.get(
Fields.METHOD_INVOKE_CONNECT_TIMEOUT_IN_SECONDS, None
),
)
@ -228,14 +227,12 @@ class ServiceApp(object):
response_message = json.dumps(
{
Fields.THIEF: {
Fields.CMD: Commands.METHOD_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: device_data.run_id,
Fields.OPERATION_ID: method_guid,
Fields.METHOD_RESPONSE_PAYLOAD: response.payload,
Fields.METHOD_RESPONSE_STATUS_CODE: response.status,
}
Fields.CMD: Commands.METHOD_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: device_data.run_id,
Fields.OPERATION_ID: method_guid,
Fields.METHOD_RESPONSE_PAYLOAD: response.payload,
Fields.METHOD_RESPONSE_STATUS_CODE: response.status,
}
)
@ -254,15 +251,14 @@ class ServiceApp(object):
def handle_pnp_command_invoke(self, device_data, event):
body = event.body_as_json()
thief = body.get(Fields.THIEF, {})
command_guid = thief.get(Fields.OPERATION_ID)
command_name = thief.get(Fields.COMMAND_NAME)
command_component_name = thief.get(Fields.COMMAND_COMPONENT_NAME, None)
payload = thief.get(Fields.COMMAND_INVOKE_PAYLOAD, None)
response_timeout_in_seconds = thief.get(
command_guid = body.get(Fields.OPERATION_ID)
command_name = body.get(Fields.COMMAND_NAME)
command_component_name = body.get(Fields.COMMAND_COMPONENT_NAME, None)
payload = body.get(Fields.COMMAND_INVOKE_PAYLOAD, None)
response_timeout_in_seconds = body.get(
Fields.COMMAND_INVOKE_RESPONSE_TIMEOUT_IN_SECONDS, None
)
connect_timeout_in_seconds = thief.get(
connect_timeout_in_seconds = body.get(
Fields.COMMAND_INVOKE_CONNECT_TIMEOUT_IN_SECONDS, None
)
@ -308,13 +304,11 @@ class ServiceApp(object):
response_message = json.dumps(
{
Fields.THIEF: {
Fields.CMD: Commands.OPERATION_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: device_data.run_id,
Fields.OPERATION_ID: command_guid,
Fields.COMMAND_RESPONSE_PAYLOAD: response,
}
Fields.CMD: Commands.OPERATION_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: device_data.run_id,
Fields.OPERATION_ID: command_guid,
Fields.COMMAND_RESPONSE_PAYLOAD: response,
}
)
@ -331,12 +325,12 @@ class ServiceApp(object):
)
)
def update_pairing(self, device_id, thief):
def update_pairing(self, device_id, body):
# Handle the case where a device is looking for someone to pair with
if thief.get(Fields.CMD, "") == Commands.PAIR_WITH_SERVICE_APP:
if thief.get(Fields.REQUESTED_SERVICE_POOL, None) == service_pool:
received_operation_id = thief.get(Fields.OPERATION_ID)
received_run_id = thief.get(Fields.RUN_ID)
if body.get(Fields.CMD, "") == Commands.PAIR_WITH_SERVICE_APP:
if body.get(Fields.REQUESTED_SERVICE_POOL, None) == service_pool:
received_operation_id = body.get(Fields.OPERATION_ID)
received_run_id = body.get(Fields.RUN_ID)
logger.info(
"Device {} attempting to pair with operation ID {}".format(
@ -347,12 +341,10 @@ class ServiceApp(object):
message = json.dumps(
{
Fields.THIEF: {
Fields.CMD: Commands.PAIR_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: received_run_id,
Fields.OPERATION_ID: received_operation_id,
}
Fields.CMD: Commands.PAIR_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: received_run_id,
Fields.OPERATION_ID: received_operation_id,
}
)
@ -365,9 +357,9 @@ class ServiceApp(object):
# don't add device_id to self.device_list until we get a message with our
# service_instance_id
elif Fields.RUN_ID in thief and Fields.SERVICE_INSTANCE_ID in thief:
received_service_instance_id = thief.get(Fields.SERVICE_INSTANCE_ID, None)
received_run_id = thief[Fields.RUN_ID]
elif Fields.RUN_ID in body and Fields.SERVICE_INSTANCE_ID in body:
received_service_instance_id = body.get(Fields.SERVICE_INSTANCE_ID, None)
received_run_id = body[Fields.RUN_ID]
device_data = self.device_list.try_get(device_id)
@ -408,15 +400,10 @@ class ServiceApp(object):
body = event.body_as_json()
# added `or {}` because thief might be `None`
if get_message_source_from_event(event) == "twinChangeEvents":
thief = (
body.get(Fields.PROPERTIES, {}).get(Fields.REPORTED, {}).get(Fields.THIEF, {}) or {}
)
else:
thief = body.get(Fields.THIEF, {}) or {}
body = body.get(Fields.PROPERTIES, {}).get(Fields.REPORTED, {})
self.update_pairing(device_id, thief)
self.update_pairing(device_id, body)
device_data = self.device_list.try_get(device_id)
if not device_data:
@ -425,9 +412,9 @@ class ServiceApp(object):
if get_message_source_from_event(event) == "twinChangeEvents":
self.incoming_twin_changes.put(event)
else:
cmd = thief.get(Fields.CMD, None)
received_operation_id = thief.get(Fields.OPERATION_ID, None)
received_run_id = thief.get(Fields.RUN_ID, None)
cmd = body.get(Fields.CMD, None)
received_operation_id = body.get(Fields.OPERATION_ID, None)
received_run_id = body.get(Fields.RUN_ID, None)
if cmd == Commands.PAIR_WITH_SERVICE_APP:
# handled in the update_pairing() function above
@ -439,26 +426,24 @@ class ServiceApp(object):
),
extra=custom_props(device_id, device_data.run_id),
)
if Flags.RETURN_EVENTHUB_MESSAGE_CONTENTS in thief.get(Fields.FLAGS, []):
if Flags.RETURN_EVENTHUB_MESSAGE_CONTENTS in body.get(Fields.FLAGS, []):
payload = {
Fields.THIEF: {
Fields.CMD: Commands.OPERATION_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: received_run_id,
Fields.OPERATION_ID: received_operation_id,
Fields.EVENTHUB_MESSAGE_CONTENTS: {
Fields.EVENTHUB_MESSAGE_BODY: body,
Fields.EVENTHUB_CONTENT_TYPE: event.content_type,
Fields.EVENTHUB_CORRELATION_ID: event.correlation_id,
Fields.EVENTHUB_MESSAGE_ID: event.message_id,
Fields.EVENTHUB_SYSTEM_PROPERTIES: convert_binary_dict_to_string_dict(
event.system_properties
),
Fields.EVENTHUB_PROPERTIES: convert_binary_dict_to_string_dict(
event.properties
),
},
}
Fields.CMD: Commands.OPERATION_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: received_run_id,
Fields.OPERATION_ID: received_operation_id,
Fields.EVENTHUB_MESSAGE_CONTENTS: {
Fields.EVENTHUB_MESSAGE_BODY: body,
Fields.EVENTHUB_CONTENT_TYPE: event.content_type,
Fields.EVENTHUB_CORRELATION_ID: event.correlation_id,
Fields.EVENTHUB_MESSAGE_ID: event.message_id,
Fields.EVENTHUB_SYSTEM_PROPERTIES: convert_binary_dict_to_string_dict(
event.system_properties
),
Fields.EVENTHUB_PROPERTIES: convert_binary_dict_to_string_dict(
event.properties
),
},
}
message = json.dumps(payload)
@ -475,11 +460,11 @@ class ServiceApp(object):
OperationResponse(device_id=device_id, operation_id=received_operation_id,)
)
if Flags.RESPOND_IMMEDIATELY in thief.get(Fields.FLAGS, []):
if Flags.RESPOND_IMMEDIATELY in body.get(Fields.FLAGS, []):
self.force_send_operation_response.set()
elif cmd == Commands.SET_DESIRED_PROPS:
desired = thief.get(Fields.DESIRED_PROPERTIES, {})
desired = body.get(Fields.DESIRED_PROPERTIES, {})
if desired:
logger.info("Updating desired props: {}".format(desired))
self.registry_manager.update_twin(
@ -506,13 +491,11 @@ class ServiceApp(object):
message = json.dumps(
{
Fields.THIEF: {
Fields.CMD: Commands.OPERATION_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: received_run_id,
Fields.OPERATION_ID: received_operation_id,
Fields.PNP_PROPERTIES_CONTENTS: twin,
}
Fields.CMD: Commands.OPERATION_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: received_run_id,
Fields.OPERATION_ID: received_operation_id,
Fields.PNP_PROPERTIES_CONTENTS: twin,
}
)
@ -531,7 +514,7 @@ class ServiceApp(object):
)
self.digital_twin_client.update_digital_twin(
device_id, thief[Fields.PNP_PROPERTIES_UPDATE_PATCH]
device_id, body[Fields.PNP_PROPERTIES_UPDATE_PATCH]
)
# TODO: send ack for all of these ops, include error if failure
@ -545,13 +528,11 @@ class ServiceApp(object):
)
message = json.dumps(
{
Fields.THIEF: {
Fields.CMD: Commands.C2D_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: received_run_id,
Fields.OPERATION_ID: received_operation_id,
Fields.TEST_C2D_PAYLOAD: thief[Fields.TEST_C2D_PAYLOAD],
}
Fields.CMD: Commands.C2D_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: received_run_id,
Fields.OPERATION_ID: received_operation_id,
Fields.TEST_C2D_PAYLOAD: body[Fields.TEST_C2D_PAYLOAD],
}
)
@ -685,12 +666,10 @@ class ServiceApp(object):
message = json.dumps(
{
Fields.THIEF: {
Fields.CMD: Commands.OPERATION_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: device_data.run_id,
Fields.OPERATION_IDS: operation_ids[device_id],
}
Fields.CMD: Commands.OPERATION_RESPONSE,
Fields.SERVICE_INSTANCE_ID: service_instance_id,
Fields.RUN_ID: device_data.run_id,
Fields.OPERATION_IDS: operation_ids[device_id],
}
)
@ -723,7 +702,6 @@ class ServiceApp(object):
event.body_as_json()
.get(Fields.PROPERTIES, {})
.get(Fields.REPORTED, {})
.get(Fields.THIEF, {})
.get(Fields.TEST_CONTENT, {})
)
reported_property_test = test_content.get(Fields.REPORTED_PROPERTY_TEST)
@ -770,30 +748,23 @@ class ServiceApp(object):
continue
device_id = get_device_id_from_event(event)
thief = (
event.body_as_json()
.get(Fields.PROPERTIES, {})
.get(Fields.REPORTED, {})
.get(Fields.THIEF, {})
)
if not thief:
thief = {}
body = event.body_as_json().get(Fields.PROPERTIES, {}).get(Fields.REPORTED, {})
device_data = self.device_list.try_get(device_id)
if device_data:
run_id = device_data.run_id
else:
run_id = thief.get(Fields.PAIRING, {}).get(Fields.RUN_ID, None)
run_id = body.get(Fields.PAIRING, {}).get(Fields.RUN_ID, None)
logger.info(
"Twin change for {}: {}".format(device_id, event.body_as_json()),
extra=custom_props(device_id, run_id),
)
if thief.get(Fields.TEST_CONTENT):
if body.get(Fields.TEST_CONTENT):
self.respond_to_test_content_properties(event)
run_state = thief.get(Fields.SESSION_METRICS, {}).get("runState")
run_state = body.get(Fields.SESSION_METRICS, {}).get("runState")
if run_state and run_state != RunStates.RUNNING:
logger.info(
"Device {} no longer running.".format(device_id),

Просмотреть файл

@ -19,13 +19,9 @@ while True:
os.system("cls" if os.name == "nt" else "clear -x")
twin = registry_manager.get_twin(thief_secrets.DEVICE_ID)
json_object = {
Fields.SESSION_METRICS: twin.properties.reported.get(Fields.THIEF, {}).get(
Fields.SESSION_METRICS, None
),
Fields.TEST_METRICS: twin.properties.reported.get(Fields.THIEF, {}).get(
Fields.TEST_METRICS, None
),
Fields.SYSTEM_HEALTH_METRICS: twin.properties.reported.get(Fields.THIEF, {}).get(
Fields.SESSION_METRICS: twin.properties.reported.get(Fields.SESSION_METRICS, None),
Fields.TEST_METRICS: twin.properties.reported.get(Fields.TEST_METRICS, None),
Fields.SYSTEM_HEALTH_METRICS: twin.properties.reported.get(
Fields.SYSTEM_HEALTH_METRICS, None
),
}