зеркало из https://github.com/microsoft/AzureTRE.git
Upgrade Python packages, switch to FastAPI lifespan async context manager (#3765)
* Many Python packages are outdated and need updating Fixes #3764
This commit is contained in:
Родитель
7d22ed1fb8
Коммит
5848fcb9f8
|
@ -64,6 +64,8 @@ RUN export PORTER_VERSION=${PORTER_VERSION} \
|
|||
ENV PATH ${PORTER_HOME_V1}:$PATH
|
||||
|
||||
# Install requirements
|
||||
ARG PIP_VERSION=23.3.1
|
||||
RUN pip3 --no-cache-dir install pip==${PIP_VERSION} && pip3 config set global.disable-pip-version-check true
|
||||
COPY ["requirements.txt", "/tmp/pip-tmp/" ]
|
||||
COPY ["api_app/requirements.txt", "api_app/requirements-dev.txt", "/tmp/pip-tmp/api_app/" ]
|
||||
COPY ["resource_processor/vmss_porter/requirements.txt", "/tmp/pip-tmp/resource_processor/vmss_porter/" ]
|
||||
|
@ -73,7 +75,7 @@ COPY ["airlock_processor/requirements.txt", "/tmp/pip-tmp/airlock_processor/"]
|
|||
RUN pip3 --disable-pip-version-check --no-cache-dir install -r /tmp/pip-tmp/requirements.txt
|
||||
|
||||
# Install azure-cli
|
||||
ARG AZURE_CLI_VERSION=2.37.0-1~bullseye
|
||||
ARG AZURE_CLI_VERSION=2.50.0-1~bullseye
|
||||
COPY .devcontainer/scripts/azure-cli.sh /tmp/
|
||||
RUN export AZURE_CLI_VERSION=${AZURE_CLI_VERSION} \
|
||||
&& /tmp/azure-cli.sh
|
||||
|
|
|
@ -9,6 +9,7 @@ FEATURES:
|
|||
ENHANCEMENTS:
|
||||
|
||||
BUG FIXES:
|
||||
* Update Python packages, and fix breaking changes ([#3764](https://github.com/microsoft/AzureTRE/issues/3764))
|
||||
* Enabling support for more than 20 users/groups in Workspace API ([#3759](https://github.com/microsoft/AzureTRE/pull/3759 ))
|
||||
* Airlock Import Review workspace uses dedicated DNS zone to prevent conflict with core ([#3767](https://github.com/microsoft/AzureTRE/pull/3767))
|
||||
|
||||
|
|
|
@ -1 +1 @@
|
|||
__version__ = "0.6.1"
|
||||
__version__ = "0.7.4"
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
# Do not include azure-functions-worker as it may conflict with the Azure Functions platform
|
||||
azure-core
|
||||
azure-functions
|
||||
azure-storage-blob
|
||||
azure-identity
|
||||
azure-mgmt-storage
|
||||
azure-mgmt-resource
|
||||
pydantic
|
||||
azure-core==1.29.5
|
||||
azure-functions==1.17.0
|
||||
azure-storage-blob==12.19.0
|
||||
azure-identity==1.14.1
|
||||
azure-mgmt-storage==21.1.0
|
||||
azure-mgmt-resource==23.0.1
|
||||
pydantic==1.10.13
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import os
|
||||
import datetime
|
||||
import logging
|
||||
import json
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Tuple
|
||||
|
||||
from azure.core.exceptions import ResourceExistsError
|
||||
|
@ -69,16 +69,18 @@ def copy_data(source_account_name: str, destination_account_name: str, request_i
|
|||
logging.error(msg)
|
||||
raise NoFilesInRequestException(msg)
|
||||
|
||||
udk = source_blob_service_client.get_user_delegation_key(datetime.datetime.utcnow() - datetime.timedelta(hours=1),
|
||||
datetime.datetime.utcnow() + datetime.timedelta(hours=1))
|
||||
|
||||
# token geneation with expiry of 1 hour. since its not shared, we can leave it to expire (no need to track/delete)
|
||||
# Remove sas token if not needed: https://github.com/microsoft/AzureTRE/issues/2034
|
||||
sas_token = generate_container_sas(account_name=source_account_name,
|
||||
container_name=container_name,
|
||||
start = datetime.utcnow() - timedelta(minutes=15)
|
||||
expiry = datetime.utcnow() + timedelta(hours=1)
|
||||
udk = source_blob_service_client.get_user_delegation_key(key_start_time=start, key_expiry_time=expiry)
|
||||
|
||||
sas_token = generate_container_sas(container_name=container_name,
|
||||
account_name=source_account_name,
|
||||
user_delegation_key=udk,
|
||||
permission=ContainerSasPermissions(read=True),
|
||||
expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=1))
|
||||
start=start,
|
||||
expiry=expiry)
|
||||
|
||||
source_blob = source_container_client.get_blob_client(blob_name)
|
||||
source_url = f'{source_blob.url}?{sas_token}'
|
||||
|
|
|
@ -19,4 +19,4 @@ COPY . /api
|
|||
WORKDIR /api
|
||||
RUN groupadd -r api && useradd -r -s /bin/false -g api api_user
|
||||
USER api_user
|
||||
CMD ["gunicorn", "main:app", "--bind", "0.0.0.0:8000", "-k", "uvicorn.workers.UvicornWorker"]
|
||||
CMD ["gunicorn", "main:app", "--bind", "0.0.0.0:8000", "-k", "uvicorn.workers.UvicornWorker","--timeout", "60", "--workers", "5"]
|
||||
|
|
|
@ -1 +1 @@
|
|||
__version__ = "0.15.18"
|
||||
__version__ = "0.16.7"
|
||||
|
|
|
@ -1,19 +0,0 @@
|
|||
from typing import Callable
|
||||
|
||||
from fastapi import FastAPI
|
||||
|
||||
from db.events import bootstrap_database
|
||||
|
||||
|
||||
def create_start_app_handler(app: FastAPI) -> Callable:
|
||||
async def start_app() -> None:
|
||||
app.state.cosmos_client = None
|
||||
await bootstrap_database(app)
|
||||
return start_app
|
||||
|
||||
|
||||
def create_stop_app_handler(app: FastAPI) -> Callable:
|
||||
async def stop_app() -> None:
|
||||
pass
|
||||
|
||||
return stop_app
|
|
@ -3,13 +3,18 @@ import logging
|
|||
from azure.cosmos.aio import CosmosClient
|
||||
|
||||
from api.dependencies.database import get_db_client
|
||||
from db.repositories.resources import ResourceRepository
|
||||
from core import config
|
||||
|
||||
|
||||
async def bootstrap_database(app) -> None:
|
||||
async def bootstrap_database(app) -> bool:
|
||||
try:
|
||||
client: CosmosClient = await get_db_client(app)
|
||||
if client:
|
||||
await client.create_database_if_not_exists(id=config.STATE_STORE_DATABASE)
|
||||
# Test access to database
|
||||
await ResourceRepository.create(client)
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.debug(e)
|
||||
return False
|
||||
|
|
|
@ -30,7 +30,7 @@ class BaseRepository:
|
|||
raise UnableToAccessDatabase
|
||||
|
||||
async def query(self, query: str, parameters: Optional[dict] = None):
|
||||
items = self.container.query_items(query=query, parameters=parameters, enable_cross_partition_query=True)
|
||||
items = self.container.query_items(query=query, parameters=parameters)
|
||||
return [i async for i in items]
|
||||
|
||||
async def read_item_by_id(self, item_id: str) -> dict:
|
||||
|
|
|
@ -192,6 +192,6 @@ class OperationRepository(BaseRepository):
|
|||
return parse_obj_as(List[Operation], operations)
|
||||
|
||||
async def resource_has_deployed_operation(self, resource_id: str) -> bool:
|
||||
query = self.operations_query() + f' c.resourceId = "{resource_id}" AND c.action = "{RequestAction.Install}" AND c.status = "{Status.Deployed}"'
|
||||
query = self.operations_query() + f' c.resourceId = "{resource_id}" AND ((c.action = "{RequestAction.Install}" AND c.status = "{Status.Deployed}") OR (c.action = "{RequestAction.Upgrade}" AND c.status = "{Status.Updated}"))'
|
||||
operations = await self.query(query=query)
|
||||
return len(operations) > 0
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from opencensus.ext.azure.trace_exporter import AzureExporter
|
||||
import os
|
||||
import uvicorn
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.exceptions import RequestValidationError
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi_utils.tasks import repeat_every
|
||||
from service_bus.airlock_request_status_update import receive_step_result_message_and_update_status
|
||||
from fastapi.concurrency import asynccontextmanager
|
||||
|
||||
from services.tracing import RequestTracerMiddleware
|
||||
from opencensus.trace.samplers import ProbabilitySampler
|
||||
|
@ -20,9 +20,29 @@ from api.errors.http_error import http_error_handler
|
|||
from api.errors.validation_error import http422_error_handler
|
||||
from api.errors.generic_error import generic_error_handler
|
||||
from core import config
|
||||
from core.events import create_start_app_handler, create_stop_app_handler
|
||||
from db.events import bootstrap_database
|
||||
from services.logging import initialize_logging, telemetry_processor_callback_function
|
||||
from service_bus.deployment_status_updater import DeploymentStatusUpdater
|
||||
from service_bus.airlock_request_status_update import AirlockStatusUpdater
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
app.state.cosmos_client = None
|
||||
|
||||
while not await bootstrap_database(app):
|
||||
await asyncio.sleep(5)
|
||||
logging.warning("Database connection could not be established")
|
||||
|
||||
deploymentStatusUpdater = DeploymentStatusUpdater(app)
|
||||
await deploymentStatusUpdater.init_repos()
|
||||
|
||||
airlockStatusUpdater = AirlockStatusUpdater(app)
|
||||
await airlockStatusUpdater.init_repos()
|
||||
|
||||
asyncio.create_task(deploymentStatusUpdater.receive_messages())
|
||||
asyncio.create_task(airlockStatusUpdater.receive_messages())
|
||||
yield
|
||||
|
||||
|
||||
def get_application() -> FastAPI:
|
||||
|
@ -33,16 +53,15 @@ def get_application() -> FastAPI:
|
|||
version=config.VERSION,
|
||||
docs_url=None,
|
||||
redoc_url=None,
|
||||
openapi_url=None
|
||||
openapi_url=None,
|
||||
lifespan=lifespan
|
||||
)
|
||||
|
||||
application.add_event_handler("startup", create_start_app_handler(application))
|
||||
application.add_event_handler("shutdown", create_stop_app_handler(application))
|
||||
|
||||
try:
|
||||
exporter = AzureExporter(sampler=ProbabilitySampler(1.0))
|
||||
exporter.add_telemetry_processor(telemetry_processor_callback_function)
|
||||
application.add_middleware(RequestTracerMiddleware, exporter=exporter)
|
||||
if os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"):
|
||||
exporter = AzureExporter(sampler=ProbabilitySampler(1.0))
|
||||
exporter.add_telemetry_processor(telemetry_processor_callback_function)
|
||||
application.add_middleware(RequestTracerMiddleware, exporter=exporter)
|
||||
except Exception:
|
||||
logging.exception("Failed to add RequestTracerMiddleware")
|
||||
|
||||
|
@ -64,27 +83,12 @@ def get_application() -> FastAPI:
|
|||
|
||||
|
||||
if config.DEBUG:
|
||||
initialize_logging(logging.DEBUG)
|
||||
initialize_logging(logging.DEBUG, add_console_handler=True)
|
||||
else:
|
||||
initialize_logging(logging.INFO)
|
||||
initialize_logging(logging.INFO, add_console_handler=False)
|
||||
|
||||
app = get_application()
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def watch_deployment_status() -> None:
|
||||
logging.info("Starting deployment status watcher thread")
|
||||
statusWatcher = DeploymentStatusUpdater(app)
|
||||
await statusWatcher.init_repos()
|
||||
current_event_loop = asyncio.get_event_loop()
|
||||
asyncio.run_coroutine_threadsafe(statusWatcher.receive_messages(), loop=current_event_loop)
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
@repeat_every(seconds=20, wait_first=True, logger=logging.getLogger())
|
||||
async def update_airlock_request_status() -> None:
|
||||
await receive_step_result_message_and_update_status(app)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000, loop="asyncio")
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
# Dev requirements
|
||||
pytest-asyncio==0.20.3
|
||||
asgi-lifespan~=2.0.0
|
||||
httpx~=0.23.1
|
||||
mock==5.0.0
|
||||
pytest==7.2.0
|
||||
pytest-asyncio==0.21.1
|
||||
httpx==0.25.0
|
||||
mock==5.1.0
|
||||
pytest==7.4.3
|
||||
|
|
|
@ -1,25 +1,25 @@
|
|||
# API
|
||||
azure-core==1.26.1
|
||||
aiohttp==3.8.5
|
||||
azure-cosmos==4.3.0
|
||||
azure-identity==1.12.0
|
||||
azure-mgmt-cosmosdb==9.0.0
|
||||
azure-mgmt-compute==29.1.0
|
||||
azure-mgmt-costmanagement==3.0.0
|
||||
azure-storage-blob==12.15.0
|
||||
azure-servicebus==7.8.1
|
||||
azure-eventgrid==4.9.1
|
||||
fastapi[all]==0.95.0
|
||||
fastapi-utils==0.2.1
|
||||
gunicorn==20.1.0
|
||||
jsonschema[format_nongpl]==4.17.1
|
||||
msal~=1.20.0
|
||||
opencensus-ext-azure==1.1.7
|
||||
azure-core==1.29.5
|
||||
aiohttp==3.8.6
|
||||
azure-cosmos==4.5.1
|
||||
azure-identity==1.14.1
|
||||
azure-mgmt-cosmosdb==9.3.0
|
||||
azure-mgmt-compute==30.3.0
|
||||
azure-mgmt-costmanagement==4.0.1
|
||||
azure-storage-blob==12.19.0
|
||||
azure-servicebus==7.11.3
|
||||
azure-eventgrid==4.15.0
|
||||
fastapi==0.104.0
|
||||
gunicorn==21.2.0
|
||||
jsonschema[format_nongpl]==4.19.1
|
||||
msal==1.22.0
|
||||
opencensus-ext-azure==1.1.11
|
||||
opencensus-ext-logging==0.1.1
|
||||
PyJWT==2.6.0
|
||||
uvicorn[standard]==0.20.0
|
||||
PyJWT==2.8.0
|
||||
uvicorn[standard]==0.23.2
|
||||
semantic-version==2.10.0
|
||||
pytz~=2022.7
|
||||
python-dateutil~=2.8.2
|
||||
azure-mgmt-resource==22.0.0
|
||||
pandas==1.5.2
|
||||
pytz==2022.7
|
||||
python-dateutil==2.8.2
|
||||
azure-mgmt-resource==23.0.1
|
||||
pandas==2.0.3
|
||||
pydantic==1.10.13
|
||||
|
|
|
@ -6,6 +6,6 @@
|
|||
rm -f ../test-results/pytest_api*
|
||||
mkdir -p ../test-results
|
||||
|
||||
if ! pytest --junit-xml ../test-results/pytest_api_unit.xml --ignore e2e_tests; then
|
||||
if ! pytest --junit-xml ../test-results/pytest_api_unit.xml --ignore e2e_tests -W ignore::pytest.PytestUnraisableExceptionWarning -W ignore::DeprecationWarning; then
|
||||
touch ../test-results/pytest_api_unit_failed
|
||||
fi
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
|
||||
from azure.servicebus.aio import ServiceBusClient
|
||||
from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer
|
||||
from azure.servicebus.exceptions import OperationTimeoutError, ServiceBusConnectionError
|
||||
from fastapi import HTTPException
|
||||
from pydantic import ValidationError, parse_obj_as
|
||||
|
||||
|
@ -16,92 +18,103 @@ from core import config, credentials
|
|||
from resources import strings
|
||||
|
||||
|
||||
async def receive_message_from_step_result_queue():
|
||||
"""
|
||||
This method is an async generator which receives messages from service bus
|
||||
and yields those messages. If the yielded function return True the message is
|
||||
marked complete.
|
||||
"""
|
||||
async with credentials.get_credential_async() as credential:
|
||||
service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential)
|
||||
class AirlockStatusUpdater():
|
||||
|
||||
async with service_bus_client:
|
||||
receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE)
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
|
||||
async with receiver:
|
||||
received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
|
||||
async def init_repos(self):
|
||||
db_client = await get_db_client(self.app)
|
||||
self.airlock_request_repo = await AirlockRequestRepository.create(db_client)
|
||||
self.workspace_repo = await WorkspaceRepository.create(db_client)
|
||||
|
||||
for msg in received_msgs:
|
||||
result = True
|
||||
message = ""
|
||||
import time
|
||||
|
||||
try:
|
||||
message = json.loads(str(msg))
|
||||
result = (yield parse_obj_as(StepResultStatusUpdateMessage, message))
|
||||
except (json.JSONDecodeError, ValidationError):
|
||||
logging.exception(strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT)
|
||||
...
|
||||
|
||||
if result:
|
||||
logging.info(f"Received step_result status update message with correlation ID {msg.correlation_id}: {message}")
|
||||
await receiver.complete_message(msg)
|
||||
async def receive_messages(self):
|
||||
while True:
|
||||
try:
|
||||
async with credentials.get_credential_async() as credential:
|
||||
service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential)
|
||||
receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE)
|
||||
logging.info(f"Looking for new messages on {config.SERVICE_BUS_STEP_RESULT_QUEUE} queue...")
|
||||
async with receiver:
|
||||
received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=1)
|
||||
for msg in received_msgs:
|
||||
async with AutoLockRenewer() as renewer:
|
||||
renewer.register(receiver, msg, max_lock_renewal_duration=60)
|
||||
complete_message = await self.process_message(msg)
|
||||
if complete_message:
|
||||
await receiver.complete_message(msg)
|
||||
else:
|
||||
# could have been any kind of transient issue, we'll abandon back to the queue, and retry
|
||||
await receiver.abandon_message(msg)
|
||||
|
||||
await asyncio.sleep(10)
|
||||
|
||||
async def update_status_in_database(airlock_request_repo: AirlockRequestRepository, workspace_repo: WorkspaceRepository, step_result_message: StepResultStatusUpdateMessage):
|
||||
"""
|
||||
Updates an airlock request and with the new status from step_result message contents.
|
||||
except OperationTimeoutError:
|
||||
# Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available
|
||||
logging.debug("No sessions for this process. Will look again...")
|
||||
|
||||
"""
|
||||
result = False
|
||||
try:
|
||||
step_result_data = step_result_message.data
|
||||
airlock_request_id = step_result_data.request_id
|
||||
current_status = step_result_data.completed_step
|
||||
new_status = AirlockRequestStatus(step_result_data.new_status) if step_result_data.new_status else None
|
||||
status_message = step_result_data.status_message
|
||||
request_files = step_result_data.request_files
|
||||
# Find the airlock request by id
|
||||
airlock_request = await get_airlock_request_by_id_from_path(airlock_request_id=airlock_request_id, airlock_request_repo=airlock_request_repo)
|
||||
# Validate that the airlock request status is the same as current status
|
||||
if airlock_request.status == current_status:
|
||||
workspace = await workspace_repo.get_workspace_by_id(airlock_request.workspaceId)
|
||||
# update to new status and send to event grid
|
||||
await update_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=airlock_request_repo, updated_by=airlock_request.updatedBy, workspace=workspace, new_status=new_status, request_files=request_files, status_message=status_message)
|
||||
result = True
|
||||
else:
|
||||
logging.error(strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status))
|
||||
except HTTPException as e:
|
||||
if e.status_code == 404:
|
||||
# Marking as true as this message will never succeed anyways and should be removed from the queue.
|
||||
result = True
|
||||
logging.exception(strings.STEP_RESULT_ID_NOT_FOUND.format(airlock_request_id))
|
||||
if e.status_code == 400:
|
||||
result = True
|
||||
logging.exception(strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(airlock_request_id, current_status, new_status))
|
||||
if e.status_code == 503:
|
||||
logging.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING)
|
||||
except Exception:
|
||||
logging.exception("Failed updating request status")
|
||||
except ServiceBusConnectionError:
|
||||
# Occasionally there will be a transient / network-level error in connecting to SB.
|
||||
logging.info("Unknown Service Bus connection error. Will retry...")
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
# Catch all other exceptions, log them via .exception to get the stack trace, and reconnect
|
||||
logging.exception(f"Unknown exception. Will retry - {e}")
|
||||
|
||||
async def process_message(self, msg):
|
||||
complete_message = False
|
||||
|
||||
async def receive_step_result_message_and_update_status(app) -> None:
|
||||
"""
|
||||
Receives messages from the step result eventgrid topic and updates the status for
|
||||
the airlock request in the state store.
|
||||
Args:
|
||||
app ([FastAPI]): Handle to the currently running app
|
||||
"""
|
||||
receive_message_gen = receive_message_from_step_result_queue()
|
||||
try:
|
||||
message = parse_obj_as(StepResultStatusUpdateMessage, json.loads(str(msg)))
|
||||
logging.info(f"Received step_result status update message with correlation ID {message.id}: {message}")
|
||||
complete_message = await self.update_status_in_database(message)
|
||||
logging.info(f"Update status in DB for {message.id}")
|
||||
except (json.JSONDecodeError, ValidationError):
|
||||
logging.exception(f"{strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT}: {msg.correlation_id}")
|
||||
complete_message = True
|
||||
except Exception:
|
||||
logging.exception(f"Exception processing message: {msg.correlation_id}")
|
||||
|
||||
try:
|
||||
async for message in receive_message_gen:
|
||||
db_client = await get_db_client(app)
|
||||
airlock_request_repo = await AirlockRequestRepository.create(db_client)
|
||||
workspace_repo = await WorkspaceRepository.create(db_client)
|
||||
logging.info("Fetched step_result message from queue, start updating airlock request")
|
||||
result = await update_status_in_database(airlock_request_repo, workspace_repo, message)
|
||||
await receive_message_gen.asend(result)
|
||||
logging.info("Finished updating airlock request")
|
||||
except StopAsyncIteration: # the async generator when finished signals end with this exception.
|
||||
pass
|
||||
return complete_message
|
||||
|
||||
async def update_status_in_database(self, step_result_message: StepResultStatusUpdateMessage):
|
||||
"""
|
||||
Updates an airlock request and with the new status from step_result message contents.
|
||||
|
||||
"""
|
||||
result = False
|
||||
try:
|
||||
step_result_data = step_result_message.data
|
||||
airlock_request_id = step_result_data.request_id
|
||||
current_status = step_result_data.completed_step
|
||||
new_status = AirlockRequestStatus(step_result_data.new_status) if step_result_data.new_status else None
|
||||
status_message = step_result_data.status_message
|
||||
request_files = step_result_data.request_files
|
||||
# Find the airlock request by id
|
||||
airlock_request = await get_airlock_request_by_id_from_path(airlock_request_id=airlock_request_id, airlock_request_repo=self.airlock_request_repo)
|
||||
# Validate that the airlock request status is the same as current status
|
||||
if airlock_request.status == current_status:
|
||||
workspace = await self.workspace_repo.get_workspace_by_id(airlock_request.workspaceId)
|
||||
# update to new status and send to event grid
|
||||
await update_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=self.airlock_request_repo, updated_by=airlock_request.updatedBy, workspace=workspace, new_status=new_status, request_files=request_files, status_message=status_message)
|
||||
result = True
|
||||
else:
|
||||
logging.error(strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status))
|
||||
except HTTPException as e:
|
||||
if e.status_code == 404:
|
||||
# Marking as true as this message will never succeed anyways and should be removed from the queue.
|
||||
result = True
|
||||
logging.exception(strings.STEP_RESULT_ID_NOT_FOUND.format(airlock_request_id))
|
||||
if e.status_code == 400:
|
||||
result = True
|
||||
logging.exception(strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(airlock_request_id, current_status, new_status))
|
||||
if e.status_code == 503:
|
||||
logging.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING)
|
||||
except Exception:
|
||||
logging.exception("Failed updating request status")
|
||||
|
||||
return result
|
||||
|
|
|
@ -107,14 +107,22 @@ def get_airlock_request_container_sas_token(account_name: str,
|
|||
airlock_request: AirlockRequest):
|
||||
blob_service_client = BlobServiceClient(account_url=get_account_url(account_name),
|
||||
credential=credentials.get_credential())
|
||||
|
||||
start = datetime.utcnow() - timedelta(minutes=15)
|
||||
expiry = datetime.utcnow() + timedelta(hours=config.AIRLOCK_SAS_TOKEN_EXPIRY_PERIOD_IN_HOURS)
|
||||
udk = blob_service_client.get_user_delegation_key(datetime.utcnow(), expiry)
|
||||
|
||||
try:
|
||||
udk = blob_service_client.get_user_delegation_key(key_start_time=start, key_expiry_time=expiry)
|
||||
except Exception:
|
||||
raise Exception(f"Failed getting user delegation key, has the API identity been granted 'Storage Blob Data Contributor' access to the storage account {account_name}?")
|
||||
|
||||
required_permission = get_required_permission(airlock_request)
|
||||
|
||||
token = generate_container_sas(container_name=airlock_request.id,
|
||||
account_name=account_name,
|
||||
user_delegation_key=udk,
|
||||
permission=required_permission,
|
||||
start=start,
|
||||
expiry=expiry)
|
||||
|
||||
return "https://{}.blob.{}/{}?{}" \
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import logging
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from opencensus.ext.azure.log_exporter import AzureLogHandler
|
||||
|
@ -18,9 +19,14 @@ UNWANTED_LOGGERS = [
|
|||
"azure.identity.aio._credentials.chained",
|
||||
"azure.identity",
|
||||
"msal.token_cache"
|
||||
# Remove these once the following PR is merged:
|
||||
# https://github.com/Azure/azure-sdk-for-python/pull/30832
|
||||
# Issue: https://github.com/microsoft/AzureTRE/issues/3766
|
||||
"azure.servicebus._pyamqp.aio._session_async"
|
||||
]
|
||||
|
||||
LOGGERS_FOR_ERRORS_ONLY = [
|
||||
"urllib3.connectionpool",
|
||||
"uamqp",
|
||||
"uamqp.authentication.cbs_auth_async",
|
||||
"uamqp.async_ops.client_async",
|
||||
|
@ -33,7 +39,14 @@ LOGGERS_FOR_ERRORS_ONLY = [
|
|||
"uamqp.async_ops.session_async",
|
||||
"uamqp.sender",
|
||||
"uamqp.client",
|
||||
"azure.servicebus.aio._base_handler_async"
|
||||
"azure.identity._persistent_cache",
|
||||
"azure.servicebus.aio._base_handler_async",
|
||||
"azure.servicebus._pyamqp.aio._cbs_async",
|
||||
"azure.servicebus._pyamqp.aio._connection_async",
|
||||
"azure.servicebus._pyamqp.aio._link_async",
|
||||
"azure.servicebus._pyamqp.aio._management_link_async",
|
||||
"azure.servicebus._pyamqp.aio._session_async",
|
||||
"azure.servicebus._pyamqp.aio._client_async"
|
||||
]
|
||||
|
||||
|
||||
|
@ -41,12 +54,12 @@ def disable_unwanted_loggers():
|
|||
"""
|
||||
Disables the unwanted loggers.
|
||||
"""
|
||||
for logger_name in UNWANTED_LOGGERS:
|
||||
logging.getLogger(logger_name).disabled = True
|
||||
|
||||
for logger_name in LOGGERS_FOR_ERRORS_ONLY:
|
||||
logging.getLogger(logger_name).setLevel(logging.ERROR)
|
||||
|
||||
for logger_name in UNWANTED_LOGGERS:
|
||||
logging.getLogger(logger_name).disabled = True
|
||||
|
||||
|
||||
def telemetry_processor_callback_function(envelope):
|
||||
envelope.tags['ai.cloud.role'] = 'api'
|
||||
|
@ -68,7 +81,7 @@ class ExceptionTracebackFilter(logging.Filter):
|
|||
return True
|
||||
|
||||
|
||||
def initialize_logging(logging_level: int, correlation_id: Optional[str] = None) -> logging.LoggerAdapter:
|
||||
def initialize_logging(logging_level: int, correlation_id: Optional[str] = None, add_console_handler: bool = False) -> logging.LoggerAdapter:
|
||||
"""
|
||||
Adds the Application Insights handler for the root logger and sets the given logging level.
|
||||
Creates and returns a logger adapter that integrates the correlation ID, if given, to the log messages.
|
||||
|
@ -81,12 +94,19 @@ def initialize_logging(logging_level: int, correlation_id: Optional[str] = None)
|
|||
|
||||
disable_unwanted_loggers()
|
||||
|
||||
if add_console_handler:
|
||||
console_formatter = logging.Formatter(fmt='%(module)-7s %(name)-7s %(process)-7s %(asctime)s %(levelname)-7s %(message)s')
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setFormatter(console_formatter)
|
||||
logger.addHandler(console_handler)
|
||||
|
||||
try:
|
||||
# picks up APPLICATIONINSIGHTS_CONNECTION_STRING automatically
|
||||
azurelog_handler = AzureLogHandler()
|
||||
azurelog_handler.add_telemetry_processor(telemetry_processor_callback_function)
|
||||
azurelog_handler.addFilter(ExceptionTracebackFilter())
|
||||
logger.addHandler(azurelog_handler)
|
||||
if os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"):
|
||||
azurelog_handler = AzureLogHandler()
|
||||
azurelog_handler.add_telemetry_processor(telemetry_processor_callback_function)
|
||||
azurelog_handler.addFilter(ExceptionTracebackFilter())
|
||||
logger.addHandler(azurelog_handler)
|
||||
except ValueError as e:
|
||||
logger.error(f"Failed to set Application Insights logger handler: {e}")
|
||||
|
||||
|
|
|
@ -580,5 +580,5 @@ def no_database():
|
|||
with patch(
|
||||
"db.repositories.base.BaseRepository._get_container", return_value=None
|
||||
):
|
||||
with patch("core.events.bootstrap_database", return_value=None):
|
||||
with patch("db.events.bootstrap_database", return_value=None):
|
||||
yield
|
||||
|
|
|
@ -2,20 +2,25 @@ import pytest
|
|||
import pytest_asyncio
|
||||
from mock import patch
|
||||
|
||||
from asgi_lifespan import LifespanManager
|
||||
from fastapi import FastAPI
|
||||
from httpx import AsyncClient
|
||||
|
||||
from models.domain.authentication import User
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope='module')
|
||||
def no_lifespan_events():
|
||||
with patch("main.lifespan"):
|
||||
yield
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(autouse=True)
|
||||
def no_database():
|
||||
""" overrides connecting to the database for all tests"""
|
||||
with patch('api.dependencies.database.connect_to_db', return_value=None):
|
||||
with patch('api.dependencies.database.get_db_client', return_value=None):
|
||||
with patch('db.repositories.base.BaseRepository._get_container', return_value=None):
|
||||
with patch('core.events.bootstrap_database', return_value=None):
|
||||
with patch('db.events.bootstrap_database', return_value=None):
|
||||
yield
|
||||
|
||||
|
||||
|
@ -134,12 +139,7 @@ def app() -> FastAPI:
|
|||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def initialized_app(app: FastAPI) -> FastAPI:
|
||||
async with LifespanManager(app):
|
||||
yield app
|
||||
async def client(app: FastAPI) -> AsyncClient:
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def client(initialized_app: FastAPI) -> AsyncClient:
|
||||
async with AsyncClient(app=initialized_app, base_url="http://testserver", headers={"Content-Type": "application/json"}) as client:
|
||||
async with AsyncClient(app=app, base_url="http://testserver", headers={"Content-Type": "application/json"}) as client:
|
||||
yield client
|
||||
|
|
|
@ -345,4 +345,4 @@ class TestSharedServiceRoutesThatRequireAdminRights:
|
|||
response = await client.patch(app.url_path_for(strings.API_UPDATE_SHARED_SERVICE, shared_service_id=SHARED_SERVICE_ID), json=shared_service_patch, headers={"etag": ETAG})
|
||||
|
||||
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
|
||||
assert response.text == '1 validation error for Request\nbody -> fakeField\n extra fields not permitted (type=value_error.extra)'
|
||||
assert response.text == "[{'loc': ('body', 'fakeField'), 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}]"
|
||||
|
|
|
@ -503,7 +503,7 @@ class TestWorkspaceRoutesThatRequireAdminRights:
|
|||
|
||||
response = await client.patch(app.url_path_for(strings.API_UPDATE_WORKSPACE, workspace_id=WORKSPACE_ID), json=workspace_patch)
|
||||
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
|
||||
assert ("header -> etag" in response.text and "field required" in response.text)
|
||||
assert ("('header', 'etag')" in response.text and "field required" in response.text)
|
||||
|
||||
# [PATCH] /workspaces/{workspace_id}
|
||||
@ patch("api.dependencies.workspaces.WorkspaceRepository.get_workspace_by_id", side_effect=EntityDoesNotExist)
|
||||
|
|
|
@ -152,4 +152,4 @@ async def test_get_airlock_requests_queries_db(airlock_request_repo):
|
|||
]
|
||||
|
||||
await airlock_request_repo.get_airlock_requests(WORKSPACE_ID)
|
||||
airlock_request_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=expected_parameters, enable_cross_partition_query=True)
|
||||
airlock_request_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=expected_parameters)
|
||||
|
|
|
@ -53,7 +53,7 @@ async def test_get_workspaces_queries_db(workspace_repo):
|
|||
expected_query = workspace_repo.workspaces_query_string()
|
||||
|
||||
await workspace_repo.get_workspaces()
|
||||
workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None, enable_cross_partition_query=True)
|
||||
workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
@ -62,7 +62,7 @@ async def test_get_active_workspaces_queries_db(workspace_repo):
|
|||
expected_query = workspace_repo.active_workspaces_query_string()
|
||||
|
||||
await workspace_repo.get_active_workspaces()
|
||||
workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None, enable_cross_partition_query=True)
|
||||
workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
@ -94,7 +94,7 @@ async def test_get_workspace_by_id_queries_db(workspace_repo, workspace):
|
|||
expected_query = f'SELECT * FROM c WHERE c.resourceType = "workspace" AND c.id = "{workspace.id}"'
|
||||
|
||||
await workspace_repo.get_workspace_by_id(workspace.id)
|
||||
workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None, enable_cross_partition_query=True)
|
||||
workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
|
@ -4,10 +4,10 @@ import pytest
|
|||
import time
|
||||
|
||||
from mock import AsyncMock, patch
|
||||
from service_bus.airlock_request_status_update import AirlockStatusUpdater
|
||||
from models.domain.events import AirlockNotificationUserData, AirlockFile
|
||||
from models.domain.airlock_request import AirlockRequest, AirlockRequestStatus, AirlockRequestType
|
||||
from models.domain.workspace import Workspace
|
||||
from service_bus.airlock_request_status_update import receive_step_result_message_and_update_status
|
||||
from db.errors import EntityDoesNotExist
|
||||
from resources import strings
|
||||
|
||||
|
@ -108,21 +108,21 @@ class ServiceBusReceivedMessageMock:
|
|||
@patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create')
|
||||
@patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
|
||||
@patch('logging.exception')
|
||||
@patch('service_bus.airlock_request_status_update.ServiceBusClient')
|
||||
@patch('fastapi.FastAPI')
|
||||
@patch("services.aad_authentication.AzureADAuthorization.get_workspace_role_assignment_details", return_value={"researcher_emails": ["researcher@outlook.com"], "owner_emails": ["owner@outlook.com"]})
|
||||
async def test_receiving_good_message(_, app, sb_client, logging_mock, workspace_repo, airlock_request_repo, eg_client):
|
||||
service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message)
|
||||
async def test_receiving_good_message(_, app, logging_mock, workspace_repo, airlock_request_repo, eg_client):
|
||||
|
||||
sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
|
||||
sb_client().get_queue_receiver().complete_message = AsyncMock()
|
||||
eg_client().send = AsyncMock()
|
||||
expected_airlock_request = sample_airlock_request()
|
||||
airlock_request_repo.return_value.get_airlock_request_by_id.return_value = expected_airlock_request
|
||||
airlock_request_repo.return_value.update_airlock_request.return_value = sample_airlock_request(status=AirlockRequestStatus.InReview)
|
||||
workspace_repo.return_value.get_workspace_by_id.return_value = sample_workspace()
|
||||
await receive_step_result_message_and_update_status(app)
|
||||
|
||||
airlockStatusUpdater = AirlockStatusUpdater(app)
|
||||
await airlockStatusUpdater.init_repos()
|
||||
complete_message = await airlockStatusUpdater.process_message(ServiceBusReceivedMessageMock(test_sb_step_result_message))
|
||||
|
||||
assert complete_message is True
|
||||
airlock_request_repo.return_value.get_airlock_request_by_id.assert_called_once_with(test_sb_step_result_message["data"]["request_id"])
|
||||
airlock_request_repo.return_value.update_airlock_request.assert_called_once_with(
|
||||
original_request=expected_airlock_request,
|
||||
|
@ -134,22 +134,22 @@ async def test_receiving_good_message(_, app, sb_client, logging_mock, workspace
|
|||
review_user_resource=None)
|
||||
assert eg_client().send.call_count == 2
|
||||
logging_mock.assert_not_called()
|
||||
sb_client().get_queue_receiver().complete_message.assert_called_once_with(service_bus_received_message_mock)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("payload", test_data)
|
||||
@patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create')
|
||||
@patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
|
||||
@patch('logging.exception')
|
||||
@patch('service_bus.airlock_request_status_update.ServiceBusClient')
|
||||
@patch('fastapi.FastAPI')
|
||||
async def test_receiving_bad_json_logs_error(app, sb_client, logging_mock, payload):
|
||||
async def test_receiving_bad_json_logs_error(app, logging_mock, workspace_repo, airlock_request_repo, payload):
|
||||
service_bus_received_message_mock = ServiceBusReceivedMessageMock(payload)
|
||||
sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
|
||||
sb_client().get_queue_receiver().complete_message = AsyncMock()
|
||||
await receive_step_result_message_and_update_status(app)
|
||||
airlockStatusUpdater = AirlockStatusUpdater(app)
|
||||
await airlockStatusUpdater.init_repos()
|
||||
complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock)
|
||||
|
||||
assert complete_message is True
|
||||
error_message = logging_mock.call_args.args[0]
|
||||
assert error_message.startswith(strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT)
|
||||
sb_client().get_queue_receiver().complete_message.assert_called_once_with(service_bus_received_message_mock)
|
||||
|
||||
|
||||
@patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
|
||||
|
@ -160,50 +160,48 @@ async def test_receiving_bad_json_logs_error(app, sb_client, logging_mock, paylo
|
|||
async def test_updating_non_existent_airlock_request_error_is_logged(app, sb_client, logging_mock, airlock_request_repo, _):
|
||||
service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message)
|
||||
|
||||
sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
|
||||
sb_client().get_queue_receiver().complete_message = AsyncMock()
|
||||
airlock_request_repo.return_value.get_airlock_request_by_id.side_effect = EntityDoesNotExist
|
||||
await receive_step_result_message_and_update_status(app)
|
||||
airlockStatusUpdater = AirlockStatusUpdater(app)
|
||||
await airlockStatusUpdater.init_repos()
|
||||
complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock)
|
||||
|
||||
assert complete_message is True
|
||||
expected_error_message = strings.STEP_RESULT_ID_NOT_FOUND.format(test_sb_step_result_message["data"]["request_id"])
|
||||
logging_mock.assert_called_once_with(expected_error_message)
|
||||
sb_client().get_queue_receiver().complete_message.assert_called_once_with(service_bus_received_message_mock)
|
||||
|
||||
|
||||
@patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
|
||||
@patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create')
|
||||
@patch('logging.exception')
|
||||
@patch('service_bus.airlock_request_status_update.ServiceBusClient')
|
||||
@patch('fastapi.FastAPI')
|
||||
async def test_when_updating_and_state_store_exception_error_is_logged(app, sb_client, logging_mock, airlock_request_repo, _):
|
||||
async def test_when_updating_and_state_store_exception_error_is_logged(app, logging_mock, airlock_request_repo, _):
|
||||
service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message)
|
||||
|
||||
sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
|
||||
sb_client().get_queue_receiver().complete_message = AsyncMock()
|
||||
airlock_request_repo.return_value.get_airlock_request_by_id.side_effect = Exception
|
||||
await receive_step_result_message_and_update_status(app)
|
||||
airlockStatusUpdater = AirlockStatusUpdater(app)
|
||||
await airlockStatusUpdater.init_repos()
|
||||
complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock)
|
||||
|
||||
assert complete_message is False
|
||||
logging_mock.assert_called_once_with("Failed updating request status")
|
||||
sb_client().get_queue_receiver().complete_message.assert_not_called()
|
||||
|
||||
|
||||
@patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
|
||||
@patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create')
|
||||
@patch('logging.error')
|
||||
@patch('service_bus.airlock_request_status_update.ServiceBusClient')
|
||||
@patch('fastapi.FastAPI')
|
||||
async def test_when_updating_and_current_status_differs_from_status_in_state_store_error_is_logged(app, sb_client, logging_mock, airlock_request_repo, _):
|
||||
async def test_when_updating_and_current_status_differs_from_status_in_state_store_error_is_logged(app, logging_mock, airlock_request_repo, _):
|
||||
service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message)
|
||||
|
||||
sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
|
||||
sb_client().get_queue_receiver().complete_message = AsyncMock()
|
||||
expected_airlock_request = sample_airlock_request(AirlockRequestStatus.Draft)
|
||||
airlock_request_repo.return_value.get_airlock_request_by_id.return_value = expected_airlock_request
|
||||
await receive_step_result_message_and_update_status(app)
|
||||
airlockStatusUpdater = AirlockStatusUpdater(app)
|
||||
await airlockStatusUpdater.init_repos()
|
||||
complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock)
|
||||
|
||||
assert complete_message is False
|
||||
expected_error_message = strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(test_sb_step_result_message["data"]["request_id"], test_sb_step_result_message["data"]["completed_step"], expected_airlock_request.status)
|
||||
logging_mock.assert_called_once_with(expected_error_message)
|
||||
sb_client().get_queue_receiver().complete_message.assert_not_called()
|
||||
|
||||
|
||||
@patch('service_bus.airlock_request_status_update.WorkspaceRepository.create')
|
||||
|
@ -214,11 +212,11 @@ async def test_when_updating_and_current_status_differs_from_status_in_state_sto
|
|||
async def test_when_updating_and_status_update_is_illegal_error_is_logged(app, sb_client, logging_mock, airlock_request_repo, _):
|
||||
service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message_with_invalid_status)
|
||||
|
||||
sb_client().get_queue_receiver().receive_messages = AsyncMock(return_value=[service_bus_received_message_mock])
|
||||
sb_client().get_queue_receiver().complete_message = AsyncMock()
|
||||
airlock_request_repo.return_value.get_airlock_request_by_id.side_effect = HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
|
||||
await receive_step_result_message_and_update_status(app)
|
||||
airlockStatusUpdater = AirlockStatusUpdater(app)
|
||||
await airlockStatusUpdater.init_repos()
|
||||
complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock)
|
||||
|
||||
assert complete_message is True
|
||||
expected_error_message = strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(test_sb_step_result_message_with_invalid_status["data"]["request_id"], test_sb_step_result_message_with_invalid_status["data"]["completed_step"], test_sb_step_result_message_with_invalid_status["data"]["new_status"])
|
||||
logging_mock.assert_called_once_with(expected_error_message)
|
||||
sb_client().get_queue_receiver().complete_message.assert_called_once_with(service_bus_received_message_mock)
|
||||
|
|
|
@ -4,7 +4,7 @@ import pytest_asyncio
|
|||
import time
|
||||
from resources import strings
|
||||
from services.airlock import validate_user_allowed_to_access_storage_account, get_required_permission, \
|
||||
validate_request_status, cancel_request, delete_review_user_resource
|
||||
validate_request_status, cancel_request, delete_review_user_resource, check_email_exists
|
||||
from models.domain.airlock_request import AirlockRequest, AirlockRequestStatus, AirlockRequestType, AirlockReview, AirlockReviewDecision, AirlockActions, AirlockReviewUserResource
|
||||
from tests_ma.test_api.conftest import create_workspace_owner_user, create_workspace_researcher_user, get_required_roles
|
||||
from mock import AsyncMock, patch, MagicMock
|
||||
|
@ -305,6 +305,19 @@ async def test_save_and_publish_event_airlock_request_raises_503_if_publish_even
|
|||
assert ex.value.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize('role_assignment_details_mock_return', [{},
|
||||
{"AirlockManager": ["owner@outlook.com"]},
|
||||
{"WorkspaceResearcher": [], "AirlockManager": ["owner@outlook.com"]},
|
||||
{"WorkspaceResearcher": ["researcher@outlook.com"], "owner_emails": []},
|
||||
{"WorkspaceResearcher": ["researcher@outlook.com"]}])
|
||||
async def test_check_email_exists_raises_417_if_email_not_present(role_assignment_details_mock_return):
|
||||
role_assignment_details = role_assignment_details_mock_return
|
||||
with pytest.raises(HTTPException) as ex:
|
||||
check_email_exists(role_assignment_details)
|
||||
assert ex.value.status_code == status.HTTP_417_EXPECTATION_FAILED
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize('email_mock_return', [{},
|
||||
{"AirlockManager": ["owner@outlook.com"]},
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
# if you update this file, update the install_requires in setup.py as well
|
||||
click==8.1.3
|
||||
httpx~=0.23.0
|
||||
msal==1.20.0
|
||||
msal==1.22.0
|
||||
jmespath==1.0.1
|
||||
tabulate==0.9.0
|
||||
pygments==2.15.0
|
||||
PyJWT==2.6.0
|
||||
azure-cli-core==2.47.0
|
||||
azure-identity==1.12.0
|
||||
aiohttp==3.8.5
|
||||
pygments==2.16.1
|
||||
PyJWT==2.8.0
|
||||
azure-cli-core==2.50.0
|
||||
azure-identity==1.14.1
|
||||
aiohttp==3.8.6
|
||||
|
|
16
cli/setup.py
16
cli/setup.py
|
@ -4,7 +4,7 @@ from setuptools import find_packages
|
|||
from setuptools import setup
|
||||
|
||||
PROJECT = 'azure-tre-cli'
|
||||
VERSION = '0.1.4'
|
||||
VERSION = '0.2.0'
|
||||
|
||||
try:
|
||||
long_description = open('README.md', 'rt').read()
|
||||
|
@ -41,15 +41,15 @@ setup(
|
|||
provides=[],
|
||||
install_requires=[
|
||||
"click==8.1.3",
|
||||
"httpx~=0.23.1",
|
||||
"msal==1.20.0",
|
||||
"httpx==0.25.0",
|
||||
"msal==1.22.0",
|
||||
"jmespath==1.0.1",
|
||||
"tabulate==0.9.0",
|
||||
"pygments==2.15.0",
|
||||
"PyJWT==2.6.0",
|
||||
"azure-cli-core==2.47.0",
|
||||
"azure-identity==1.12.0",
|
||||
"aiohttp==3.8.5"
|
||||
"pygments==2.16.1",
|
||||
"PyJWT==2.8.0",
|
||||
"azure-cli-core==2.50.0",
|
||||
"azure-identity==1.14.1",
|
||||
"aiohttp==3.8.6"
|
||||
],
|
||||
|
||||
namespace_packages=[],
|
||||
|
|
|
@ -3,8 +3,7 @@ import logging
|
|||
from httpx import AsyncClient, Timeout
|
||||
import os
|
||||
from urllib.parse import urlparse
|
||||
import mimetypes
|
||||
from azure.storage.blob import ContentSettings
|
||||
from azure.storage.blob import BlobClient
|
||||
from airlock import strings
|
||||
from e2e_tests.helpers import get_auth_header, get_full_endpoint
|
||||
|
||||
|
@ -66,21 +65,12 @@ async def upload_blob_using_sas(file_path: str, sas_url: str):
|
|||
|
||||
blob_url = f"{storage_account_url}{container_name}/{file_name}?{parsed_sas_url.query}"
|
||||
LOGGER.info(f"uploading [{file_name}] to container [{blob_url}]")
|
||||
with open(file_path, "rb") as fh:
|
||||
headers = {"x-ms-blob-type": "BlockBlob"}
|
||||
content_type = ""
|
||||
if file_ext != "":
|
||||
content_type = ContentSettings(
|
||||
content_type=mimetypes.types_map[file_ext]
|
||||
).content_type
|
||||
|
||||
response = await client.put(
|
||||
url=blob_url,
|
||||
files={'upload-file': (file_name, fh, content_type)},
|
||||
headers=headers
|
||||
)
|
||||
LOGGER.info(f"response code: {response.status_code}")
|
||||
return response
|
||||
client = BlobClient.from_blob_url(blob_url)
|
||||
with open(file_name, 'rb') as data:
|
||||
response = client.upload_blob(data)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def wait_for_status(
|
||||
|
|
|
@ -21,7 +21,12 @@ def pytest_addoption(parser):
|
|||
|
||||
@pytest.fixture(scope="session")
|
||||
def event_loop():
|
||||
return asyncio.get_event_loop()
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
loop = asyncio.new_event_loop()
|
||||
yield loop
|
||||
loop.close()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
|
@ -111,7 +116,7 @@ async def clean_up_test_workspace_service(pre_created_workspace_service_id: str,
|
|||
@pytest.fixture(scope="session")
|
||||
async def setup_test_workspace(verify) -> Tuple[str, str, str]:
|
||||
pre_created_workspace_id = config.TEST_WORKSPACE_ID
|
||||
# Set up
|
||||
# Set up - uses a pre created app reg as has appropriate roles assigned
|
||||
workspace_path, workspace_id = await create_or_get_test_workspace(
|
||||
auth_type="Manual", verify=verify, pre_created_workspace_id=pre_created_workspace_id, client_id=config.TEST_WORKSPACE_APP_ID, client_secret=config.TEST_WORKSPACE_APP_SECRET)
|
||||
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
# API
|
||||
httpx~=0.23.0
|
||||
pytest==7.2.0
|
||||
pytest-asyncio==0.20.3
|
||||
starlette
|
||||
pytest-timeout==2.1.0
|
||||
pytest-xdist==3.2.1
|
||||
httpx==0.25.0
|
||||
pytest==7.4.3
|
||||
pytest-asyncio==0.21.1
|
||||
starlette==0.27.0
|
||||
pytest-timeout==2.2.0
|
||||
pytest-xdist==3.3.1
|
||||
backoff==2.2.1
|
||||
|
|
|
@ -51,16 +51,21 @@ async def submit_airlock_import_request(workspace_path: str, workspace_owner_tok
|
|||
wait_time = 30
|
||||
while not blob_uploaded:
|
||||
LOGGER.info(f"try #{i} to upload a blob to container [{container_url}]")
|
||||
upload_response = await upload_blob_using_sas(BLOB_FILE_PATH, container_url)
|
||||
|
||||
if upload_response.status_code == 404:
|
||||
try:
|
||||
await asyncio.sleep(5)
|
||||
upload_response = await upload_blob_using_sas(BLOB_FILE_PATH, container_url)
|
||||
if "etag" in upload_response:
|
||||
blob_uploaded = True
|
||||
else:
|
||||
raise Exception("upload failed")
|
||||
except ResourceNotFoundError:
|
||||
i += 1
|
||||
LOGGER.info(f"sleeping for {wait_time} sec until container would be created")
|
||||
await asyncio.sleep(wait_time)
|
||||
else:
|
||||
assert upload_response.status_code == 201
|
||||
LOGGER.info("upload blob succeeded")
|
||||
blob_uploaded = True
|
||||
pass
|
||||
except Exception as e:
|
||||
LOGGER.error(f"upload blob failed with exception: {e}")
|
||||
raise e
|
||||
|
||||
# submit request
|
||||
LOGGER.info("Submitting airlock request")
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Dev requirements
|
||||
flake8==6.0.0 # same as super linter
|
||||
pre-commit==3.2.2
|
||||
flake8==6.0.0 # ensure same as super linter
|
||||
pre-commit==3.5.0
|
||||
semantic-version==2.10.0
|
||||
|
||||
-r api_app/requirements.txt
|
||||
|
|
|
@ -1 +1 @@
|
|||
__version__ = "0.6.7"
|
||||
__version__ = "0.7.0"
|
||||
|
|
|
@ -18,7 +18,11 @@ UNWANTED_LOGGERS = [
|
|||
"azure.identity.aio._internal.decorators",
|
||||
"azure.identity.aio._credentials.chained",
|
||||
"azure.identity",
|
||||
"msal.token_cache"
|
||||
"msal.token_cache",
|
||||
# Remove these once the following PR is merged:
|
||||
# https://github.com/Azure/azure-sdk-for-python/pull/30832
|
||||
# Issue: https://github.com/microsoft/AzureTRE/issues/3766
|
||||
"azure.servicebus._pyamqp.aio._session_async"
|
||||
]
|
||||
|
||||
LOGGERS_FOR_ERRORS_ONLY = [
|
||||
|
@ -34,7 +38,12 @@ LOGGERS_FOR_ERRORS_ONLY = [
|
|||
"uamqp.async_ops.session_async",
|
||||
"uamqp.sender",
|
||||
"uamqp.client",
|
||||
"azure.servicebus.aio._base_handler_async"
|
||||
"azure.servicebus.aio._base_handler_async",
|
||||
"azure.servicebus._pyamqp.aio._cbs_async",
|
||||
"azure.servicebus._pyamqp.aio._connection_async",
|
||||
"azure.servicebus._pyamqp.aio._link_async",
|
||||
"azure.servicebus._pyamqp.aio._management_link_async",
|
||||
"azure.servicebus._pyamqp.aio._session_async"
|
||||
]
|
||||
|
||||
debug = os.environ.get('DEBUG', 'False').lower() in ('true', '1')
|
||||
|
|
|
@ -6,7 +6,7 @@ SHELL ["/bin/bash", "-o", "pipefail", "-c"]
|
|||
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
|
||||
|
||||
# Install Azure CLI
|
||||
ARG AZURE_CLI_VERSION=2.47.0-1~bullseye
|
||||
ARG AZURE_CLI_VERSION=2.50.0-1~bullseye
|
||||
COPY scripts/azure-cli.sh /tmp/
|
||||
RUN --mount=type=cache,target=/var/cache/apt --mount=type=cache,target=/var/lib/apt \
|
||||
export AZURE_CLI_VERSION=${AZURE_CLI_VERSION} \
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
azure-servicebus==7.8.1
|
||||
opencensus-ext-azure==1.1.7
|
||||
azure-servicebus==7.11.3
|
||||
opencensus-ext-azure==1.1.11
|
||||
opencensus-ext-logging==0.1.1
|
||||
azure-identity==1.12.0
|
||||
aiohttp==3.8.5
|
||||
azure-cli-core==2.46.0
|
||||
azure-identity==1.14.1
|
||||
aiohttp==3.8.6
|
||||
azure-cli-core==2.50.0
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
---
|
||||
schemaVersion: 1.0.0
|
||||
name: tre-workspace-airlock-import-review
|
||||
version: 0.12.15
|
||||
version: 0.12.16
|
||||
description: "A workspace to do Airlock Data Import Reviews for Azure TRE"
|
||||
dockerfile: Dockerfile.tmpl
|
||||
registry: azuretre
|
||||
|
|
|
@ -17,11 +17,6 @@ data "azurerm_storage_account" "sa_import_inprogress" {
|
|||
resource_group_name = local.core_resource_group_name
|
||||
}
|
||||
|
||||
data "azurerm_private_dns_zone" "blobcore" {
|
||||
name = module.terraform_azurerm_environment_configuration.private_links["privatelink.blob.core.windows.net"]
|
||||
resource_group_name = local.core_resource_group_name
|
||||
}
|
||||
|
||||
resource "azurerm_private_endpoint" "sa_import_inprogress_pe" {
|
||||
name = "stg-ip-import-blob-${local.workspace_resource_name_suffix}"
|
||||
location = var.location
|
||||
|
@ -45,15 +40,8 @@ resource "azurerm_private_dns_zone" "stg_import_inprogress_blob" {
|
|||
resource_group_name = azurerm_resource_group.ws.name
|
||||
|
||||
tags = local.tre_workspace_tags
|
||||
}
|
||||
|
||||
resource "azurerm_private_dns_zone_virtual_network_link" "stg_import_inprogress_blob" {
|
||||
name = "vnl-stg-ip-import-blob-${local.workspace_resource_name_suffix}"
|
||||
resource_group_name = azurerm_resource_group.ws.name
|
||||
private_dns_zone_name = azurerm_private_dns_zone.stg_import_inprogress_blob.name
|
||||
virtual_network_id = module.network.vnet_id
|
||||
|
||||
tags = local.tre_workspace_tags
|
||||
depends_on = [ azurerm_private_endpoint.sa_import_inprogress_pe ]
|
||||
}
|
||||
|
||||
resource "azurerm_private_dns_a_record" "stg_import_inprogress_blob" {
|
||||
|
@ -64,4 +52,16 @@ resource "azurerm_private_dns_a_record" "stg_import_inprogress_blob" {
|
|||
records = [azurerm_private_endpoint.sa_import_inprogress_pe.private_service_connection[0].private_ip_address]
|
||||
|
||||
tags = local.tre_workspace_tags
|
||||
|
||||
}
|
||||
|
||||
resource "azurerm_private_dns_zone_virtual_network_link" "stg_import_inprogress_blob" {
|
||||
name = "vnl-stg-ip-import-blob-${local.workspace_resource_name_suffix}"
|
||||
resource_group_name = azurerm_resource_group.ws.name
|
||||
private_dns_zone_name = azurerm_private_dns_zone.stg_import_inprogress_blob.name
|
||||
virtual_network_id = module.network.vnet_id
|
||||
|
||||
tags = local.tre_workspace_tags
|
||||
|
||||
depends_on = [ azurerm_private_dns_a_record.stg_import_inprogress_blob ]
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче