Родитель
759dfc16e1
Коммит
70fe35cd2b
|
@ -739,33 +739,6 @@ class CoordinateRequestAndResponseStage(PipelineStage):
|
|||
)
|
||||
)
|
||||
|
||||
elif isinstance(event, pipeline_events_base.ConnectedEvent):
|
||||
"""
|
||||
If we're reconnecting, send all pending requests down again. This is necessary
|
||||
because any response that might have been sent by the service was possibly lost
|
||||
when the connection dropped. The fact that the operation is still pending means
|
||||
that we haven't received the response yet. Sending the request more than once
|
||||
will result in a reasonable response for all known operations, aside from extra
|
||||
processing on the server in the case of a re-sent provisioning request, or the
|
||||
appearance of a jump in $version attributes in the case of a lost twin PATCH
|
||||
operation. Since we're reusing the same $rid, the server, of course, _could_
|
||||
recognize that this is a duplicate request, but the behavior in this case is
|
||||
undefined.
|
||||
"""
|
||||
|
||||
for request_id in self.pending_responses:
|
||||
logger.info(
|
||||
"{stage}: ConnectedEvent: re-publishing request {id} for {method} {type} ".format(
|
||||
stage=self.name,
|
||||
id=request_id,
|
||||
method=self.pending_responses[request_id].method,
|
||||
type=self.pending_responses[request_id].request_type,
|
||||
)
|
||||
)
|
||||
self._send_request_down(request_id, self.pending_responses[request_id])
|
||||
|
||||
self.send_event_up(event)
|
||||
|
||||
else:
|
||||
self.send_event_up(event)
|
||||
|
||||
|
|
|
@ -1813,246 +1813,6 @@ class TestCoordinateRequestAndResponseStageHandlePipelineEventWithResponseEvent(
|
|||
assert not pending_op.completed
|
||||
|
||||
|
||||
@pytest.mark.describe(
|
||||
"CoordinateRequestAndResponseStage - .handle_pipeline_event() -- Called with ConnectedEvent"
|
||||
)
|
||||
class TestCoordinateRequestAndResponseStageHandlePipelineEventWithConnectedEvent(
|
||||
CoordinateRequestAndResponseStageTestConfig, StageHandlePipelineEventTestBase
|
||||
):
|
||||
@pytest.fixture
|
||||
def event(self):
|
||||
return pipeline_events_base.ConnectedEvent()
|
||||
|
||||
def make_new_request_response_op(self, mocker, id):
|
||||
return pipeline_ops_base.RequestAndResponseOperation(
|
||||
request_type="some_request_type",
|
||||
method="SOME_METHOD",
|
||||
resource_location="some/resource/location/{}".format(id),
|
||||
request_body="some_request_body",
|
||||
callback=mocker.MagicMock(),
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def stage(self, mocker, cls_type, init_kwargs, nucleus):
|
||||
stage = cls_type(**init_kwargs)
|
||||
stage.nucleus = nucleus
|
||||
stage.send_event_up = mocker.MagicMock()
|
||||
stage.send_op_down = mocker.MagicMock()
|
||||
mocker.spy(stage, "report_background_exception")
|
||||
|
||||
return stage
|
||||
|
||||
@pytest.mark.it("Sends a RequestOperation down again if that RequestOperation never completed")
|
||||
def test_request_never_completed(self, stage, event, mocker):
|
||||
op1 = self.make_new_request_response_op(mocker, "op1")
|
||||
|
||||
# send it down but don't complete it
|
||||
stage.run_op(op1)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
op1_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
|
||||
# simulate a connected event
|
||||
stage.handle_pipeline_event(event)
|
||||
|
||||
# verify that it got sent again
|
||||
assert stage.send_op_down.call_count == 2
|
||||
assert isinstance(stage.send_op_down.call_args[0][0], pipeline_ops_base.RequestOperation)
|
||||
assert stage.send_op_down.call_args[0][0].request_id == op1_guid
|
||||
|
||||
@pytest.mark.it(
|
||||
"Sends a RequestOperation down again if that RequestOperation completed, but no corresponding ResponseEvent was received"
|
||||
)
|
||||
def test_response_never_received(self, stage, event, mocker):
|
||||
op1 = self.make_new_request_response_op(mocker, "op1")
|
||||
|
||||
# send it down and completed the RequestOperation
|
||||
stage.run_op(op1)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
op1_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
stage.send_op_down.call_args[0][0].complete()
|
||||
|
||||
# simulate a connected event
|
||||
stage.handle_pipeline_event(event)
|
||||
|
||||
# verify that it got sent again
|
||||
assert stage.send_op_down.call_count == 2
|
||||
assert isinstance(stage.send_op_down.call_args[0][0], pipeline_ops_base.RequestOperation)
|
||||
assert stage.send_op_down.call_args[0][0].request_id == op1_guid
|
||||
|
||||
@pytest.mark.it(
|
||||
"Sends down multiple RequestOperations again if those RequestOperations never completed"
|
||||
)
|
||||
def test_multiple_requests_never_completed(self, stage, event, mocker):
|
||||
op1 = self.make_new_request_response_op(mocker, "op1")
|
||||
op2 = self.make_new_request_response_op(mocker, "op2")
|
||||
|
||||
# send 2 ops down but don't complete them
|
||||
stage.run_op(op1)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
op1_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
|
||||
stage.run_op(op2)
|
||||
assert stage.send_op_down.call_count == 2
|
||||
op2_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
|
||||
# simulate a connected event
|
||||
stage.handle_pipeline_event(event)
|
||||
|
||||
# verify that 2 more RequestOperation ops were send down
|
||||
assert stage.send_op_down.call_count == 4
|
||||
assert isinstance(
|
||||
stage.send_op_down.call_args_list[0][0][0], pipeline_ops_base.RequestOperation
|
||||
)
|
||||
assert isinstance(
|
||||
stage.send_op_down.call_args_list[1][0][0], pipeline_ops_base.RequestOperation
|
||||
)
|
||||
|
||||
assert (
|
||||
stage.send_op_down.call_args_list[2][0][0].request_id == op1_guid
|
||||
and stage.send_op_down.call_args_list[3][0][0].request_id == op2_guid
|
||||
) or (
|
||||
stage.send_op_down.call_args_list[2][0][0].request_id == op2_guid
|
||||
and stage.send_op_down.call_args_list[3][0][0].request_id == op1_guid
|
||||
)
|
||||
|
||||
@pytest.mark.it(
|
||||
"Sends down multiple RequestOperations again if those RequestOperations completed, but the corresponding ResponseEvents were never received"
|
||||
)
|
||||
def test_multiple_responses_never_received(self, stage, event, mocker):
|
||||
op1 = self.make_new_request_response_op(mocker, "op1")
|
||||
op2 = self.make_new_request_response_op(mocker, "op2")
|
||||
|
||||
# send 2 ops down
|
||||
stage.run_op(op1)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
op1_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
|
||||
stage.run_op(op2)
|
||||
assert stage.send_op_down.call_count == 2
|
||||
op2_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
|
||||
# complete the 2 RequestOperation ops
|
||||
stage.send_op_down.call_arg_list[0][0][0].complete()
|
||||
stage.send_op_down.call_arg_list[1][0][0].complete()
|
||||
|
||||
# simulate a connected event
|
||||
stage.handle_pipeline_event(event)
|
||||
|
||||
# verify that 2 more RequestOperation ops were send down
|
||||
assert stage.send_op_down.call_count == 4
|
||||
assert isinstance(
|
||||
stage.send_op_down.call_args_list[0][0][0], pipeline_ops_base.RequestOperation
|
||||
)
|
||||
assert isinstance(
|
||||
stage.send_op_down.call_args_list[1][0][0], pipeline_ops_base.RequestOperation
|
||||
)
|
||||
|
||||
assert (
|
||||
stage.send_op_down.call_args_list[2][0][0].request_id == op1_guid
|
||||
and stage.send_op_down.call_args_list[3][0][0].request_id == op2_guid
|
||||
) or (
|
||||
stage.send_op_down.call_args_list[2][0][0].request_id == op2_guid
|
||||
and stage.send_op_down.call_args_list[3][0][0].request_id == op1_guid
|
||||
)
|
||||
|
||||
@pytest.mark.it(
|
||||
"Does not send down any RequestOperations if the RequestAndResponseOperation completed"
|
||||
)
|
||||
def test_request_and_response_completed(self, stage, event, mocker):
|
||||
op1 = self.make_new_request_response_op(mocker, "op1")
|
||||
|
||||
# send it down and complete the RequestOperation op
|
||||
stage.run_op(op1)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
op1_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
stage.send_op_down.call_args[0][0].complete()
|
||||
|
||||
# simulate the corresponding response
|
||||
response_event = pipeline_events_base.ResponseEvent(
|
||||
request_id=op1_guid, status_code=200, response_body="response body"
|
||||
)
|
||||
stage.handle_pipeline_event(response_event)
|
||||
|
||||
# verify that the op is complete
|
||||
assert op1.completed
|
||||
|
||||
# simulate a connected event
|
||||
stage.handle_pipeline_event(event)
|
||||
|
||||
# verify that nothing else was sent
|
||||
assert stage.send_op_down.call_count == 1
|
||||
|
||||
@pytest.mark.it("Can independently track and resend multiple RequestOperations")
|
||||
def test_one_completed_one_outstanding(self, stage, event, mocker):
|
||||
op1 = self.make_new_request_response_op(mocker, "op1")
|
||||
op2 = self.make_new_request_response_op(mocker, "op2")
|
||||
|
||||
# send 2 ops down
|
||||
stage.run_op(op1)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
op1_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
|
||||
stage.run_op(op2)
|
||||
assert stage.send_op_down.call_count == 2
|
||||
op2_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
|
||||
# complete the 2 RequestOperation ops
|
||||
stage.send_op_down.call_arg_list[0][0][0].complete()
|
||||
stage.send_op_down.call_arg_list[1][0][0].complete()
|
||||
|
||||
# simulate a response for the first RequestOperation
|
||||
response_event = pipeline_events_base.ResponseEvent(
|
||||
request_id=op1_guid, status_code=200, response_body="response body"
|
||||
)
|
||||
stage.handle_pipeline_event(response_event)
|
||||
|
||||
# simulate a connected event
|
||||
stage.handle_pipeline_event(event)
|
||||
|
||||
# verify that the re-sent RequestOperation was sent down for the incomplete RequestAndResponseOperation
|
||||
assert stage.send_op_down.call_count == 3
|
||||
assert isinstance(stage.send_op_down.call_args[0][0], pipeline_ops_base.RequestOperation)
|
||||
assert stage.send_op_down.call_args[0][0].request_id == op2_guid
|
||||
|
||||
@pytest.mark.it(
|
||||
"Does not send down any RequestOperations if all those RequestAndResponseOperations are complete"
|
||||
)
|
||||
def test_all_completed(self, stage, event, mocker):
|
||||
op1 = self.make_new_request_response_op(mocker, "op1")
|
||||
op2 = self.make_new_request_response_op(mocker, "op2")
|
||||
|
||||
# send 2 ops down
|
||||
stage.run_op(op1)
|
||||
assert stage.send_op_down.call_count == 1
|
||||
op1_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
|
||||
stage.run_op(op2)
|
||||
assert stage.send_op_down.call_count == 2
|
||||
op2_guid = stage.send_op_down.call_args[0][0].request_id
|
||||
|
||||
# complete the 2 RequestOperation ops
|
||||
stage.send_op_down.call_arg_list[0][0][0].complete()
|
||||
stage.send_op_down.call_arg_list[1][0][0].complete()
|
||||
|
||||
# simulate 2 responses
|
||||
stage.handle_pipeline_event(
|
||||
pipeline_events_base.ResponseEvent(
|
||||
request_id=op1_guid, status_code=200, response_body="response body"
|
||||
)
|
||||
)
|
||||
stage.handle_pipeline_event(
|
||||
pipeline_events_base.ResponseEvent(
|
||||
request_id=op2_guid, status_code=200, response_body="response body"
|
||||
)
|
||||
)
|
||||
|
||||
# simulate a connected event
|
||||
stage.handle_pipeline_event(event)
|
||||
|
||||
# verify that nothing else was sent down
|
||||
assert stage.send_op_down.call_count == 2
|
||||
|
||||
|
||||
@pytest.mark.describe(
|
||||
"CoordinateRequestAndResponseStage - .handle_pipeline_event() -- Called with arbitrary other event"
|
||||
)
|
||||
|
|
Загрузка…
Ссылка в новой задаче