refactor(azure-iot-device): .pipeline_nucleus -> .nucleus (#1011)
This commit is contained in:
Родитель
539e12ad48
Коммит
d2fa4bb167
|
@ -65,9 +65,9 @@ class PipelineStage(abc.ABC):
|
|||
:type next: PipelineStage
|
||||
:ivar previous: The previous stage in the pipeline. Set to None if this is the first stage in the pipeline.
|
||||
:type previous: PipelineStage
|
||||
:ivar pipeline_nucleus: The pipeline's "nucleus" which contains global pipeline information, accessible
|
||||
:ivar nucleus: The pipeline's "nucleus" which contains global pipeline information, accessible
|
||||
from all stages
|
||||
:type pipeline_nucleus: PipelineNucleus
|
||||
:type nucleus: PipelineNucleus
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
|
@ -77,7 +77,7 @@ class PipelineStage(abc.ABC):
|
|||
self.name = self.__class__.__name__
|
||||
self.next = None
|
||||
self.previous = None
|
||||
self.pipeline_nucleus = None
|
||||
self.nucleus = None
|
||||
|
||||
@pipeline_thread.runs_on_pipeline_thread
|
||||
def run_op(self, op):
|
||||
|
@ -253,14 +253,14 @@ class PipelineRootStage(PipelineStage):
|
|||
:type on_disconnected_handler: Function
|
||||
"""
|
||||
|
||||
def __init__(self, pipeline_nucleus):
|
||||
def __init__(self, nucleus):
|
||||
super().__init__()
|
||||
self.on_pipeline_event_handler = None
|
||||
self.on_connected_handler = None
|
||||
self.on_disconnected_handler = None
|
||||
self.on_new_sastoken_required_handler = None
|
||||
self.on_background_exception_handler = None
|
||||
self.pipeline_nucleus = pipeline_nucleus
|
||||
self.nucleus = nucleus
|
||||
|
||||
def run_op(self, op):
|
||||
# CT-TODO: make this more elegant
|
||||
|
@ -283,7 +283,7 @@ class PipelineRootStage(PipelineStage):
|
|||
old_tail = old_tail.next
|
||||
old_tail.next = new_stage
|
||||
new_stage.previous = old_tail
|
||||
new_stage.pipeline_nucleus = self.pipeline_nucleus
|
||||
new_stage.nucleus = self.nucleus
|
||||
return self
|
||||
|
||||
@pipeline_thread.runs_on_pipeline_thread
|
||||
|
@ -301,7 +301,7 @@ class PipelineRootStage(PipelineStage):
|
|||
logger.debug(
|
||||
"{}: ConnectedEvent received. Calling on_connected_handler".format(self.name)
|
||||
)
|
||||
self.pipeline_nucleus.connected = True
|
||||
self.nucleus.connected = True
|
||||
if self.on_connected_handler:
|
||||
pipeline_thread.invoke_on_callback_thread_nowait(self.on_connected_handler)()
|
||||
|
||||
|
@ -309,7 +309,7 @@ class PipelineRootStage(PipelineStage):
|
|||
logger.debug(
|
||||
"{}: DisconnectedEvent received. Calling on_disconnected_handler".format(self.name)
|
||||
)
|
||||
self.pipeline_nucleus.connected = False
|
||||
self.nucleus.connected = False
|
||||
if self.on_disconnected_handler:
|
||||
pipeline_thread.invoke_on_callback_thread_nowait(self.on_disconnected_handler)()
|
||||
|
||||
|
@ -367,14 +367,14 @@ class SasTokenStage(PipelineStage):
|
|||
def _run_op(self, op):
|
||||
if (
|
||||
isinstance(op, pipeline_ops_base.InitializePipelineOperation)
|
||||
and self.pipeline_nucleus.pipeline_configuration.sastoken is not None
|
||||
and self.nucleus.pipeline_configuration.sastoken is not None
|
||||
):
|
||||
# Start an alarm (renewal or replacement depending on token type)
|
||||
self._start_token_update_alarm()
|
||||
self.send_op_down(op)
|
||||
elif (
|
||||
isinstance(op, pipeline_ops_base.ReauthorizeConnectionOperation)
|
||||
and self.pipeline_nucleus.pipeline_configuration.sastoken is not None
|
||||
and self.nucleus.pipeline_configuration.sastoken is not None
|
||||
):
|
||||
# NOTE 1: This case (currently) implies that we are using Non-Renewable SAS,
|
||||
# although it's not enforced here (it's a product of how the pipeline and client are
|
||||
|
@ -428,7 +428,7 @@ class SasTokenStage(PipelineStage):
|
|||
self._cancel_token_update_alarm()
|
||||
|
||||
update_time = (
|
||||
self.pipeline_nucleus.pipeline_configuration.sastoken.expiry_time
|
||||
self.nucleus.pipeline_configuration.sastoken.expiry_time
|
||||
- self.DEFAULT_TOKEN_UPDATE_MARGIN
|
||||
)
|
||||
|
||||
|
@ -443,7 +443,7 @@ class SasTokenStage(PipelineStage):
|
|||
update_time = time.time() + threading.TIMEOUT_MAX
|
||||
logger.warning(
|
||||
"SAS Token expiration ({expiry} seconds) exceeds max scheduled renewal time ({max} seconds). Will be renewing after {max} seconds instead".format(
|
||||
expiry=self.pipeline_nucleus.pipeline_configuration.sastoken.expiry_time,
|
||||
expiry=self.nucleus.pipeline_configuration.sastoken.expiry_time,
|
||||
max=threading.TIMEOUT_MAX,
|
||||
)
|
||||
)
|
||||
|
@ -452,7 +452,7 @@ class SasTokenStage(PipelineStage):
|
|||
|
||||
# For renewable SasTokens, create an alarm that will automatically renew the token,
|
||||
# and then start another alarm.
|
||||
if isinstance(self.pipeline_nucleus.pipeline_configuration.sastoken, st.RenewableSasToken):
|
||||
if isinstance(self.nucleus.pipeline_configuration.sastoken, st.RenewableSasToken):
|
||||
logger.debug(
|
||||
"{}: Scheduling automatic SAS Token renewal at epoch time: {}".format(
|
||||
self.name, update_time
|
||||
|
@ -466,7 +466,7 @@ class SasTokenStage(PipelineStage):
|
|||
this._cancel_reauth_retry_timer()
|
||||
logger.info("{}: Renewing SAS Token...".format(self.name))
|
||||
# Renew the token
|
||||
sastoken = this.pipeline_nucleus.pipeline_configuration.sastoken
|
||||
sastoken = this.nucleus.pipeline_configuration.sastoken
|
||||
try:
|
||||
sastoken.refresh()
|
||||
except st.SasTokenError as e:
|
||||
|
@ -479,7 +479,7 @@ class SasTokenStage(PipelineStage):
|
|||
# now that token has been renewed. If the pipeline is not currently connected,
|
||||
# there is no need to do this, as the next connection will be using the new
|
||||
# credentials.
|
||||
if this.pipeline_nucleus.connected:
|
||||
if this.nucleus.connected:
|
||||
this._reauthorize()
|
||||
|
||||
# Once again, start a renewal alarm
|
||||
|
@ -528,8 +528,8 @@ class SasTokenStage(PipelineStage):
|
|||
# wouldn't know to reconnect, because the expected state of a failed reauth is
|
||||
# to be disconnected.
|
||||
if (
|
||||
not this.pipeline_nucleus.connected
|
||||
and this.pipeline_nucleus.pipeline_configuration.connection_retry
|
||||
not this.nucleus.connected
|
||||
and this.nucleus.pipeline_configuration.connection_retry
|
||||
):
|
||||
logger.info("{}: Retrying connection reauthorization".format(this.name))
|
||||
# No need to cancel the timer, because if this is running, it has already ended
|
||||
|
@ -539,11 +539,11 @@ class SasTokenStage(PipelineStage):
|
|||
# We need to check this when the timer expires as well as before creating
|
||||
# the timer in case connection has been re-established while timer was
|
||||
# running
|
||||
if not this.pipeline_nucleus.connected:
|
||||
if not this.nucleus.connected:
|
||||
this._reauthorize()
|
||||
|
||||
this._reauth_retry_timer = threading.Timer(
|
||||
this.pipeline_nucleus.pipeline_configuration.connection_retry_interval,
|
||||
this.nucleus.pipeline_configuration.connection_retry_interval,
|
||||
retry_reauthorize,
|
||||
)
|
||||
this._reauth_retry_timer.daemon = True
|
||||
|
@ -570,8 +570,8 @@ class AutoConnectStage(PipelineStage):
|
|||
# we're not connected and the auto-connect feature is enabled.
|
||||
if (
|
||||
op.needs_connection
|
||||
and not self.pipeline_nucleus.connected
|
||||
and self.pipeline_nucleus.pipeline_configuration.auto_connect
|
||||
and not self.nucleus.connected
|
||||
and self.nucleus.pipeline_configuration.auto_connect
|
||||
):
|
||||
logger.debug(
|
||||
"{}({}): Op needs connection. Queueing this op and starting a ConnectionOperation".format(
|
||||
|
@ -642,16 +642,13 @@ class ConnectionLockStage(PipelineStage):
|
|||
)
|
||||
self.queue.put_nowait(op)
|
||||
|
||||
elif isinstance(op, pipeline_ops_base.ConnectOperation) and self.pipeline_nucleus.connected:
|
||||
elif isinstance(op, pipeline_ops_base.ConnectOperation) and self.nucleus.connected:
|
||||
logger.info(
|
||||
"{}({}): Transport is already connected. Completing.".format(self.name, op.name)
|
||||
)
|
||||
op.complete()
|
||||
|
||||
elif (
|
||||
isinstance(op, pipeline_ops_base.DisconnectOperation)
|
||||
and not self.pipeline_nucleus.connected
|
||||
):
|
||||
elif isinstance(op, pipeline_ops_base.DisconnectOperation) and not self.nucleus.connected:
|
||||
logger.info(
|
||||
"{}({}): Transport is already disconnected. Completing.".format(self.name, op.name)
|
||||
)
|
||||
|
@ -1105,7 +1102,7 @@ class ReconnectStage(PipelineStage):
|
|||
# NOTE: Connection Retry == Reconnect. These terms are used interchangeably. 'reconnect' is a
|
||||
# more accurate term for the process happening internally here, but the feature is called
|
||||
# 'connection retry' when facing the end user.
|
||||
if self.pipeline_nucleus.pipeline_configuration.connection_retry:
|
||||
if self.nucleus.pipeline_configuration.connection_retry:
|
||||
|
||||
# If receiving a connection op while one is already in progress, wait for the current
|
||||
# one to finish. This is kind of like a ConnectionLockStage, but inside this one.
|
||||
|
@ -1223,7 +1220,7 @@ class ReconnectStage(PipelineStage):
|
|||
@pipeline_thread.runs_on_pipeline_thread
|
||||
def _handle_pipeline_event(self, event):
|
||||
# Connection Retry Enabled
|
||||
if self.pipeline_nucleus.pipeline_configuration.connection_retry:
|
||||
if self.nucleus.pipeline_configuration.connection_retry:
|
||||
if isinstance(event, pipeline_events_base.ConnectedEvent):
|
||||
# First, clear the reconnect timer no matter what.
|
||||
# We are now connected, so any ongoing reconnect is unnecessary
|
||||
|
@ -1251,7 +1248,7 @@ class ReconnectStage(PipelineStage):
|
|||
else:
|
||||
logger.warning(
|
||||
"{}: ConnectedEvent received while in unexpected state - {}, Connected: {}".format(
|
||||
self.name, self.state, self.pipeline_nucleus.connected
|
||||
self.name, self.state, self.nucleus.connected
|
||||
)
|
||||
)
|
||||
logger.debug(
|
||||
|
@ -1311,7 +1308,7 @@ class ReconnectStage(PipelineStage):
|
|||
else:
|
||||
logger.warning(
|
||||
"{}: DisconnectEvent received while in unexpected state - {}, Connected: {}".format(
|
||||
self.name, self.state, self.pipeline_nucleus.connected
|
||||
self.name, self.state, self.nucleus.connected
|
||||
)
|
||||
)
|
||||
logger.debug(
|
||||
|
@ -1390,7 +1387,7 @@ class ReconnectStage(PipelineStage):
|
|||
op.name,
|
||||
error,
|
||||
this.state,
|
||||
this.pipeline_nucleus.connected,
|
||||
this.nucleus.connected,
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -1411,7 +1408,7 @@ class ReconnectStage(PipelineStage):
|
|||
"{}: Reconnect failed. Starting reconnection timer".format(this.name)
|
||||
)
|
||||
this._start_reconnect_timer(
|
||||
this.pipeline_nucleus.pipeline_configuration.connection_retry_interval
|
||||
this.nucleus.pipeline_configuration.connection_retry_interval
|
||||
)
|
||||
else:
|
||||
# all others are permanent errors
|
||||
|
@ -1433,7 +1430,7 @@ class ReconnectStage(PipelineStage):
|
|||
@pipeline_thread.runs_on_pipeline_thread
|
||||
def _should_reconnect(self, error):
|
||||
"""Returns True if a reconnect should occur in response to an error, False otherwise"""
|
||||
if self.pipeline_nucleus.pipeline_configuration.connection_retry:
|
||||
if self.nucleus.pipeline_configuration.connection_retry:
|
||||
if type(error) in self.transient_connect_errors:
|
||||
return True
|
||||
return False
|
||||
|
@ -1452,7 +1449,7 @@ class ReconnectStage(PipelineStage):
|
|||
this = self_weakref()
|
||||
logger.debug(
|
||||
"{}: Reconnect timer expired. State is {} Connected is {}.".format(
|
||||
self.name, self.state, self.pipeline_nucleus.connected
|
||||
self.name, self.state, self.nucleus.connected
|
||||
)
|
||||
)
|
||||
# Clear the reconnect timer here first and foremost so it doesn't accidentally
|
||||
|
@ -1483,7 +1480,7 @@ class ReconnectStage(PipelineStage):
|
|||
)
|
||||
)
|
||||
this._start_reconnect_timer(
|
||||
this.pipeline_nucleus.pipeline_configuration.connection_retry_interval
|
||||
this.nucleus.pipeline_configuration.connection_retry_interval
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
|
|
|
@ -37,32 +37,32 @@ class HTTPTransportStage(PipelineStage):
|
|||
|
||||
# If there is a gateway hostname, use that as the hostname for connection,
|
||||
# rather than the hostname itself
|
||||
if self.pipeline_nucleus.pipeline_configuration.gateway_hostname:
|
||||
if self.nucleus.pipeline_configuration.gateway_hostname:
|
||||
logger.debug(
|
||||
"Gateway Hostname Present. Setting Hostname to: {}".format(
|
||||
self.pipeline_nucleus.pipeline_configuration.gateway_hostname
|
||||
self.nucleus.pipeline_configuration.gateway_hostname
|
||||
)
|
||||
)
|
||||
hostname = self.pipeline_nucleus.pipeline_configuration.gateway_hostname
|
||||
hostname = self.nucleus.pipeline_configuration.gateway_hostname
|
||||
else:
|
||||
logger.debug(
|
||||
"Gateway Hostname not present. Setting Hostname to: {}".format(
|
||||
self.pipeline_nucleus.pipeline_configuration.hostname
|
||||
self.nucleus.pipeline_configuration.hostname
|
||||
)
|
||||
)
|
||||
hostname = self.pipeline_nucleus.pipeline_configuration.hostname
|
||||
hostname = self.nucleus.pipeline_configuration.hostname
|
||||
|
||||
# Create HTTP Transport
|
||||
logger.debug("{}({}): got connection args".format(self.name, op.name))
|
||||
self.transport = HTTPTransport(
|
||||
hostname=hostname,
|
||||
server_verification_cert=self.pipeline_nucleus.pipeline_configuration.server_verification_cert,
|
||||
x509_cert=self.pipeline_nucleus.pipeline_configuration.x509,
|
||||
cipher=self.pipeline_nucleus.pipeline_configuration.cipher,
|
||||
proxy_options=self.pipeline_nucleus.pipeline_configuration.proxy_options,
|
||||
server_verification_cert=self.nucleus.pipeline_configuration.server_verification_cert,
|
||||
x509_cert=self.nucleus.pipeline_configuration.x509,
|
||||
cipher=self.nucleus.pipeline_configuration.cipher,
|
||||
proxy_options=self.nucleus.pipeline_configuration.proxy_options,
|
||||
)
|
||||
|
||||
self.pipeline_nucleus.transport = self.transport
|
||||
self.nucleus.transport = self.transport
|
||||
op.complete()
|
||||
|
||||
elif isinstance(op, pipeline_ops_http.HTTPRequestAndResponseOperation):
|
||||
|
@ -97,10 +97,8 @@ class HTTPTransportStage(PipelineStage):
|
|||
# http_headers will affect the op.headers, which would be an unintended side effect
|
||||
# and not a good practice.
|
||||
http_headers = copy.deepcopy(op.headers)
|
||||
if self.pipeline_nucleus.pipeline_configuration.sastoken:
|
||||
http_headers["Authorization"] = str(
|
||||
self.pipeline_nucleus.pipeline_configuration.sastoken
|
||||
)
|
||||
if self.nucleus.pipeline_configuration.sastoken:
|
||||
http_headers["Authorization"] = str(self.nucleus.pipeline_configuration.sastoken)
|
||||
|
||||
self.transport.request(
|
||||
method=op.method,
|
||||
|
|
|
@ -94,7 +94,7 @@ class MQTTTransportStage(PipelineStage):
|
|||
)
|
||||
logger.info(traceback.format_exc())
|
||||
|
||||
if this.pipeline_nucleus.connected:
|
||||
if this.nucleus.connected:
|
||||
|
||||
logger.info(
|
||||
"{}({}): Pipeline is still connected on watchdog expiration. Sending DisconnectedEvent".format(
|
||||
|
@ -130,20 +130,20 @@ class MQTTTransportStage(PipelineStage):
|
|||
|
||||
# If there is a gateway hostname, use that as the hostname for connection,
|
||||
# rather than the hostname itself
|
||||
if self.pipeline_nucleus.pipeline_configuration.gateway_hostname:
|
||||
if self.nucleus.pipeline_configuration.gateway_hostname:
|
||||
logger.debug(
|
||||
"Gateway Hostname Present. Setting Hostname to: {}".format(
|
||||
self.pipeline_nucleus.pipeline_configuration.gateway_hostname
|
||||
self.nucleus.pipeline_configuration.gateway_hostname
|
||||
)
|
||||
)
|
||||
hostname = self.pipeline_nucleus.pipeline_configuration.gateway_hostname
|
||||
hostname = self.nucleus.pipeline_configuration.gateway_hostname
|
||||
else:
|
||||
logger.debug(
|
||||
"Gateway Hostname not present. Setting Hostname to: {}".format(
|
||||
self.pipeline_nucleus.pipeline_configuration.hostname
|
||||
self.nucleus.pipeline_configuration.hostname
|
||||
)
|
||||
)
|
||||
hostname = self.pipeline_nucleus.pipeline_configuration.hostname
|
||||
hostname = self.nucleus.pipeline_configuration.hostname
|
||||
|
||||
# Create the Transport object, set it's handlers
|
||||
logger.debug("{}({}): got connection args".format(self.name, op.name))
|
||||
|
@ -151,12 +151,12 @@ class MQTTTransportStage(PipelineStage):
|
|||
client_id=op.client_id,
|
||||
hostname=hostname,
|
||||
username=op.username,
|
||||
server_verification_cert=self.pipeline_nucleus.pipeline_configuration.server_verification_cert,
|
||||
x509_cert=self.pipeline_nucleus.pipeline_configuration.x509,
|
||||
websockets=self.pipeline_nucleus.pipeline_configuration.websockets,
|
||||
cipher=self.pipeline_nucleus.pipeline_configuration.cipher,
|
||||
proxy_options=self.pipeline_nucleus.pipeline_configuration.proxy_options,
|
||||
keep_alive=self.pipeline_nucleus.pipeline_configuration.keep_alive,
|
||||
server_verification_cert=self.nucleus.pipeline_configuration.server_verification_cert,
|
||||
x509_cert=self.nucleus.pipeline_configuration.x509,
|
||||
websockets=self.nucleus.pipeline_configuration.websockets,
|
||||
cipher=self.nucleus.pipeline_configuration.cipher,
|
||||
proxy_options=self.nucleus.pipeline_configuration.proxy_options,
|
||||
keep_alive=self.nucleus.pipeline_configuration.keep_alive,
|
||||
)
|
||||
self.transport.on_mqtt_connected_handler = self._on_mqtt_connected
|
||||
self.transport.on_mqtt_connection_failure_handler = self._on_mqtt_connection_failure
|
||||
|
@ -198,8 +198,8 @@ class MQTTTransportStage(PipelineStage):
|
|||
self._start_connection_watchdog(op)
|
||||
# Use SasToken as password if present. If not present (e.g. using X509),
|
||||
# then no password is required because auth is handled via other means.
|
||||
if self.pipeline_nucleus.pipeline_configuration.sastoken:
|
||||
password = str(self.pipeline_nucleus.pipeline_configuration.sastoken)
|
||||
if self.nucleus.pipeline_configuration.sastoken:
|
||||
password = str(self.nucleus.pipeline_configuration.sastoken)
|
||||
else:
|
||||
password = None
|
||||
try:
|
||||
|
@ -445,7 +445,7 @@ class MQTTTransportStage(PipelineStage):
|
|||
|
||||
# If there is no connection retry, cancel any transport operations waiting on response
|
||||
# so that they do not get stuck there.
|
||||
if not self.pipeline_nucleus.pipeline_configuration.connection_retry:
|
||||
if not self.nucleus.pipeline_configuration.connection_retry:
|
||||
logger.debug(
|
||||
"{}: Connection Retry disabled - cancelling in-flight operations".format(
|
||||
self.name
|
||||
|
|
|
@ -49,10 +49,10 @@ class HTTPPipeline(object):
|
|||
# This is not an ideal solution, but it's the simplest one for the time being.
|
||||
|
||||
# Contains data and information shared globally within the pipeline
|
||||
self._pipeline_nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration)
|
||||
self._nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration)
|
||||
|
||||
self._pipeline = (
|
||||
pipeline_stages_base.PipelineRootStage(self._pipeline_nucleus)
|
||||
pipeline_stages_base.PipelineRootStage(self._nucleus)
|
||||
.append_stage(pipeline_stages_iothub_http.IoTHubHTTPTranslationStage())
|
||||
.append_stage(pipeline_stages_http.HTTPTransportStage())
|
||||
)
|
||||
|
@ -81,7 +81,7 @@ class HTTPPipeline(object):
|
|||
:raises: :class:`azure.iot.device.iothub.pipeline.exceptions.ProtocolClientError`
|
||||
"""
|
||||
logger.debug("HTTPPipeline invoke_method called")
|
||||
if not self._pipeline_nucleus.pipeline_configuration.method_invoke:
|
||||
if not self._nucleus.pipeline_configuration.method_invoke:
|
||||
# If this parameter is not set, that means that the pipeline was not generated by the edge environment. Method invoke only works for clients generated using the edge environment.
|
||||
error = pipeline_exceptions.PipelineRuntimeError(
|
||||
"invoke_method called, but it is only supported on module clients generated from an edge environment. If you are not using a module generated from an edge environment, you cannot use invoke_method"
|
||||
|
@ -115,7 +115,7 @@ class HTTPPipeline(object):
|
|||
:raises: :class:`azure.iot.device.iothub.pipeline.exceptions.ProtocolClientError`
|
||||
"""
|
||||
logger.debug("HTTPPipeline get_storage_info_for_blob called")
|
||||
if not self._pipeline_nucleus.pipeline_configuration.blob_upload:
|
||||
if not self._nucleus.pipeline_configuration.blob_upload:
|
||||
# If this parameter is not set, that means this is not a device client. Upload to blob is not supported on module clients.
|
||||
error = pipeline_exceptions.PipelineRuntimeError(
|
||||
"get_storage_info_for_blob called, but it is only supported for use with device clients. Ensure you are using a device client."
|
||||
|
@ -153,7 +153,7 @@ class HTTPPipeline(object):
|
|||
:raises: :class:`azure.iot.device.iothub.pipeline.exceptions.ProtocolClientError`
|
||||
"""
|
||||
logger.debug("HTTPPipeline notify_blob_upload_status called")
|
||||
if not self._pipeline_nucleus.pipeline_configuration.blob_upload:
|
||||
if not self._nucleus.pipeline_configuration.blob_upload:
|
||||
# If this parameter is not set, that means this is not a device client. Upload to blob is not supported on module clients.
|
||||
error = pipeline_exceptions.PipelineRuntimeError(
|
||||
"notify_blob_upload_status called, but it is only supported for use with device clients. Ensure you are using a device client."
|
||||
|
|
|
@ -52,13 +52,13 @@ class MQTTPipeline(object):
|
|||
self.on_twin_patch_received = None
|
||||
|
||||
# Contains data and information shared globally within the pipeline
|
||||
self._pipeline_nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration)
|
||||
self._nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration)
|
||||
|
||||
self._pipeline = (
|
||||
#
|
||||
# The root is always the root. By definition, it's the first stage in the pipeline.
|
||||
#
|
||||
pipeline_stages_base.PipelineRootStage(self._pipeline_nucleus)
|
||||
pipeline_stages_base.PipelineRootStage(self._nucleus)
|
||||
#
|
||||
# SasTokenStage comes near the root by default because it should be as close
|
||||
# to the top of the pipeline as possible, and does not need to be after anything.
|
||||
|
@ -593,11 +593,11 @@ class MQTTPipeline(object):
|
|||
Pipeline Configuration for the pipeline. Note that while a new config object cannot be
|
||||
provided (read-only), the values stored in the config object CAN be changed.
|
||||
"""
|
||||
return self._pipeline_nucleus.pipeline_configuration
|
||||
return self._nucleus.pipeline_configuration
|
||||
|
||||
@property
|
||||
def connected(self):
|
||||
"""
|
||||
Read-only property to indicate if the transport is connected or not.
|
||||
"""
|
||||
return self._pipeline_nucleus.connected
|
||||
return self._nucleus.connected
|
||||
|
|
|
@ -54,16 +54,16 @@ class IoTHubHTTPTranslationStage(PipelineStage):
|
|||
# NOTE: we do not add the sas Authorization header here. Instead we add it later on in
|
||||
# the HTTPTransportStage
|
||||
x_ms_edge_string = "{deviceId}/{moduleId}".format(
|
||||
deviceId=self.pipeline_nucleus.pipeline_configuration.device_id,
|
||||
moduleId=self.pipeline_nucleus.pipeline_configuration.module_id,
|
||||
deviceId=self.nucleus.pipeline_configuration.device_id,
|
||||
moduleId=self.nucleus.pipeline_configuration.module_id,
|
||||
) # these are the identifiers of the current module
|
||||
user_agent_string = urllib.parse.quote_plus(
|
||||
user_agent.get_iothub_user_agent()
|
||||
+ str(self.pipeline_nucleus.pipeline_configuration.product_info)
|
||||
+ str(self.nucleus.pipeline_configuration.product_info)
|
||||
)
|
||||
# Method Invoke must be addressed to the gateway hostname because it is an Edge op
|
||||
headers = {
|
||||
"Host": self.pipeline_nucleus.pipeline_configuration.gateway_hostname,
|
||||
"Host": self.nucleus.pipeline_configuration.gateway_hostname,
|
||||
"Content-Type": "application/json",
|
||||
"Content-Length": str(len(str(body))),
|
||||
"x-ms-edge-moduleId": x_ms_edge_string,
|
||||
|
@ -99,15 +99,15 @@ class IoTHubHTTPTranslationStage(PipelineStage):
|
|||
apiVersion=pkg_constant.IOTHUB_API_VERSION
|
||||
)
|
||||
path = http_path_iothub.get_storage_info_for_blob_path(
|
||||
self.pipeline_nucleus.pipeline_configuration.device_id
|
||||
self.nucleus.pipeline_configuration.device_id
|
||||
)
|
||||
body = json.dumps({"blobName": op.blob_name})
|
||||
user_agent_string = urllib.parse.quote_plus(
|
||||
user_agent.get_iothub_user_agent()
|
||||
+ str(self.pipeline_nucleus.pipeline_configuration.product_info)
|
||||
+ str(self.nucleus.pipeline_configuration.product_info)
|
||||
)
|
||||
headers = {
|
||||
"Host": self.pipeline_nucleus.pipeline_configuration.hostname,
|
||||
"Host": self.nucleus.pipeline_configuration.hostname,
|
||||
"Accept": "application/json",
|
||||
"Content-Type": "application/json",
|
||||
"Content-Length": str(len(str(body))),
|
||||
|
@ -144,7 +144,7 @@ class IoTHubHTTPTranslationStage(PipelineStage):
|
|||
apiVersion=pkg_constant.IOTHUB_API_VERSION
|
||||
)
|
||||
path = http_path_iothub.get_notify_blob_upload_status_path(
|
||||
self.pipeline_nucleus.pipeline_configuration.device_id
|
||||
self.nucleus.pipeline_configuration.device_id
|
||||
)
|
||||
body = json.dumps(
|
||||
{
|
||||
|
@ -156,13 +156,13 @@ class IoTHubHTTPTranslationStage(PipelineStage):
|
|||
)
|
||||
user_agent_string = urllib.parse.quote_plus(
|
||||
user_agent.get_iothub_user_agent()
|
||||
+ str(self.pipeline_nucleus.pipeline_configuration.product_info)
|
||||
+ str(self.nucleus.pipeline_configuration.product_info)
|
||||
)
|
||||
|
||||
# NOTE we do not add the sas Authorization header here. Instead we add it later on in
|
||||
# the HTTPTransportStage
|
||||
headers = {
|
||||
"Host": self.pipeline_nucleus.pipeline_configuration.hostname,
|
||||
"Host": self.nucleus.pipeline_configuration.hostname,
|
||||
"Content-Type": "application/json; charset=utf-8",
|
||||
"Content-Length": str(len(str(body))),
|
||||
"User-Agent": user_agent_string,
|
||||
|
|
|
@ -36,20 +36,20 @@ class IoTHubMQTTTranslationStage(PipelineStage):
|
|||
|
||||
if isinstance(op, pipeline_ops_base.InitializePipelineOperation):
|
||||
|
||||
if self.pipeline_nucleus.pipeline_configuration.module_id:
|
||||
if self.nucleus.pipeline_configuration.module_id:
|
||||
# Module Format
|
||||
client_id = "{}/{}".format(
|
||||
self.pipeline_nucleus.pipeline_configuration.device_id,
|
||||
self.pipeline_nucleus.pipeline_configuration.module_id,
|
||||
self.nucleus.pipeline_configuration.device_id,
|
||||
self.nucleus.pipeline_configuration.module_id,
|
||||
)
|
||||
else:
|
||||
# Device Format
|
||||
client_id = self.pipeline_nucleus.pipeline_configuration.device_id
|
||||
client_id = self.nucleus.pipeline_configuration.device_id
|
||||
|
||||
query_param_seq = []
|
||||
|
||||
# Apply query parameters (i.e. key1=value1&key2=value2...&keyN=valueN format)
|
||||
custom_product_info = str(self.pipeline_nucleus.pipeline_configuration.product_info)
|
||||
custom_product_info = str(self.nucleus.pipeline_configuration.product_info)
|
||||
if custom_product_info.startswith(
|
||||
pkg_constant.DIGITAL_TWIN_PREFIX
|
||||
): # Digital Twin Stuff
|
||||
|
@ -71,7 +71,7 @@ class IoTHubMQTTTranslationStage(PipelineStage):
|
|||
# See the repo wiki article for details:
|
||||
# https://github.com/Azure/azure-iot-sdk-python/wiki/URL-Encoding-(MQTT)
|
||||
username = "{hostname}/{client_id}/?{query_params}".format(
|
||||
hostname=self.pipeline_nucleus.pipeline_configuration.hostname,
|
||||
hostname=self.nucleus.pipeline_configuration.hostname,
|
||||
client_id=client_id,
|
||||
query_params=urllib.parse.urlencode(query_param_seq, quote_via=urllib.parse.quote),
|
||||
)
|
||||
|
@ -88,8 +88,8 @@ class IoTHubMQTTTranslationStage(PipelineStage):
|
|||
):
|
||||
# Convert SendTelemetry and SendOutputMessageOperation operations into MQTT Publish operations
|
||||
telemetry_topic = mqtt_topic_iothub.get_telemetry_topic_for_publish(
|
||||
device_id=self.pipeline_nucleus.pipeline_configuration.device_id,
|
||||
module_id=self.pipeline_nucleus.pipeline_configuration.module_id,
|
||||
device_id=self.nucleus.pipeline_configuration.device_id,
|
||||
module_id=self.nucleus.pipeline_configuration.module_id,
|
||||
)
|
||||
topic = mqtt_topic_iothub.encode_message_properties_in_topic(
|
||||
op.message, telemetry_topic
|
||||
|
@ -154,12 +154,12 @@ class IoTHubMQTTTranslationStage(PipelineStage):
|
|||
def _get_feature_subscription_topic(self, feature):
|
||||
if feature == pipeline_constant.C2D_MSG:
|
||||
return mqtt_topic_iothub.get_c2d_topic_for_subscribe(
|
||||
self.pipeline_nucleus.pipeline_configuration.device_id
|
||||
self.nucleus.pipeline_configuration.device_id
|
||||
)
|
||||
elif feature == pipeline_constant.INPUT_MSG:
|
||||
return mqtt_topic_iothub.get_input_topic_for_subscribe(
|
||||
self.pipeline_nucleus.pipeline_configuration.device_id,
|
||||
self.pipeline_nucleus.pipeline_configuration.module_id,
|
||||
self.nucleus.pipeline_configuration.device_id,
|
||||
self.nucleus.pipeline_configuration.module_id,
|
||||
)
|
||||
elif feature == pipeline_constant.METHODS:
|
||||
return mqtt_topic_iothub.get_method_topic_for_subscribe()
|
||||
|
@ -183,8 +183,8 @@ class IoTHubMQTTTranslationStage(PipelineStage):
|
|||
# Is there value to the user getting the original bytestring from the wire?
|
||||
if isinstance(event, pipeline_events_mqtt.IncomingMQTTMessageEvent):
|
||||
topic = event.topic
|
||||
device_id = self.pipeline_nucleus.pipeline_configuration.device_id
|
||||
module_id = self.pipeline_nucleus.pipeline_configuration.module_id
|
||||
device_id = self.nucleus.pipeline_configuration.device_id
|
||||
module_id = self.nucleus.pipeline_configuration.module_id
|
||||
|
||||
if mqtt_topic_iothub.is_c2d_topic(topic, device_id):
|
||||
message = Message(event.payload)
|
||||
|
|
|
@ -39,13 +39,13 @@ class MQTTPipeline(object):
|
|||
self._registration_id = pipeline_configuration.registration_id
|
||||
|
||||
# Contains data and information shared globally within the pipeline
|
||||
self._pipeline_nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration)
|
||||
self._nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration)
|
||||
|
||||
self._pipeline = (
|
||||
#
|
||||
# The root is always the root. By definition, it's the first stage in the pipeline.
|
||||
#
|
||||
pipeline_stages_base.PipelineRootStage(self._pipeline_nucleus)
|
||||
pipeline_stages_base.PipelineRootStage(self._nucleus)
|
||||
#
|
||||
# SasTokenStage comes near the root by default because it should be as close
|
||||
# to the top of the pipeline as possible, and does not need to be after anything.
|
||||
|
|
|
@ -38,14 +38,14 @@ class ProvisioningMQTTTranslationStage(PipelineStage):
|
|||
|
||||
if isinstance(op, pipeline_ops_base.InitializePipelineOperation):
|
||||
|
||||
client_id = self.pipeline_nucleus.pipeline_configuration.registration_id
|
||||
client_id = self.nucleus.pipeline_configuration.registration_id
|
||||
query_param_seq = [
|
||||
("api-version", pkg_constant.PROVISIONING_API_VERSION),
|
||||
("ClientVersion", user_agent.get_provisioning_user_agent()),
|
||||
]
|
||||
username = "{id_scope}/registrations/{registration_id}/{query_params}".format(
|
||||
id_scope=self.pipeline_nucleus.pipeline_configuration.id_scope,
|
||||
registration_id=self.pipeline_nucleus.pipeline_configuration.registration_id,
|
||||
id_scope=self.nucleus.pipeline_configuration.id_scope,
|
||||
registration_id=self.nucleus.pipeline_configuration.registration_id,
|
||||
query_params=urllib.parse.urlencode(query_param_seq, quote_via=urllib.parse.quote),
|
||||
)
|
||||
|
||||
|
|
|
@ -50,10 +50,10 @@ def add_base_pipeline_stage_tests(
|
|||
stage = cls_type(**init_kwargs)
|
||||
assert stage.previous is None
|
||||
|
||||
@pytest.mark.it("Initializes 'pipeline_nucleus' attribute as None")
|
||||
@pytest.mark.it("Initializes 'nucleus' attribute as None")
|
||||
def test_pipeline_nucleus(self, cls_type, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
assert stage.pipeline_nucleus is None
|
||||
assert stage.nucleus is None
|
||||
|
||||
if extended_stage_instantiation_test_class:
|
||||
|
||||
|
|
|
@ -78,7 +78,7 @@ class PipelineRootStageTestConfig(object):
|
|||
|
||||
@pytest.fixture
|
||||
def init_kwargs(self, mocker):
|
||||
return {"pipeline_nucleus": pipeline_nucleus.PipelineNucleus(mocker.MagicMock())}
|
||||
return {"nucleus": pipeline_nucleus.PipelineNucleus(mocker.MagicMock())}
|
||||
|
||||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
|
@ -115,10 +115,10 @@ class PipelineRootStageInstantiationTests(PipelineRootStageTestConfig):
|
|||
stage = pipeline_stages_base.PipelineRootStage(**init_kwargs)
|
||||
assert stage.on_background_exception_handler is None
|
||||
|
||||
@pytest.mark.it("Initializes 'pipeline_nucleus' with the provided 'pipeline_nucleus' parameter")
|
||||
@pytest.mark.it("Initializes 'nucleus' with the provided 'nucleus' parameter")
|
||||
def test_pipeline_nucleus(self, init_kwargs):
|
||||
stage = pipeline_stages_base.PipelineRootStage(**init_kwargs)
|
||||
assert stage.pipeline_nucleus is init_kwargs["pipeline_nucleus"]
|
||||
assert stage.nucleus is init_kwargs["nucleus"]
|
||||
|
||||
|
||||
pipeline_stage_test.add_base_pipeline_stage_tests(
|
||||
|
@ -155,7 +155,7 @@ class TestPipelineRootStageAppendStage(PipelineRootStageTestConfig):
|
|||
stage.append_stage(new_stage)
|
||||
assert prev_tail.next is new_stage
|
||||
assert new_stage.previous is prev_tail
|
||||
assert new_stage.pipeline_nucleus is root.pipeline_nucleus
|
||||
assert new_stage.nucleus is root.nucleus
|
||||
prev_tail = new_stage
|
||||
|
||||
|
||||
|
@ -186,9 +186,9 @@ class TestPipelineRootStageHandlePipelineEventWithConnectedEvent(
|
|||
|
||||
@pytest.mark.it("Sets the 'connected' attribute on the PipelineNucleus to True")
|
||||
def test_set_connected_true(self, stage, event):
|
||||
assert not stage.pipeline_nucleus.connected
|
||||
assert not stage.nucleus.connected
|
||||
stage.handle_pipeline_event(event)
|
||||
assert stage.pipeline_nucleus.connected
|
||||
assert stage.nucleus.connected
|
||||
|
||||
@pytest.mark.it("Invokes the 'on_connected_handler' handler function, if set")
|
||||
def test_invoke_handler(self, mocker, stage, event):
|
||||
|
@ -212,9 +212,9 @@ class TestPipelineRootStageHandlePipelineEventWithDisconnectedEvent(
|
|||
|
||||
@pytest.mark.it("Sets the 'connected' attribute on the PipelineNucleus to True")
|
||||
def test_set_connected_false(self, stage, event):
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
stage.handle_pipeline_event(event)
|
||||
assert not stage.pipeline_nucleus.connected
|
||||
assert not stage.nucleus.connected
|
||||
|
||||
@pytest.mark.it("Invokes the 'on_disconnected_handler' handler function, if set")
|
||||
def test_invoke_handler(self, mocker, stage, event):
|
||||
|
@ -305,11 +305,9 @@ class SasTokenStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, sastoken, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.pipeline_nucleus.pipeline_configuration.sastoken = sastoken
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry_interval = 1234
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.nucleus.pipeline_configuration.sastoken = sastoken
|
||||
stage.nucleus.pipeline_configuration.connection_retry_interval = 1234
|
||||
# Mock flow methods
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
|
@ -403,7 +401,7 @@ class TestSasTokenStageRunOpWithInitializePipelineOpSasTokenConfig(
|
|||
)
|
||||
def test_sets_alarm(self, mocker, stage, op, mock_alarm):
|
||||
expected_alarm_time = (
|
||||
stage.pipeline_nucleus.pipeline_configuration.sastoken.expiry_time
|
||||
stage.nucleus.pipeline_configuration.sastoken.expiry_time
|
||||
- pipeline_stages_base.SasTokenStage.DEFAULT_TOKEN_UPDATE_MARGIN
|
||||
)
|
||||
|
||||
|
@ -419,7 +417,7 @@ class TestSasTokenStageRunOpWithInitializePipelineOpSasTokenConfig(
|
|||
"Starts a background update alarm that will instead trigger after MAX_TIMEOUT seconds if the SasToken expiration time (less the Update Margin) is more than MAX_TIMEOUT seconds in the future"
|
||||
)
|
||||
def test_sets_alarm_long_expiration(self, mocker, stage, op, mock_alarm):
|
||||
token = stage.pipeline_nucleus.pipeline_configuration.sastoken
|
||||
token = stage.nucleus.pipeline_configuration.sastoken
|
||||
new_expiry = token.expiry_time + threading.TIMEOUT_MAX
|
||||
if isinstance(token, st.RenewableSasToken):
|
||||
token._expiry_time = new_expiry
|
||||
|
@ -523,7 +521,7 @@ class TestSasTokenStageRunOpWithReauthorizeConnectionOperationPipelineOpSasToken
|
|||
)
|
||||
def test_sets_alarm(self, mocker, stage, op, mock_alarm):
|
||||
expected_alarm_time = (
|
||||
stage.pipeline_nucleus.pipeline_configuration.sastoken.expiry_time
|
||||
stage.nucleus.pipeline_configuration.sastoken.expiry_time
|
||||
- pipeline_stages_base.SasTokenStage.DEFAULT_TOKEN_UPDATE_MARGIN
|
||||
)
|
||||
|
||||
|
@ -539,7 +537,7 @@ class TestSasTokenStageRunOpWithReauthorizeConnectionOperationPipelineOpSasToken
|
|||
"Starts a background update alarm that will instead trigger after MAX_TIMEOUT seconds if the SasToken expiration time (less the Update Margin) is more than MAX_TIMEOUT seconds in the future"
|
||||
)
|
||||
def test_sets_alarm_long_expiration(self, mocker, stage, op, mock_alarm):
|
||||
token = stage.pipeline_nucleus.pipeline_configuration.sastoken
|
||||
token = stage.nucleus.pipeline_configuration.sastoken
|
||||
new_expiry = token.expiry_time + threading.TIMEOUT_MAX
|
||||
if isinstance(token, st.RenewableSasToken):
|
||||
token._expiry_time = new_expiry
|
||||
|
@ -675,10 +673,10 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresRenewToken(SasTokenStageTestC
|
|||
stage.run_op(init_op)
|
||||
|
||||
# Set connected state
|
||||
stage.pipeline_nucleus.connected = connected
|
||||
stage.nucleus.connected = connected
|
||||
|
||||
# Token has not been refreshed
|
||||
token = stage.pipeline_nucleus.pipeline_configuration.sastoken
|
||||
token = stage.nucleus.pipeline_configuration.sastoken
|
||||
assert token.refresh.call_count == 0
|
||||
assert mock_alarm.call_count == 1
|
||||
|
||||
|
@ -704,10 +702,10 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresRenewToken(SasTokenStageTestC
|
|||
stage.run_op(init_op)
|
||||
|
||||
# Set connected state
|
||||
stage.pipeline_nucleus.connected = connected
|
||||
stage.nucleus.connected = connected
|
||||
|
||||
# Mock refresh
|
||||
token = stage.pipeline_nucleus.pipeline_configuration.sastoken
|
||||
token = stage.nucleus.pipeline_configuration.sastoken
|
||||
refresh_failure = st.SasTokenError()
|
||||
token.refresh = mocker.MagicMock(side_effect=refresh_failure)
|
||||
assert token.refresh.call_count == 0
|
||||
|
@ -737,7 +735,7 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresRenewToken(SasTokenStageTestC
|
|||
# Set connected state and mock timer
|
||||
mock_timer = mocker.MagicMock()
|
||||
stage._reauth_retry_timer = mock_timer
|
||||
stage.pipeline_nucleus.connected = connected
|
||||
stage.nucleus.connected = connected
|
||||
|
||||
# Call alarm complete callback (as if alarm expired)
|
||||
on_alarm_complete = mock_alarm.call_args[0][1]
|
||||
|
@ -752,7 +750,7 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresRenewToken(SasTokenStageTestC
|
|||
)
|
||||
def test_when_pipeline_connected(self, mocker, stage, init_op, mock_alarm):
|
||||
# Apply the alarm and set stage as connected
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
stage.run_op(init_op)
|
||||
|
||||
# Only the InitializePipeline init_op has been sent down
|
||||
|
@ -760,7 +758,7 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresRenewToken(SasTokenStageTestC
|
|||
assert stage.send_op_down.call_args == mocker.call(init_op)
|
||||
|
||||
# Pipeline is still connected
|
||||
assert stage.pipeline_nucleus.connected is True
|
||||
assert stage.nucleus.connected is True
|
||||
|
||||
# Call alarm complete callback (as if alarm expired)
|
||||
assert mock_alarm.call_count == 1
|
||||
|
@ -778,7 +776,7 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresRenewToken(SasTokenStageTestC
|
|||
)
|
||||
def test_when_pipeline_not_connected(self, mocker, stage, init_op, mock_alarm):
|
||||
# Apply the alarm and set stage as connected
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
stage.run_op(init_op)
|
||||
|
||||
# Only the InitializePipeline init_op has been sent down
|
||||
|
@ -786,7 +784,7 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresRenewToken(SasTokenStageTestC
|
|||
assert stage.send_op_down.call_args == mocker.call(init_op)
|
||||
|
||||
# Pipeline is still NOT connected
|
||||
assert stage.pipeline_nucleus.connected is False
|
||||
assert stage.nucleus.connected is False
|
||||
|
||||
# Call alarm complete callback (as if alarm expired)
|
||||
on_alarm_complete = mock_alarm.call_args[0][1]
|
||||
|
@ -808,10 +806,10 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresRenewToken(SasTokenStageTestC
|
|||
# I am sorry for this test length, but IDK how else to test this...
|
||||
# ... other than throwing everything at it at once
|
||||
def test_new_alarm(self, mocker, stage, init_op, mock_alarm, connected):
|
||||
token = stage.pipeline_nucleus.pipeline_configuration.sastoken
|
||||
token = stage.nucleus.pipeline_configuration.sastoken
|
||||
|
||||
# Set connected state
|
||||
stage.pipeline_nucleus.connected = connected
|
||||
stage.nucleus.connected = connected
|
||||
|
||||
# Apply the alarm
|
||||
stage.run_op(init_op)
|
||||
|
@ -847,7 +845,7 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresRenewToken(SasTokenStageTestC
|
|||
# Another alarm was created and started for the expected time
|
||||
assert mock_alarm.call_count == 2
|
||||
expected_alarm_time = (
|
||||
stage.pipeline_nucleus.pipeline_configuration.sastoken.expiry_time
|
||||
stage.nucleus.pipeline_configuration.sastoken.expiry_time
|
||||
- pipeline_stages_base.SasTokenStage.DEFAULT_TOKEN_UPDATE_MARGIN
|
||||
)
|
||||
assert mock_alarm.call_args[0][0] == expected_alarm_time
|
||||
|
@ -886,14 +884,14 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresRenewToken(SasTokenStageTestC
|
|||
# I am sorry for this test length, but IDK how else to test this...
|
||||
# ... other than throwing everything at it at once
|
||||
def test_new_alarm_long_expiry(self, mocker, stage, init_op, mock_alarm, connected):
|
||||
token = stage.pipeline_nucleus.pipeline_configuration.sastoken
|
||||
token = stage.nucleus.pipeline_configuration.sastoken
|
||||
# Manually change the token TTL and expiry time to exceed max timeout
|
||||
# Note that time.time() is implicitly mocked to return a constant value
|
||||
token.ttl = threading.TIMEOUT_MAX + 3600
|
||||
token._expiry_time = int(time.time() + token.ttl)
|
||||
|
||||
# Set connected state
|
||||
stage.pipeline_nucleus.connected = connected
|
||||
stage.nucleus.connected = connected
|
||||
|
||||
# Apply the alarm
|
||||
stage.run_op(init_op)
|
||||
|
@ -981,7 +979,7 @@ class TestSasTokenStageOCCURRENCEUpdateAlarmExpiresReplaceToken(SasTokenStageTes
|
|||
)
|
||||
def test_sends_event(self, stage, init_op, mock_alarm, connected):
|
||||
# Set connected state
|
||||
stage.pipeline_nucleus.connected = connected
|
||||
stage.nucleus.connected = connected
|
||||
# Apply the alarm
|
||||
stage.run_op(init_op)
|
||||
# Alarm was created
|
||||
|
@ -1035,8 +1033,8 @@ class SasTokenStageOCCURRENCEReauthorizeConnectionOperationFailsTests(SasTokenSt
|
|||
assert stage.report_background_exception.call_count == 0
|
||||
|
||||
# Set the connection state and retry feature
|
||||
stage.pipeline_nucleus.connected = connected
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry = connection_retry
|
||||
stage.nucleus.connected = connected
|
||||
stage.nucleus.pipeline_configuration.connection_retry = connection_retry
|
||||
|
||||
# Complete ReauthorizeConnectionOperation with error
|
||||
reauth_op.complete(error=arbitrary_exception)
|
||||
|
@ -1049,8 +1047,8 @@ class SasTokenStageOCCURRENCEReauthorizeConnectionOperationFailsTests(SasTokenSt
|
|||
"Starts a reauth retry timer for the connection retry interval if the pipeline is not connected and connection retry is enabled on the pipeline"
|
||||
)
|
||||
def test_starts_retry_timer(self, mocker, stage, reauth_op, arbitrary_exception, mock_timer):
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry = True
|
||||
stage.nucleus.connected = False
|
||||
stage.nucleus.pipeline_configuration.connection_retry = True
|
||||
|
||||
assert mock_timer.call_count == 0
|
||||
|
||||
|
@ -1058,7 +1056,7 @@ class SasTokenStageOCCURRENCEReauthorizeConnectionOperationFailsTests(SasTokenSt
|
|||
|
||||
assert mock_timer.call_count == 1
|
||||
assert mock_timer.call_args == mocker.call(
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry_interval, mocker.ANY
|
||||
stage.nucleus.pipeline_configuration.connection_retry_interval, mocker.ANY
|
||||
)
|
||||
assert mock_timer.return_value.start.call_count == 1
|
||||
assert mock_timer.return_value.start.call_args == mocker.call()
|
||||
|
@ -1074,7 +1072,7 @@ class TestSasTokenStageOCCURRENCEReauthorizeConnectionOperationFromAlarmFails(
|
|||
@pytest.fixture
|
||||
def reauth_op(self, mocker, stage, mock_alarm):
|
||||
# Initialize the pipeline
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
init_op = pipeline_ops_base.InitializePipelineOperation(callback=mocker.MagicMock())
|
||||
stage.run_op(init_op)
|
||||
|
||||
|
@ -1115,8 +1113,8 @@ class TestSasTokenStageOCCURRENCEReauthRetryTimerExpires(SasTokenStageTestConfig
|
|||
self, mocker, stage, init_op, mock_alarm, mock_timer, arbitrary_exception
|
||||
):
|
||||
# Initialize stage with alarm
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry = True
|
||||
stage.nucleus.connected = True
|
||||
stage.nucleus.pipeline_configuration.connection_retry = True
|
||||
stage.run_op(init_op)
|
||||
|
||||
# Only the InitializePipeline op has been sent down
|
||||
|
@ -1124,7 +1122,7 @@ class TestSasTokenStageOCCURRENCEReauthRetryTimerExpires(SasTokenStageTestConfig
|
|||
assert stage.send_op_down.call_args == mocker.call(init_op)
|
||||
|
||||
# Pipeline is still connected
|
||||
assert stage.pipeline_nucleus.connected is True
|
||||
assert stage.nucleus.connected is True
|
||||
|
||||
# Call alarm complete callback (as if alarm expired)
|
||||
assert mock_alarm.call_count == 1
|
||||
|
@ -1138,13 +1136,13 @@ class TestSasTokenStageOCCURRENCEReauthRetryTimerExpires(SasTokenStageTestConfig
|
|||
assert isinstance(reauth_op, pipeline_ops_base.ReauthorizeConnectionOperation)
|
||||
|
||||
# Complete the ReauthorizeConnectionOperation with failure, triggering retry
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
reauth_op.complete(error=arbitrary_exception)
|
||||
|
||||
# Call timer complete callback (as if timer expired)
|
||||
assert mock_timer.call_count == 1
|
||||
assert stage._reauth_retry_timer is mock_timer.return_value
|
||||
assert stage.pipeline_nucleus.connected is False
|
||||
assert stage.nucleus.connected is False
|
||||
on_timer_complete = mock_timer.call_args[0][1]
|
||||
on_timer_complete()
|
||||
|
||||
|
@ -1160,8 +1158,8 @@ class TestSasTokenStageOCCURRENCEReauthRetryTimerExpires(SasTokenStageTestConfig
|
|||
self, mocker, stage, init_op, mock_alarm, mock_timer, arbitrary_exception
|
||||
):
|
||||
# Initialize stage with alarm
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry = True
|
||||
stage.nucleus.connected = True
|
||||
stage.nucleus.pipeline_configuration.connection_retry = True
|
||||
stage.run_op(init_op)
|
||||
|
||||
# Only the InitializePipeline op has been sent down
|
||||
|
@ -1169,7 +1167,7 @@ class TestSasTokenStageOCCURRENCEReauthRetryTimerExpires(SasTokenStageTestConfig
|
|||
assert stage.send_op_down.call_args == mocker.call(init_op)
|
||||
|
||||
# Pipeline is still connected
|
||||
assert stage.pipeline_nucleus.connected is True
|
||||
assert stage.nucleus.connected is True
|
||||
|
||||
# Call alarm complete callback (as if alarm expired)
|
||||
assert mock_alarm.call_count == 1
|
||||
|
@ -1183,13 +1181,13 @@ class TestSasTokenStageOCCURRENCEReauthRetryTimerExpires(SasTokenStageTestConfig
|
|||
assert isinstance(reauth_op, pipeline_ops_base.ReauthorizeConnectionOperation)
|
||||
|
||||
# Complete the ReauthorizeConnectionOperation with failure, triggering retry
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
reauth_op.complete(error=arbitrary_exception)
|
||||
|
||||
# Call timer complete callback (as if timer expired)
|
||||
assert mock_timer.call_count == 1
|
||||
assert stage._reauth_retry_timer is mock_timer.return_value
|
||||
stage.pipeline_nucleus.connected = True # Re-establish before timer completes
|
||||
stage.nucleus.connected = True # Re-establish before timer completes
|
||||
on_timer_complete = mock_timer.call_args[0][1]
|
||||
on_timer_complete()
|
||||
|
||||
|
@ -1206,7 +1204,7 @@ class TestSasTokenStageOCCURRENCEReauthorizeConnectionOperationFromTimerFails(
|
|||
@pytest.fixture
|
||||
def reauth_op(self, mocker, stage, mock_alarm, mock_timer, arbitrary_exception):
|
||||
# Initialize the pipeline
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
init_op = pipeline_ops_base.InitializePipelineOperation(callback=mocker.MagicMock())
|
||||
stage.run_op(init_op)
|
||||
|
||||
|
@ -1221,13 +1219,13 @@ class TestSasTokenStageOCCURRENCEReauthorizeConnectionOperationFromTimerFails(
|
|||
assert isinstance(reauth_op, pipeline_ops_base.ReauthorizeConnectionOperation)
|
||||
|
||||
# Complete the ReauthorizeConnectionOperation with failure, triggering retry
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
reauth_op.complete(error=arbitrary_exception)
|
||||
|
||||
# Call timer complete callback (as if timer expired)
|
||||
assert mock_timer.call_count == 1
|
||||
assert stage._reauth_retry_timer is mock_timer.return_value
|
||||
assert stage.pipeline_nucleus.connected is False
|
||||
assert stage.nucleus.connected is False
|
||||
assert stage.report_background_exception.call_count == 1
|
||||
on_timer_complete = mock_timer.call_args[0][1]
|
||||
on_timer_complete()
|
||||
|
@ -1267,7 +1265,7 @@ class AutoConnectStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, pl_config, cls_type, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=pl_config)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=pl_config)
|
||||
# Mock flow methods
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
|
@ -1312,7 +1310,7 @@ class TestAutoConnectStageRunOpWithOpThatRequiresConnectionPipelineConnected(
|
|||
|
||||
@pytest.mark.it("Immediately sends the operation down the pipeline")
|
||||
def test_already_connected(self, mocker, stage, op):
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
|
||||
stage.run_op(op)
|
||||
|
||||
|
@ -1351,7 +1349,7 @@ class TestAutoConnectStageRunOpWithOpThatRequiresConnectionNotConnected(
|
|||
@pytest.mark.it("Sends a new ConnectOperation down the pipeline")
|
||||
def test_not_connected(self, mocker, stage, op):
|
||||
mock_connect_op = mocker.patch.object(pipeline_ops_base, "ConnectOperation").return_value
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
|
||||
stage.run_op(op)
|
||||
|
||||
|
@ -1362,7 +1360,7 @@ class TestAutoConnectStageRunOpWithOpThatRequiresConnectionNotConnected(
|
|||
"Sends the operation down the pipeline once the ConnectOperation completes successfully"
|
||||
)
|
||||
def test_connect_success(self, mocker, stage, op):
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
mocker.spy(stage, "run_op")
|
||||
|
||||
# Run the original operation
|
||||
|
@ -1384,7 +1382,7 @@ class TestAutoConnectStageRunOpWithOpThatRequiresConnectionNotConnected(
|
|||
"Completes the operation with the error from the ConnectOperation, if the ConnectOperation completes with an error"
|
||||
)
|
||||
def test_connect_failure(self, mocker, stage, op, arbitrary_exception):
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
|
||||
# Run the original operation
|
||||
stage.run_op(op)
|
||||
|
@ -1417,7 +1415,7 @@ class TestAutoConnectStageRunOpWithOpThatDoesNotRequireConnection(
|
|||
"Sends the operation down the pipeline if the pipeline is in a 'connected' state"
|
||||
)
|
||||
def test_connected(self, mocker, stage, op):
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
|
||||
stage.run_op(op)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
|
@ -1427,7 +1425,7 @@ class TestAutoConnectStageRunOpWithOpThatDoesNotRequireConnection(
|
|||
"Sends the operation down the pipeline if the pipeline is in a 'disconnected' state"
|
||||
)
|
||||
def test_disconnected(self, mocker, stage, op):
|
||||
assert not stage.pipeline_nucleus.connected
|
||||
assert not stage.nucleus.connected
|
||||
|
||||
stage.run_op(op)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
|
@ -1458,7 +1456,7 @@ class TestAutoConnectStageRunOpWithAutoConnectDisabled(
|
|||
"Sends the operation down the pipeline if the pipeline is in a 'connected' state"
|
||||
)
|
||||
def test_connected(self, mocker, stage, op):
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
|
||||
stage.run_op(op)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
|
@ -1468,7 +1466,7 @@ class TestAutoConnectStageRunOpWithAutoConnectDisabled(
|
|||
"Sends the operation down the pipeline if the pipeline is in a 'disconnected' state"
|
||||
)
|
||||
def test_disconnected(self, mocker, stage, op):
|
||||
assert not stage.pipeline_nucleus.connected
|
||||
assert not stage.nucleus.connected
|
||||
|
||||
stage.run_op(op)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
|
@ -1499,9 +1497,7 @@ class ConnectionLockStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -1541,7 +1537,7 @@ class TestConnectionLockStageRunOpWithConnectOpWhileUnblocked(
|
|||
|
||||
@pytest.mark.it("Completes the operation immediately if the pipeline is already connected")
|
||||
def test_already_connected(self, mocker, stage, op):
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
|
||||
# Run the operation
|
||||
stage.run_op(op)
|
||||
|
@ -1557,7 +1553,7 @@ class TestConnectionLockStageRunOpWithConnectOpWhileUnblocked(
|
|||
"Puts the stage in a blocking state and sends the operation down the pipeline, if the pipeline is not currently connected"
|
||||
)
|
||||
def test_not_connected(self, mocker, stage, op):
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
|
||||
# Stage is not blocked
|
||||
assert not stage.blocked
|
||||
|
@ -1588,7 +1584,7 @@ class TestConnectionLockStageRunOpWithDisconnectOpWhileUnblocked(
|
|||
|
||||
@pytest.mark.it("Completes the operation immediately if the pipeline is already disconnected")
|
||||
def test_already_disconnected(self, mocker, stage, op):
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
|
||||
# Run the operation
|
||||
stage.run_op(op)
|
||||
|
@ -1604,7 +1600,7 @@ class TestConnectionLockStageRunOpWithDisconnectOpWhileUnblocked(
|
|||
"Puts the stage in a blocking state and sends the operation down the pipeline, if the pipeline is currently connected"
|
||||
)
|
||||
def test_connected(self, mocker, stage, op):
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
|
||||
# Stage is not blocked
|
||||
assert not stage.blocked
|
||||
|
@ -1642,7 +1638,7 @@ class TestConnectionLockStageRunOpWithReconnectOpWhileUnblocked(
|
|||
],
|
||||
)
|
||||
def test_not_connected(self, mocker, connected, stage, op):
|
||||
stage.pipeline_nucleus.connected = connected
|
||||
stage.nucleus.connected = connected
|
||||
|
||||
# Stage is not blocked
|
||||
assert not stage.blocked
|
||||
|
@ -1680,7 +1676,7 @@ class TestConnectionLockStageRunOpWithArbitraryOpWhileUnblocked(
|
|||
],
|
||||
)
|
||||
def test_sends_down(self, mocker, connected, stage, op):
|
||||
stage.pipeline_nucleus.connected = connected
|
||||
stage.nucleus.connected = connected
|
||||
|
||||
stage.run_op(op)
|
||||
|
||||
|
@ -1697,9 +1693,7 @@ class TestConnectionLockStageRunOpWhileBlocked(ConnectionLockStageTestConfig, St
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, init_kwargs, blocking_op):
|
||||
stage = pipeline_stages_base.ConnectionLockStage(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -1753,9 +1747,9 @@ class TestConnectionLockStageRunOpWhileBlocked(ConnectionLockStageTestConfig, St
|
|||
# Set the pipeline connection state to be the one desired by the operation.
|
||||
# If the stage were unblocked, this would lead to immediate completion of the op.
|
||||
if isinstance(op, pipeline_ops_base.ConnectOperation):
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
else:
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
|
||||
assert stage.queue.empty()
|
||||
|
||||
|
@ -1812,9 +1806,7 @@ class ConnectionLockStageBlockingOpCompletedTestConfig(ConnectionLockStageTestCo
|
|||
@pytest.fixture
|
||||
def blocked_stage(self, mocker, init_kwargs, blocking_op, pending_ops):
|
||||
stage = pipeline_stages_base.ConnectionLockStage(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -1823,9 +1815,9 @@ class ConnectionLockStageBlockingOpCompletedTestConfig(ConnectionLockStageTestCo
|
|||
|
||||
# Set the pipeline connection state to ensure op will block
|
||||
if isinstance(blocking_op, pipeline_ops_base.ConnectOperation):
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
else:
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
|
||||
# Block the stage by running the blocking operation
|
||||
stage.run_op(blocking_op)
|
||||
|
@ -1905,7 +1897,7 @@ class TestConnectionLockStageBlockingOpCompletedNoError(
|
|||
op5 = ArbitraryOperation(callback=mocker.MagicMock())
|
||||
|
||||
# Block the stage on op1
|
||||
assert not stage.pipeline_nucleus.connected
|
||||
assert not stage.nucleus.connected
|
||||
assert not stage.blocked
|
||||
stage.run_op(op1)
|
||||
assert stage.blocked
|
||||
|
@ -1926,7 +1918,7 @@ class TestConnectionLockStageBlockingOpCompletedNoError(
|
|||
op1.complete()
|
||||
|
||||
# Manually set pipeline to be connected (this doesn't happen naturally due to the scope of this test)
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
|
||||
# op2 and op3 have now been passed down, but no others
|
||||
assert stage.send_op_down.call_count == 3
|
||||
|
@ -2028,9 +2020,7 @@ class CoordinateRequestAndResponseStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -2233,9 +2223,7 @@ class TestCoordinateRequestAndResponseStageHandlePipelineEventWithResponseEvent(
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs, fake_uuid, pending_op):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -2317,9 +2305,7 @@ class TestCoordinateRequestAndResponseStageHandlePipelineEventWithConnectedEvent
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -2577,9 +2563,7 @@ class OpTimeoutStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -2740,9 +2724,7 @@ class RetryStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
mocker.spy(stage, "run_op")
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
|
@ -3137,12 +3119,10 @@ class ReconnectStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
# Majority of tests will want connection retry enabled
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry = True
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry_interval = 1234
|
||||
stage.nucleus.pipeline_configuration.connection_retry = True
|
||||
stage.nucleus.pipeline_configuration.connection_retry_interval = 1234
|
||||
mocker.spy(stage, "run_op")
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
|
@ -3832,10 +3812,8 @@ class TestReconnectStageRunOpWhileConnectionRetryDisabled(
|
|||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
# Override fixture to set connect retry to False
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry = False
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.nucleus.pipeline_configuration.connection_retry = False
|
||||
mocker.spy(stage, "run_op")
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
|
@ -4070,10 +4048,8 @@ class TestReconnectStageHandlePipelineEventConnectionRetryDisabled(
|
|||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
# Override fixture to set connect retry to False
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry = False
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.nucleus.pipeline_configuration.connection_retry = False
|
||||
mocker.spy(stage, "run_op")
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
|
@ -4224,7 +4200,7 @@ class TestReconnectStageOCCURRENCEReconnectTimerExpires(ReconnectStageTestConfig
|
|||
assert stage.send_op_down.call_count == 0
|
||||
assert mock_timer.call_count == 1
|
||||
assert mock_timer.call_args == mocker.call(
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry_interval, mocker.ANY
|
||||
stage.nucleus.pipeline_configuration.connection_retry_interval, mocker.ANY
|
||||
)
|
||||
assert stage.reconnect_timer is mock_timer.return_value
|
||||
assert stage.reconnect_timer is not old_reconnect_timer
|
||||
|
@ -4422,7 +4398,7 @@ class TestReconnectStageOCCURRENCEReconnectionCompletes(ReconnectStageTestConfig
|
|||
|
||||
assert mock_timer.call_count == 1
|
||||
assert mock_timer.call_args == mocker.call(
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry_interval, mocker.ANY
|
||||
stage.nucleus.pipeline_configuration.connection_retry_interval, mocker.ANY
|
||||
)
|
||||
assert stage.reconnect_timer is mock_timer.return_value
|
||||
|
||||
|
|
|
@ -54,10 +54,8 @@ class HTTPTransportStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.pipeline_nucleus.pipeline_configuration.hostname = "some.fake-host.name.com"
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.nucleus.pipeline_configuration.hostname = "some.fake-host.name.com"
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -100,13 +98,13 @@ class TestHTTPTransportStageRunOpCalledWithInitializePipelineOperation(
|
|||
)
|
||||
def test_creates_transport(self, mocker, stage, op, mock_transport, gateway_hostname):
|
||||
# Setup pipeline config
|
||||
stage.pipeline_nucleus.pipeline_configuration.gateway_hostname = gateway_hostname
|
||||
stage.nucleus.pipeline_configuration.gateway_hostname = gateway_hostname
|
||||
|
||||
# NOTE: if more of this type of logic crops up, consider splitting this test up
|
||||
if stage.pipeline_nucleus.pipeline_configuration.gateway_hostname:
|
||||
expected_hostname = stage.pipeline_nucleus.pipeline_configuration.gateway_hostname
|
||||
if stage.nucleus.pipeline_configuration.gateway_hostname:
|
||||
expected_hostname = stage.nucleus.pipeline_configuration.gateway_hostname
|
||||
else:
|
||||
expected_hostname = stage.pipeline_nucleus.pipeline_configuration.hostname
|
||||
expected_hostname = stage.nucleus.pipeline_configuration.hostname
|
||||
|
||||
assert stage.transport is None
|
||||
|
||||
|
@ -115,10 +113,10 @@ class TestHTTPTransportStageRunOpCalledWithInitializePipelineOperation(
|
|||
assert mock_transport.call_count == 1
|
||||
assert mock_transport.call_args == mocker.call(
|
||||
hostname=expected_hostname,
|
||||
server_verification_cert=stage.pipeline_nucleus.pipeline_configuration.server_verification_cert,
|
||||
x509_cert=stage.pipeline_nucleus.pipeline_configuration.x509,
|
||||
cipher=stage.pipeline_nucleus.pipeline_configuration.cipher,
|
||||
proxy_options=stage.pipeline_nucleus.pipeline_configuration.proxy_options,
|
||||
server_verification_cert=stage.nucleus.pipeline_configuration.server_verification_cert,
|
||||
x509_cert=stage.nucleus.pipeline_configuration.x509,
|
||||
cipher=stage.nucleus.pipeline_configuration.cipher,
|
||||
proxy_options=stage.nucleus.pipeline_configuration.proxy_options,
|
||||
)
|
||||
assert stage.transport is mock_transport.return_value
|
||||
|
||||
|
@ -143,9 +141,7 @@ class HTTPTransportStageTestConfigComplex(HTTPTransportStageTestConfig):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, request, cls_type, init_kwargs, mock_transport):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -196,7 +192,7 @@ class TestHTTPTransportStageRunOpCalledWithHTTPRequestAndResponseOperation(
|
|||
)
|
||||
def test_headers_with_sas_auth(self, mocker, stage, op):
|
||||
# A SasToken is set on the pipeline, but Authorization headers have not yet been set
|
||||
assert stage.pipeline_nucleus.pipeline_configuration.sastoken is not None
|
||||
assert stage.nucleus.pipeline_configuration.sastoken is not None
|
||||
assert op.headers.get("Authorization") is None
|
||||
|
||||
stage.run_op(op)
|
||||
|
@ -204,16 +200,14 @@ class TestHTTPTransportStageRunOpCalledWithHTTPRequestAndResponseOperation(
|
|||
# Need to get the headers sent to the transport, not provided by the op, due to a
|
||||
# deep copy that occurs
|
||||
headers = stage.transport.request.call_args[1]["headers"]
|
||||
assert headers["Authorization"] == str(
|
||||
stage.pipeline_nucleus.pipeline_configuration.sastoken
|
||||
)
|
||||
assert headers["Authorization"] == str(stage.nucleus.pipeline_configuration.sastoken)
|
||||
|
||||
@pytest.mark.it(
|
||||
"Does NOT add the 'Authorization' header to the request if NOT using SAS-based authentication"
|
||||
)
|
||||
def test_headers_with_no_sas(self, mocker, stage, op):
|
||||
# NO SasToken is set on the pipeline, and Authorization headers have not yet been set
|
||||
stage.pipeline_nucleus.pipeline_configuration.sastoken = None
|
||||
stage.nucleus.pipeline_configuration.sastoken = None
|
||||
assert op.headers.get("Authorization") is None
|
||||
|
||||
stage.run_op(op)
|
||||
|
|
|
@ -65,10 +65,8 @@ class MQTTTransportStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.pipeline_nucleus.pipeline_configuration.hostname = "some.fake-host.name.com"
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.nucleus.pipeline_configuration.hostname = "some.fake-host.name.com"
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -163,17 +161,17 @@ class TestMQTTTransportStageRunOpCalledWithInitializePipelineOperation(
|
|||
keep_alive,
|
||||
):
|
||||
# Configure websockets & cipher & keep alive
|
||||
stage.pipeline_nucleus.pipeline_configuration.websockets = websockets
|
||||
stage.pipeline_nucleus.pipeline_configuration.cipher = cipher
|
||||
stage.pipeline_nucleus.pipeline_configuration.proxy_options = proxy_options
|
||||
stage.pipeline_nucleus.pipeline_configuration.gateway_hostname = gateway_hostname
|
||||
stage.pipeline_nucleus.pipeline_configuration.keep_alive = keep_alive
|
||||
stage.nucleus.pipeline_configuration.websockets = websockets
|
||||
stage.nucleus.pipeline_configuration.cipher = cipher
|
||||
stage.nucleus.pipeline_configuration.proxy_options = proxy_options
|
||||
stage.nucleus.pipeline_configuration.gateway_hostname = gateway_hostname
|
||||
stage.nucleus.pipeline_configuration.keep_alive = keep_alive
|
||||
|
||||
# NOTE: if more of this type of logic crops up, consider splitting this test up
|
||||
if stage.pipeline_nucleus.pipeline_configuration.gateway_hostname:
|
||||
expected_hostname = stage.pipeline_nucleus.pipeline_configuration.gateway_hostname
|
||||
if stage.nucleus.pipeline_configuration.gateway_hostname:
|
||||
expected_hostname = stage.nucleus.pipeline_configuration.gateway_hostname
|
||||
else:
|
||||
expected_hostname = stage.pipeline_nucleus.pipeline_configuration.hostname
|
||||
expected_hostname = stage.nucleus.pipeline_configuration.hostname
|
||||
|
||||
assert stage.transport is None
|
||||
|
||||
|
@ -184,8 +182,8 @@ class TestMQTTTransportStageRunOpCalledWithInitializePipelineOperation(
|
|||
client_id=op.client_id,
|
||||
hostname=expected_hostname,
|
||||
username=op.username,
|
||||
server_verification_cert=stage.pipeline_nucleus.pipeline_configuration.server_verification_cert,
|
||||
x509_cert=stage.pipeline_nucleus.pipeline_configuration.x509,
|
||||
server_verification_cert=stage.nucleus.pipeline_configuration.server_verification_cert,
|
||||
x509_cert=stage.nucleus.pipeline_configuration.x509,
|
||||
websockets=websockets,
|
||||
cipher=cipher,
|
||||
proxy_options=proxy_options,
|
||||
|
@ -233,9 +231,7 @@ class MQTTTransportStageTestConfigComplex(MQTTTransportStageTestConfig):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs, mock_transport):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(
|
||||
pipeline_configuration=mocker.MagicMock()
|
||||
)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_configuration=mocker.MagicMock())
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -338,11 +334,11 @@ class TestMQTTTransportStageRunOpCalledWithConnectOperation(
|
|||
"Performs an MQTT connect via the MQTTTransport, using the PipelineNucleus' SasToken as a password, if using SAS-based authentication"
|
||||
)
|
||||
def test_mqtt_connect_sastoken(self, mocker, stage, op):
|
||||
assert stage.pipeline_nucleus.pipeline_configuration.sastoken is not None
|
||||
assert stage.nucleus.pipeline_configuration.sastoken is not None
|
||||
stage.run_op(op)
|
||||
assert stage.transport.connect.call_count == 1
|
||||
assert stage.transport.connect.call_args == mocker.call(
|
||||
password=str(stage.pipeline_nucleus.pipeline_configuration.sastoken)
|
||||
password=str(stage.nucleus.pipeline_configuration.sastoken)
|
||||
)
|
||||
|
||||
@pytest.mark.it(
|
||||
|
@ -350,7 +346,7 @@ class TestMQTTTransportStageRunOpCalledWithConnectOperation(
|
|||
)
|
||||
def test_mqtt_connect_no_sastoken(self, mocker, stage, op):
|
||||
# no token
|
||||
stage.pipeline_nucleus.pipeline_configuration.sastoken = None
|
||||
stage.nucleus.pipeline_configuration.sastoken = None
|
||||
stage.run_op(op)
|
||||
assert stage.transport.connect.call_count == 1
|
||||
assert stage.transport.connect.call_args == mocker.call(password=None)
|
||||
|
@ -1176,7 +1172,7 @@ class TestMQTTTransportStageOnDisconnectedUnexpectedNoPendingConnectionOp(
|
|||
def test_inflight_no_retry(self, mocker, stage, cause):
|
||||
stage.transport._op_manager = mocker.MagicMock()
|
||||
mock_cancel = stage.transport._op_manager.cancel_all_operations
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry = False
|
||||
stage.nucleus.pipeline_configuration.connection_retry = False
|
||||
assert stage._pending_connection_op is None
|
||||
assert mock_cancel.call_count == 0
|
||||
|
||||
|
@ -1192,7 +1188,7 @@ class TestMQTTTransportStageOnDisconnectedUnexpectedNoPendingConnectionOp(
|
|||
def test_inflight_unexpected_with_retry(self, mocker, stage, cause):
|
||||
stage.transport._op_manager = mocker.MagicMock()
|
||||
mock_cancel = stage.transport._op_manager.cancel_all_operations
|
||||
stage.pipeline_nucleus.pipeline_configuration.connection_retry = True
|
||||
stage.nucleus.pipeline_configuration.connection_retry = True
|
||||
assert stage._pending_connection_op is None
|
||||
assert mock_cancel.call_count == 0
|
||||
|
||||
|
@ -1305,7 +1301,7 @@ class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex)
|
|||
if disconnect_raises:
|
||||
stage.transport.disconnect = mocker.MagicMock(side_effect=arbitrary_exception)
|
||||
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
stage.run_op(pending_op)
|
||||
|
||||
watchdog_expiration = mock_timer.call_args[0][1]
|
||||
|
@ -1326,7 +1322,7 @@ class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex)
|
|||
if disconnect_raises:
|
||||
stage.transport.disconnect = mocker.MagicMock(side_effect=arbitrary_exception)
|
||||
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
stage.run_op(pending_op)
|
||||
|
||||
watchdog_expiration = mock_timer.call_args[0][1]
|
||||
|
@ -1344,7 +1340,7 @@ class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex)
|
|||
if disconnect_raises:
|
||||
stage.transport.disconnect = mocker.MagicMock(side_effect=arbitrary_exception)
|
||||
|
||||
stage.pipeline_nucleus.connected = True
|
||||
stage.nucleus.connected = True
|
||||
stage.run_op(pending_op)
|
||||
stage._pending_connection_op = None
|
||||
|
||||
|
@ -1363,7 +1359,7 @@ class TestMQTTTransportStageWatchdogExpired(MQTTTransportStageTestConfigComplex)
|
|||
if disconnect_raises:
|
||||
stage.transport.disconnect = mocker.MagicMock(side_effect=arbitrary_exception)
|
||||
|
||||
stage.pipeline_nucleus.connected = False
|
||||
stage.nucleus.connected = False
|
||||
stage.run_op(pending_op)
|
||||
stage._pending_connection_op = None
|
||||
|
||||
|
|
|
@ -61,8 +61,8 @@ class TestHTTPPipelineInstantiation(object):
|
|||
def test_pipeline_nucleus(self, pipeline_configuration):
|
||||
pipeline = HTTPPipeline(pipeline_configuration)
|
||||
|
||||
assert isinstance(pipeline._pipeline_nucleus, pipeline_nucleus.PipelineNucleus)
|
||||
assert pipeline._pipeline_nucleus.pipeline_configuration is pipeline_configuration
|
||||
assert isinstance(pipeline._nucleus, pipeline_nucleus.PipelineNucleus)
|
||||
assert pipeline._nucleus.pipeline_configuration is pipeline_configuration
|
||||
|
||||
@pytest.mark.it("Configures the pipeline with a series of PipelineStages")
|
||||
def test_pipeline_stages(self, pipeline_configuration):
|
||||
|
@ -79,7 +79,7 @@ class TestHTTPPipelineInstantiation(object):
|
|||
for i in range(len(expected_stage_order)):
|
||||
expected_stage = expected_stage_order[i]
|
||||
assert isinstance(curr_stage, expected_stage)
|
||||
assert curr_stage.pipeline_nucleus is pipeline._pipeline_nucleus
|
||||
assert curr_stage.nucleus is pipeline._nucleus
|
||||
curr_stage = curr_stage.next
|
||||
|
||||
# Assert there are no more additional stages
|
||||
|
@ -140,7 +140,7 @@ class TestHTTPPipelineInvokeMethod(object):
|
|||
"Calls the callback with the error if the pipeline_configuration.method_invoke is not True"
|
||||
)
|
||||
def test_op_configuration_fail(self, mocker, pipeline, arbitrary_exception):
|
||||
pipeline._pipeline_nucleus.pipeline_configuration.method_invoke = False
|
||||
pipeline._nucleus.pipeline_configuration.method_invoke = False
|
||||
cb = mocker.MagicMock()
|
||||
|
||||
pipeline.invoke_method(
|
||||
|
@ -231,7 +231,7 @@ class TestHTTPPipelineGetStorageInfo(object):
|
|||
"Calls the callback with the error upon unsuccessful completion of the GetStorageInfoOperation"
|
||||
)
|
||||
def test_op_configuration_fail(self, mocker, pipeline):
|
||||
pipeline._pipeline_nucleus.pipeline_configuration.blob_upload = False
|
||||
pipeline._nucleus.pipeline_configuration.blob_upload = False
|
||||
cb = mocker.MagicMock()
|
||||
pipeline.get_storage_info_for_blob(blob_name="__fake_blob_name__", callback=cb)
|
||||
|
||||
|
@ -292,7 +292,7 @@ class TestHTTPPipelineNotifyBlobUploadStatus(object):
|
|||
"Calls the callback with the error if pipeline_configuration.blob_upload is not True"
|
||||
)
|
||||
def test_op_configuration_fail(self, mocker, pipeline):
|
||||
pipeline._pipeline_nucleus.pipeline_configuration.blob_upload = False
|
||||
pipeline._nucleus.pipeline_configuration.blob_upload = False
|
||||
cb = mocker.MagicMock()
|
||||
pipeline.notify_blob_upload_status(
|
||||
correlation_id="__fake_correlation_id__",
|
||||
|
|
|
@ -99,8 +99,8 @@ class TestMQTTPipelineInstantiation(object):
|
|||
def test_pipeline_nucleus(self, pipeline_configuration):
|
||||
pipeline = MQTTPipeline(pipeline_configuration)
|
||||
|
||||
assert isinstance(pipeline._pipeline_nucleus, pipeline_nucleus.PipelineNucleus)
|
||||
assert pipeline._pipeline_nucleus.pipeline_configuration is pipeline_configuration
|
||||
assert isinstance(pipeline._nucleus, pipeline_nucleus.PipelineNucleus)
|
||||
assert pipeline._nucleus.pipeline_configuration is pipeline_configuration
|
||||
|
||||
@pytest.mark.it("Configures the pipeline with a series of PipelineStages")
|
||||
def test_pipeline_stages(self, pipeline_configuration):
|
||||
|
@ -126,7 +126,7 @@ class TestMQTTPipelineInstantiation(object):
|
|||
for i in range(len(expected_stage_order)):
|
||||
expected_stage = expected_stage_order[i]
|
||||
assert isinstance(curr_stage, expected_stage)
|
||||
assert curr_stage.pipeline_nucleus is pipeline._pipeline_nucleus
|
||||
assert curr_stage.nucleus is pipeline._nucleus
|
||||
curr_stage = curr_stage.next
|
||||
|
||||
# Assert there are no more additional stages
|
||||
|
@ -1118,7 +1118,7 @@ class TestMQTTPipelinePROPERTYPipelineConfiguration(object):
|
|||
|
||||
@pytest.mark.it("Reflects the value of the PipelineNucleus attribute of the same name")
|
||||
def test_reflects_pipeline_attribute(self, pipeline):
|
||||
assert pipeline.pipeline_configuration is pipeline._pipeline_nucleus.pipeline_configuration
|
||||
assert pipeline.pipeline_configuration is pipeline._nucleus.pipeline_configuration
|
||||
|
||||
|
||||
@pytest.mark.describe("MQTTPipeline - PROPERTY .connected")
|
||||
|
@ -1130,7 +1130,7 @@ class TestMQTTPipelinePROPERTYConnected(object):
|
|||
|
||||
@pytest.mark.it("Reflects the value of the PipelineNucleus attribute of the same name")
|
||||
def test_reflects_pipeline_attribute(self, pipeline):
|
||||
pipeline._pipeline_nucleus.connected = True
|
||||
pipeline._nucleus.connected = True
|
||||
assert pipeline.connected
|
||||
pipeline._pipeline_nucleus.connected = False
|
||||
pipeline._nucleus.connected = False
|
||||
assert not pipeline.connected
|
||||
|
|
|
@ -71,7 +71,7 @@ class IoTHubHTTPTranslationStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs, pipeline_config):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(pipeline_config)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_config)
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -168,7 +168,7 @@ class TestIoTHubHTTPTranslationStageRunOpCalledWithMethodInvokeOperation(
|
|||
],
|
||||
)
|
||||
def test_new_op_headers(self, mocker, stage, op, custom_user_agent, pipeline_config):
|
||||
stage.pipeline_nucleus.pipeline_configuration.product_info = custom_user_agent
|
||||
stage.nucleus.pipeline_configuration.product_info = custom_user_agent
|
||||
stage.run_op(op)
|
||||
|
||||
# Op was sent down
|
||||
|
@ -398,7 +398,7 @@ class TestIoTHubHTTPTranslationStageRunOpCalledWithGetStorageInfoOperation(
|
|||
],
|
||||
)
|
||||
def test_new_op_headers(self, mocker, stage, op, custom_user_agent, pipeline_config):
|
||||
stage.pipeline_nucleus.pipeline_configuration.product_info = custom_user_agent
|
||||
stage.nucleus.pipeline_configuration.product_info = custom_user_agent
|
||||
stage.run_op(op)
|
||||
|
||||
# Op was sent down
|
||||
|
@ -628,7 +628,7 @@ class TestIoTHubHTTPTranslationStageRunOpCalledWithNotifyBlobUploadStatusOperati
|
|||
],
|
||||
)
|
||||
def test_new_op_headers(self, mocker, stage, op, custom_user_agent, pipeline_config):
|
||||
stage.pipeline_nucleus.pipeline_configuration.product_info = custom_user_agent
|
||||
stage.nucleus.pipeline_configuration.product_info = custom_user_agent
|
||||
stage.run_op(op)
|
||||
|
||||
# Op was sent down
|
||||
|
|
|
@ -88,11 +88,11 @@ def expected_mqtt_topic_fn(mock_mqtt_topic, iothub_pipeline_feature):
|
|||
@pytest.fixture
|
||||
def expected_mqtt_topic_fn_call(mocker, iothub_pipeline_feature, stage):
|
||||
if iothub_pipeline_feature == constant.C2D_MSG:
|
||||
return mocker.call(stage.pipeline_nucleus.pipeline_configuration.device_id)
|
||||
return mocker.call(stage.nucleus.pipeline_configuration.device_id)
|
||||
elif iothub_pipeline_feature == constant.INPUT_MSG:
|
||||
return mocker.call(
|
||||
stage.pipeline_nucleus.pipeline_configuration.device_id,
|
||||
stage.pipeline_nucleus.pipeline_configuration.module_id,
|
||||
stage.nucleus.pipeline_configuration.device_id,
|
||||
stage.nucleus.pipeline_configuration.module_id,
|
||||
)
|
||||
else:
|
||||
return mocker.call()
|
||||
|
@ -120,7 +120,7 @@ class IoTHubMQTTTranslationStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs, pipeline_config):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(pipeline_config)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_config)
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
@ -761,7 +761,7 @@ class TestIoTHubMQTTTranslationStageHandlePipelineEventWithIncomingMQTTMessageEv
|
|||
"Sends the original event up the pipeline instead, if the device id in the topic string does not match the client details"
|
||||
)
|
||||
def test_nonmatching_device_id(self, mocker, event, stage):
|
||||
stage.pipeline_nucleus.pipeline_configuration.device_id = "different_device_id"
|
||||
stage.nucleus.pipeline_configuration.device_id = "different_device_id"
|
||||
stage.handle_pipeline_event(event)
|
||||
|
||||
assert stage.send_event_up.call_count == 1
|
||||
|
@ -847,9 +847,9 @@ class TestIoTHubMQTTTranslationStageHandlePipelineEventWithIncomingMQTTMessageEv
|
|||
)
|
||||
def test_nonmatching_ids(self, mocker, event, stage, alt_device_id, alt_module_id):
|
||||
if alt_device_id:
|
||||
stage.pipeline_nucleus.pipeline_configuration.device_id = alt_device_id
|
||||
stage.nucleus.pipeline_configuration.device_id = alt_device_id
|
||||
if alt_module_id:
|
||||
stage.pipeline_nucleus.pipeline_configuration.module_id = alt_module_id
|
||||
stage.nucleus.pipeline_configuration.module_id = alt_module_id
|
||||
stage.handle_pipeline_event(event)
|
||||
|
||||
assert stage.send_event_up.call_count == 1
|
||||
|
|
|
@ -91,8 +91,8 @@ class TestMQTTPipelineInstantiation(object):
|
|||
def test_pipeline_nucleus(self, pipeline_configuration):
|
||||
pipeline = MQTTPipeline(pipeline_configuration)
|
||||
|
||||
assert isinstance(pipeline._pipeline_nucleus, pipeline_nucleus.PipelineNucleus)
|
||||
assert pipeline._pipeline_nucleus.pipeline_configuration is pipeline_configuration
|
||||
assert isinstance(pipeline._nucleus, pipeline_nucleus.PipelineNucleus)
|
||||
assert pipeline._nucleus.pipeline_configuration is pipeline_configuration
|
||||
|
||||
@pytest.mark.it("Configures the pipeline with a series of PipelineStages")
|
||||
def test_pipeline_stages(self, pipeline_configuration):
|
||||
|
@ -118,7 +118,7 @@ class TestMQTTPipelineInstantiation(object):
|
|||
for i in range(len(expected_stage_order)):
|
||||
expected_stage = expected_stage_order[i]
|
||||
assert isinstance(curr_stage, expected_stage)
|
||||
assert curr_stage.pipeline_nucleus is pipeline._pipeline_nucleus
|
||||
assert curr_stage.nucleus is pipeline._nucleus
|
||||
curr_stage = curr_stage.next
|
||||
|
||||
# Assert there are no more additional stages
|
||||
|
@ -369,9 +369,7 @@ class TestSendRegister(object):
|
|||
assert pipeline._pipeline.run_op.call_count == 1
|
||||
op = pipeline._pipeline.run_op.call_args[0][0]
|
||||
assert isinstance(op, pipeline_ops_provisioning.RegisterOperation)
|
||||
assert (
|
||||
op.registration_id == pipeline._pipeline_nucleus.pipeline_configuration.registration_id
|
||||
)
|
||||
assert op.registration_id == pipeline._nucleus.pipeline_configuration.registration_id
|
||||
|
||||
@pytest.mark.it("passes the payload parameter as request_payload on the RegistrationRequest")
|
||||
def test_sets_request_payload(self, pipeline, mocker):
|
||||
|
|
|
@ -69,7 +69,7 @@ class ProvisioningMQTTTranslationStageTestConfig(object):
|
|||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs, pipeline_config):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.pipeline_nucleus = pipeline_nucleus.PipelineNucleus(pipeline_config)
|
||||
stage.nucleus = pipeline_nucleus.PipelineNucleus(pipeline_config)
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
|
Загрузка…
Ссылка в новой задаче