зеркало из
1
0
Форкнуть 0

fix (azure-iot-device) Add MQTT_ERR_KEEPALIVE mapping to fix connection issues caused by new Paho error code (#981)

* fix (azure-iot-device) Add MQTT_ERR_KEEPALIVE mapping to fix connection issues caused by new Paho error code

* Add failure documentation to mqtt_transport.py and handle disconnect failures in the connection watchdog

* add tests for disconnect exception in watchdog handler
This commit is contained in:
Bert Kleewein 2022-03-30 13:36:39 -07:00 коммит произвёл GitHub
Родитель 2891227e1b
Коммит 5021c0cb32
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 80 добавлений и 9 удалений

Просмотреть файл

@ -44,6 +44,7 @@ paho_rc_to_error = {
mqtt.MQTT_ERR_UNKNOWN: exceptions.ProtocolClientError,
mqtt.MQTT_ERR_ERRNO: exceptions.ProtocolClientError,
mqtt.MQTT_ERR_QUEUE_SIZE: exceptions.ProtocolClientError,
mqtt.MQTT_ERR_KEEPALIVE: exceptions.ConnectionDroppedError,
}
@ -69,7 +70,7 @@ def _create_error_from_rc_code(rc):
message = mqtt.error_string(rc)
return paho_rc_to_error[rc](message)
else:
return exceptions.ProtocolClientError("Unknown CONNACK rc=={}".format(rc))
return exceptions.ProtocolClientError("Unknown rc=={}".format(rc))
class MQTTTransport(object):
@ -373,7 +374,10 @@ class MQTTTransport(object):
:raises: ConnectionFailedError if connection could not be established.
:raises: ConnectionDroppedError if connection is dropped during execution.
:raises: UnauthorizedError if there is an error authenticating.
:raises: NoConnectionError in certain failure scenarios where a connection could not be established
:raises: ProtocolClientError if there is some other client error.
:raises: TlsExchangeAuthError if there a filure with TLS certificate exchange
:raises: ProtocolProxyError if there is a proxy-specific error
"""
logger.debug("connecting to mqtt broker")
@ -427,6 +431,10 @@ class MQTTTransport(object):
Disconnect from the MQTT broker.
:raises: ProtocolClientError if there is some client error.
:raises: ConnectionDroppedError in unexpected cases.
:raises: UnauthorizedError in unexpected cases.
:raises: ConnectionFailedError in unexpected cases.
:raises: NoConnectionError if the client isn't actually conected.
"""
logger.info("disconnecting MQTT client")
try:
@ -467,6 +475,7 @@ class MQTTTransport(object):
:raises: ValueError if topic is None or has zero string length.
:raises: ConnectionDroppedError if connection is dropped during execution.
:raises: ProtocolClientError if there is some other client error.
:raises: NoConnectionError if the client isn't actually conected.
"""
logger.info("subscribing to {} with qos {}".format(topic, qos))
try:
@ -491,6 +500,7 @@ class MQTTTransport(object):
:raises: ValueError if topic is None or has zero string length.
:raises: ConnectionDroppedError if connection is dropped during execution.
:raises: ProtocolClientError if there is some other client error.
:raises: NoConnectionError if the client isn't actually conected.
"""
logger.info("unsubscribing from {}".format(topic))
try:
@ -524,6 +534,7 @@ class MQTTTransport(object):
:raises: TypeError if payload is not a valid type
:raises: ConnectionDroppedError if connection is dropped during execution.
:raises: ProtocolClientError if there is some other client error.
:raises: NoConnectionError if the client isn't actually conected.
"""
logger.info("publishing on {}".format(topic))
try:

Просмотреть файл

@ -64,6 +64,12 @@ class MQTTTransportStage(PipelineStage):
@pipeline_thread.runs_on_pipeline_thread
def _start_connection_watchdog(self, connection_op):
"""
Start a watchdog on the conection operation. This protects against cases where transport.connect()
succeeds but the CONNACK never arrives. This is like a timeout, but it is handled at this level
because specific cleanup needs to take place on timeout (see below), and this cleanup doesn't
belong anywhere else since it is very specific to this stage.
"""
logger.debug("{}({}): Starting watchdog".format(self.name, connection_op.name))
self_weakref = weakref.ref(self)
@ -77,7 +83,17 @@ class MQTTTransportStage(PipelineStage):
logger.info(
"{}({}): Connection watchdog expired. Cancelling op".format(this.name, op.name)
)
this.transport.disconnect()
try:
this.transport.disconnect()
except Exception:
# If we don't catch this, the pending connection op might not ever be cancelled.
# Most likely, the transport isn't actually connected, but other failures are theoreticaly
# possible. Either way, if disconnect fails, we should assume that we're disconencted.
logger.info(
"transport.disconnect raised error while disconnecting in watchdog. Safe to ignore."
)
logger.info(traceback.format_exc())
if this.pipeline_root.connected:
logger.info(
"{}({}): Pipeline is still connected on watchdog expiration. Sending DisconnectedEvent".format(

Просмотреть файл

@ -80,7 +80,7 @@ setup(
"urllib3>=1.26.5,<1.27",
# Actual project dependencies
"deprecation>=2.1.0,<3.0.0",
"paho-mqtt>=1.4.0,<2.0.0",
"paho-mqtt>=1.6.1,<2.0.0",
"requests>=2.20.0,<3.0.0",
"requests-unixsocket>=0.1.5,<1.0.0",
"janus",

Просмотреть файл

@ -25,6 +25,8 @@ this_module = sys.modules[__name__]
logging.basicConfig(level=logging.DEBUG)
pytestmark = pytest.mark.usefixtures("fake_pipeline_thread")
logging.getLogger("azure.iot.device.common").setLevel(level=logging.DEBUG)
###################
# COMMON FIXTURES #
###################
@ -1214,6 +1216,15 @@ class TestMQTTTransportStageOnDisconnectedUnexpectedNoPendingConnectionOp(
assert background_exception.__cause__ is cause
disconnect_can_raise = [
"disconnect_raises",
[
pytest.param(True, id="mqtt_transport.disconnect raises an exception"),
pytest.param(False, id="mqtt_transport.disconnect does not raises an exception"),
],
]
@pytest.mark.describe("MQTTTransportStage - OCCURRENCE: Connection watchdog expired")
class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex):
@pytest.fixture(params=[pipeline_ops_base.ConnectOperation], ids=["Pending ConnectOperation"])
@ -1245,10 +1256,16 @@ class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex)
assert stage.transport.disconnect.call_count == 0
@pytest.mark.parametrize(*disconnect_can_raise)
@pytest.mark.it(
"Completes the op that started the watchdog with an OperationTimeout exception if that op is still pending"
)
def test_completes_with_operation_cancelled(self, mocker, stage, pending_op, mock_timer):
def test_completes_with_operation_cancelled(
self, mocker, stage, pending_op, mock_timer, disconnect_raises, arbitrary_exception
):
if disconnect_raises:
stage.transport.disconnect = mocker.MagicMock(side_effect=arbitrary_exception)
callback = pending_op.callback_stack[0]
stage.run_op(pending_op)
@ -1259,10 +1276,16 @@ class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex)
assert callback.call_count == 1
assert isinstance(callback.call_args[1]["error"], pipeline_exceptions.OperationTimeout)
@pytest.mark.parametrize(*disconnect_can_raise)
@pytest.mark.it(
"Does not complete the op that started the watchdog with an OperationCancelled error if that op is no longer pending"
)
def test_does_not_complete_op_if_no_longer_pending(self, mocker, stage, pending_op, mock_timer):
def test_does_not_complete_op_if_no_longer_pending(
self, mocker, stage, pending_op, mock_timer, disconnect_raises, arbitrary_exception
):
if disconnect_raises:
stage.transport.disconnect = mocker.MagicMock(side_effect=arbitrary_exception)
callback = pending_op.callback_stack[0]
stage.run_op(pending_op)
@ -1273,12 +1296,16 @@ class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex)
assert callback.call_count == 0
@pytest.mark.parametrize(*disconnect_can_raise)
@pytest.mark.it(
"Sends a DisconnectedEvent if the op that started the watchdog is still pending and the pipeline_root connected flag is True"
)
def test_sends_disconnected_event_if_still_pendin_and_connected(
self, mocker, stage, pending_op, mock_timer
self, mocker, stage, pending_op, mock_timer, disconnect_raises, arbitrary_exception
):
if disconnect_raises:
stage.transport.disconnect = mocker.MagicMock(side_effect=arbitrary_exception)
stage.pipeline_root.connected = True
stage.run_op(pending_op)
@ -1290,12 +1317,16 @@ class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex)
stage.send_event_up.call_args[0][0], pipeline_events_base.DisconnectedEvent
)
@pytest.mark.parametrize(*disconnect_can_raise)
@pytest.mark.it(
"Does not send a DisconnectedEvent if the op that started the watchdog is still pending and the pipeline_root connected flag is False"
)
def test_does_not_send_disconnected_event_if_still_pending_and_not_connected(
self, mocker, stage, pending_op, mock_timer
self, mocker, stage, pending_op, mock_timer, disconnect_raises, arbitrary_exception
):
if disconnect_raises:
stage.transport.disconnect = mocker.MagicMock(side_effect=arbitrary_exception)
stage.pipeline_root.connected = False
stage.run_op(pending_op)
@ -1304,12 +1335,16 @@ class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex)
assert stage.send_event_up.call_count == 0
@pytest.mark.parametrize(*disconnect_can_raise)
@pytest.mark.it(
"Does not send a DisconnectedEvent if the op that started the watchdog is no longer pending and the pipeline connected flag is True"
)
def test_does_not_send_disconnected_event_if_no_longer_pending_and_connected(
self, mocker, stage, pending_op, mock_timer
self, mocker, stage, pending_op, mock_timer, disconnect_raises, arbitrary_exception
):
if disconnect_raises:
stage.transport.disconnect = mocker.MagicMock(side_effect=arbitrary_exception)
stage.pipeline_root.connected = True
stage.run_op(pending_op)
stage._pending_connection_op = None
@ -1319,12 +1354,16 @@ class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex)
assert stage.send_event_up.call_count == 0
@pytest.mark.parametrize(*disconnect_can_raise)
@pytest.mark.it(
"Does not send a DisconnectedEvent if the op that started the watchdog is no longer pending and the pipeline connected flag is False"
)
def test_does_not_send_disconnected_event_if_no_longer_pending_and_not_connected(
self, mocker, stage, pending_op, mock_timer
self, mocker, stage, pending_op, mock_timer, disconnect_raises, arbitrary_exception
):
if disconnect_raises:
stage.transport.disconnect = mocker.MagicMock(side_effect=arbitrary_exception)
stage.pipeline_root.connected = False
stage.run_op(pending_op)
stage._pending_connection_op = None

Просмотреть файл

@ -119,6 +119,11 @@ operation_return_codes = [
"rc": mqtt.MQTT_ERR_QUEUE_SIZE,
"error": errors.ProtocolClientError,
},
{
"name": "MQTT_ERR_KEEPALIVE",
"rc": mqtt.MQTT_ERR_KEEPALIVE,
"error": errors.ConnectionDroppedError,
},
]