зеркало из
1
0
Форкнуть 0

fix transport and client for e2e test (#21)

* fix transport and client for e2e test
* fixup after rebase
This commit is contained in:
Bert Kleewein 2018-11-21 13:44:47 -08:00 коммит произвёл GitHub
Родитель 014666fed4
Коммит 899ad25c83
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 117 добавлений и 36 удалений

Просмотреть файл

@ -47,6 +47,7 @@ class SymmetricKeyAuthenticationProvider(BaseRenewableTokenAuthenticationProvide
module_id,
shared_access_key,
shared_access_key_name=None,
gateway_hostname=None
):
"""
@ -61,6 +62,8 @@ class SymmetricKeyAuthenticationProvider(BaseRenewableTokenAuthenticationProvide
)
self.shared_access_key = shared_access_key
self.shared_access_key_name = shared_access_key_name
self.gateway_hostname = gateway_hostname
self.ca_cert = None
@staticmethod
def parse(connection_string):
@ -88,6 +91,7 @@ class SymmetricKeyAuthenticationProvider(BaseRenewableTokenAuthenticationProvide
d.get(MODULE_ID),
d.get(SHARED_ACCESS_KEY),
d.get(SHARED_ACCESS_KEY_NAME),
d.get(GATEWAY_HOST_NAME),
)
def _sign(self, quoted_resource_uri, expiry):

Просмотреть файл

@ -27,6 +27,7 @@ class InternalClient(object):
self.state = "initial"
self.on_connection_state = None
self.on_event_sent = None
def connect(self):
"""Connects the client to an Azure IoT Hub.
@ -34,8 +35,17 @@ class InternalClient(object):
"""
logger.info("connecting to transport")
self._transport.on_transport_connected = self._handle_transport_connected_state
self._transport.on_transport_disconnected = self._handle_transport_connected_state
self._transport.on_event_sent = self._handle_transport_event_sent
self._transport.connect()
def disconnect(self):
"""
Disconnect the client from the Azure IoT Hub or Azure IoT Edge Hub
"""
logger.info("disconnecting from transport")
self._transport.disconnect()
def send_event(self, event):
"""
Sends a message to the default events endpoint on the Azure IoT Hub or Edge Hub instance via a message broker.
@ -48,7 +58,7 @@ class InternalClient(object):
"""
The connection status is emitted whenever the client on the module gets connected or disconnected.
"""
logger.info("emit_connection_status")
logger.info("emit_connection_status: {}".format(self.state))
if self.on_connection_state:
self.on_connection_state(self.state)
else:
@ -58,6 +68,11 @@ class InternalClient(object):
self.state = new_state
self._emit_connection_status()
def _handle_transport_event_sent(self):
logger.info("_handle_transport_event_sent: " + str(self.on_event_sent))
if self.on_event_sent:
self.on_event_sent()
@classmethod
def from_authentication_provider(cls, authentication_provider, transport_name):
"""Creates a device client with the specified authentication provider and transport protocol"""

Просмотреть файл

@ -54,7 +54,7 @@ class MQTTProvider(object):
def on_disconnect_callback(client, userdata, result_code):
logger.info("disconnected with result code: %s", str(result_code))
self.on_mqtt_disconnected()
self.on_mqtt_disconnected("disconnected")
def on_publish_callback(client, userdata, mid):
logger.info("payload published for %s", str(mid))

Просмотреть файл

@ -26,6 +26,8 @@ class MQTTTransport(AbstractTransport):
AbstractTransport.__init__(self, auth_provider)
self._mqtt_provider = None
self.on_transport_connected = None
self.on_transport_disconnected = None
self.on_event_sent = None
self._event_queue = queue.LifoQueue()
states = ["disconnected", "connecting", "connected", "sending", "disconnecting"]
@ -46,7 +48,7 @@ class MQTTTransport(AbstractTransport):
"trigger": "_trig_provider_connect_complete",
"source": "connecting",
"dest": "connected",
"after": "_after_action_deliver_next_queued_event",
"after": "_trig_check_send_event_queue",
},
{"trigger": "_trig_disconnect", "source": "disconnected", "dest": None},
{
@ -65,20 +67,14 @@ class MQTTTransport(AbstractTransport):
"source": "connected",
"before": "_before_action_add_event_to_queue",
"dest": "sending",
"after": "_after_action_deliver_next_queued_event",
"after": "_trig_check_send_event_queue",
},
{
"trigger": "_trig_provider_publish_complete",
"source": "sending",
"dest": None,
"unless": "_queue_is_empty",
"after": "_after_action_deliver_next_queued_event",
},
{
"trigger": "_trig_provider_publish_complete",
"source": "sending",
"dest": "connected",
"conditions": "_queue_is_empty",
"before": "_before_action_notify_publish_complete",
"after": "_trig_check_send_event_queue"
},
{
"trigger": "_trig_send_event",
@ -93,6 +89,19 @@ class MQTTTransport(AbstractTransport):
"dest": "connecting",
"after": "_after_action_provider_connect",
},
{
"trigger": "_trig_check_send_event_queue",
"source": [ "connected", "sending" ],
"dest": "connected",
"conditions": "_queue_is_empty",
},
{
"trigger": "_trig_check_send_event_queue",
"source": [ "connected", "sending" ],
"dest": "sending",
"unless": "_queue_is_empty",
"after": "_after_action_deliver_next_queued_event",
},
]
def _on_transition_complete(event):
@ -116,7 +125,6 @@ class MQTTTransport(AbstractTransport):
initial="disconnected",
send_event=True,
finalize_event=_on_transition_complete,
after_state_change=self._after_state_change,
)
# to render the state machine as a PNG:
@ -129,25 +137,19 @@ class MQTTTransport(AbstractTransport):
self._create_mqtt_provider()
def _after_state_change(self, event):
"""
Callback that happens after all state changes. Since there are many different
ways to get to the connected state, this is where we call our "on_connected" callback.
(This would be better done inside a handler attached to the _provider_connect_complete event,
but no such thing exists).
"""
logger.info(
"after state change: trigger=%s, dest=%s, error=%s",
event.event.name,
event.state.name,
str(event.error),
)
if (
(not event.error)
and (event.event.name == "_trig_provider_connect_complete")
and self.on_transport_connected
):
self.on_transport_connected(event.state.name)
def on_enter_connected(self, event):
if event.event.name == "_trig_provider_connect_complete":
self.on_transport_connected("connected")
def on_enter_disconnected(self, event):
if event.event.name == "_trig_provider_disconnect_complete":
self.on_transport_disconnected("disconnected")
def _before_action_notify_publish_complete(self, event):
logger.info("publish complete:" + str(event))
logger.info("publish error:" + str(event.error));
if not event.error:
self.on_event_sent()
def _after_action_provider_connect(self, event):
"""
@ -173,7 +175,7 @@ class MQTTTransport(AbstractTransport):
def _queue_is_empty(self, event):
"""
Return true if the sending queue is empty.
Return true if the sending queue is empty.
This is meant to be called by the state machine as a "conditions" or "unless" check
"""
return self._event_queue.empty()
@ -189,7 +191,6 @@ class MQTTTransport(AbstractTransport):
self._mqtt_provider.publish(topic, event_to_send)
except queue.Empty:
logger.warning("UNEXPECTED: queue is empty in _after_action_deliver_next_queued_event")
pass
def _create_mqtt_provider(self):
client_id = self._auth_provider.device_id
@ -199,9 +200,10 @@ class MQTTTransport(AbstractTransport):
username = self._auth_provider.hostname + "/" + client_id + "/" + "?api-version=2018-06-30"
hostname = None
if hasattr(self._auth_provider, "gateway_hostname"):
hostname = self._auth_provider.gateway_hostname
else:
if not hostname or len(hostname) == 0:
hostname = self._auth_provider.hostname
if hasattr(self._auth_provider, "ca_cert"):

Просмотреть файл

@ -58,6 +58,21 @@ def test_connected_state_handler_called_wth_new_state_once_transport_gets_connec
assert client.state == "connected"
stub_on_connection_state.assert_called_once_with("connected")
def test_connected_state_handler_called_wth_new_state_once_transport_gets_connected(mocker, authentication_provider, mock_transport):
client = InternalClient(authentication_provider, mock_transport)
stub_on_connection_state = mocker.stub(name="on_connection_state")
client.on_connection_state = stub_on_connection_state
client.connect()
mock_transport.on_transport_connected("connected")
stub_on_connection_state.reset_mock()
client.disconnect()
mock_transport.on_transport_disconnected("disconnected")
assert client.state == "disconnected"
stub_on_connection_state.assert_called_once_with("disconnected")
def test_internal_client_send_event_in_turn_calls_transport_send_event(authentication_provider, mock_transport):

Просмотреть файл

@ -52,7 +52,7 @@ def test_connect_triggers_client_connect(MockMqttClient, MockSsl):
@patch.object(mqtt, "Client")
@pytest.mark.parametrize("client_callback_name, client_callback_args, provider_callback_name, provider_callback_args", [
("on_connect", [None, None, None, 0], "on_mqtt_connected", ["connected"]),
("on_disconnect", [None, None, 0], "on_mqtt_disconnected", []),
("on_disconnect", [None, None, 0], "on_mqtt_disconnected", ["disconnected"]),
("on_publish", [None, None, 0], "on_mqtt_published", []),
("on_subscribe", [None, None, 0], "on_mqtt_subscribed", [])
])

Просмотреть файл

@ -37,6 +37,8 @@ def transport(authentication_provider):
with mock.patch("azure.iot.hub.devicesdk.transport.mqtt.mqtt_transport.MQTTProvider"):
transport = MQTTTransport(authentication_provider)
transport.on_transport_connected = MagicMock()
transport.on_transport_disconnected = MagicMock()
transport.on_event_sent = MagicMock()
return transport
@ -165,6 +167,37 @@ class TestSendEvent():
# and verify that we're sending the second event
mock_mqtt_provider.publish.assert_called_once_with(fake_topic, fake_event_2)
def test_puback_calls_client_callback(self, transport):
mock_mqtt_provider = transport._mqtt_provider
# connect
transport.connect()
mock_mqtt_provider.on_mqtt_connected()
# send an event
transport.send_event(fake_event)
# fake the puback:
transport._trig_provider_publish_complete()
# assert
transport.on_event_sent.assert_called_once_with()
def test_connect_send_disconnect(self, transport):
mock_mqtt_provider = transport._mqtt_provider
# connect
transport.connect()
mock_mqtt_provider.on_mqtt_connected()
# send an event
transport.send_event(fake_event)
transport._trig_provider_publish_complete()
# disconnect
transport.disconnect()
mock_mqtt_provider.disconnect.assert_called_once_with()
class TestDisconnect():
def test_disconnect_calls_disconnect_on_provider(self, transport):
mock_mqtt_provider = transport._mqtt_provider
@ -182,3 +215,15 @@ class TestDisconnect():
mock_mqtt_provider.disconnect.assert_not_called()
def test_disconnect_calls_client_disconnect_callback(self, transport):
mock_mqtt_provider = transport._mqtt_provider
transport.connect()
transport._trig_provider_connect_complete()
transport.disconnect()
transport._trig_provider_disconnect_complete()
transport.on_transport_disconnected.assert_called_once_with("disconnected")