Align 'Airlock Get Container URL' permissions with 'Create Airlock Request's permissions (#3208)

* add test to add coverage to get_allowed_actions

* fix test argument name

* add test that verifies that if a user can create airlock request, they can also upload files to that request

* add ws owner to roles that can access container link - as a ws owner can create airlock requests

* minor change in lines order

* update api version

* update changelog
This commit is contained in:
Yuval Yaron 2023-02-08 18:12:47 +02:00 коммит произвёл GitHub
Родитель 93dfd25b44
Коммит 06ec5a8118
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 72 добавлений и 16 удалений

Просмотреть файл

@ -45,6 +45,7 @@ BUG FIXES:
* Workspace module dependency to resolve _AnotherOperationInProgress_ errors [#3194](https://github.com/microsoft/AzureTRE/pull/3194)
* Skip Certs shared service E2E on Friday & Saturday due to LetsEncrypt limits [#3203](https://github.com/microsoft/AzureTRE/pull/3203)
* Create Workspace AppInsights via AzAPI provider due to an issue with AzureRM [#3207](https://github.com/microsoft/AzureTRE/pull/3207)
* 'Workspace Owner' is now able to access Airlock request's SAS URL even if the request is not in review [#3208](https://github.com/microsoft/AzureTRE/pull/3208)
COMPONENTS:

Просмотреть файл

@ -1 +1 @@
__version__ = "0.11.4"
__version__ = "0.11.5"

Просмотреть файл

@ -66,11 +66,16 @@ def get_account_by_request(airlock_request: AirlockRequest, workspace: Workspace
def validate_user_allowed_to_access_storage_account(user: User, airlock_request: AirlockRequest):
if "WorkspaceResearcher" not in user.roles and airlock_request.status != AirlockRequestStatus.InReview:
allowed_roles = []
if (airlock_request.status == AirlockRequestStatus.InReview):
allowed_roles = ["AirlockManager", "WorkspaceOwner"]
else:
allowed_roles = ["WorkspaceResearcher", "WorkspaceOwner"]
if not _user_has_one_of_roles(user=user, roles=allowed_roles):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=strings.AIRLOCK_UNAUTHORIZED_TO_SA)
if "WorkspaceOwner" not in user.roles and "AirlockManager" not in user.roles and airlock_request.status == AirlockRequestStatus.InReview:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=strings.AIRLOCK_UNAUTHORIZED_TO_SA)
return
@ -460,3 +465,7 @@ async def cancel_request(airlock_request: AirlockRequest, user: User, workspace:
updated_request = await update_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=airlock_request_repo, updated_by=user, workspace=workspace, new_status=AirlockRequestStatus.Cancelled)
await delete_all_review_user_resources(airlock_request, user_resource_repo, workspace_service_repo, resource_template_repo, operations_repo, resource_history_repo, user)
return updated_request
def _user_has_one_of_roles(user: User, roles) -> bool:
return any(role in roles for role in user.roles)

Просмотреть файл

@ -76,6 +76,12 @@ def override_get_user():
return user
def get_required_roles(endpoint):
dependencies = list(filter(lambda x: hasattr(x.dependency, 'require_one_of_roles'), endpoint.__defaults__))
required_roles = dependencies[0].dependency.require_one_of_roles
return required_roles
@pytest.fixture(scope='module')
def admin_user():
def inner():

Просмотреть файл

@ -6,6 +6,8 @@ from fastapi import status
from azure.core.exceptions import HttpResponseError
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from tests_ma.test_api.conftest import create_test_user, get_required_roles
from api.routes.airlock import create_draft_request
from db.errors import EntityDoesNotExist, UnableToAccessDatabase
from models.domain.airlock_request import AirlockRequest, AirlockRequestStatus, AirlockReview, AirlockReviewDecision, AirlockReviewUserResource
from models.domain.user_resource import UserResource
@ -116,6 +118,14 @@ def sample_review_resource(healthy=True) -> UserResource:
return resource
def create_test_user_with_roles(roles):
def inner():
user = create_test_user()
user.roles = roles
return user
return inner
class TestAirlockRoutesThatRequireOwnerOrResearcherRights():
@pytest_asyncio.fixture(autouse=True, scope='class')
def log_in_with_researcher_user(self, app, researcher_user):
@ -405,3 +415,25 @@ class TestAirlockRoutesThatRequireAirlockManagerRights():
await client.post(app.url_path_for(strings.API_CREATE_AIRLOCK_REVIEW_USER_RESOURCE, workspace_id=WORKSPACE_ID, airlock_request_id=AIRLOCK_REQUEST_ID))
assert delete_review_resource_mock.call_count == 1
assert deploy_resource_mock.call_count == 1
class TestAirlockRoutesPermissions():
@pytest_asyncio.fixture()
def log_in_with_user(self, app):
def inner(user):
app.dependency_overrides[get_current_workspace_owner_or_researcher_user] = user
app.dependency_overrides[get_current_airlock_manager_user] = user
app.dependency_overrides[get_current_workspace_owner_or_researcher_user_or_airlock_manager] = user
return inner
@pytest.mark.parametrize("role", (role for role in get_required_roles(endpoint=create_draft_request)))
@patch("api.routes.workspaces.OperationRepository.resource_has_deployed_operation")
@patch("api.dependencies.workspaces.WorkspaceRepository.get_workspace_by_id", return_value=sample_workspace(WORKSPACE_ID))
@patch("api.routes.airlock.AirlockRequestRepository.read_item_by_id", return_value=sample_airlock_request_object(status=AirlockRequestStatus.Draft))
@patch("services.airlock.get_airlock_request_container_sas_token", return_value="valid-sas-token")
async def test_get_airlock_container_link_is_accessible_to_every_role_that_can_create_request(self, _, __, ___, ____, app, client, log_in_with_user, role):
user = create_test_user_with_roles([role])
log_in_with_user(user)
response = await client.get(app.url_path_for(strings.API_AIRLOCK_REQUEST_LINK, workspace_id=WORKSPACE_ID, airlock_request_id=AIRLOCK_REQUEST_ID))
assert response.status_code == status.HTTP_200_OK

Просмотреть файл

@ -6,7 +6,7 @@ from resources import strings
from services.airlock import validate_user_allowed_to_access_storage_account, get_required_permission, \
validate_request_status, cancel_request, delete_review_user_resource
from models.domain.airlock_request import AirlockRequest, AirlockRequestStatus, AirlockRequestType, AirlockReview, AirlockReviewDecision, AirlockActions, AirlockReviewUserResource
from tests_ma.test_api.conftest import create_workspace_owner_user, create_workspace_researcher_user
from tests_ma.test_api.conftest import create_workspace_owner_user, create_workspace_researcher_user, get_required_roles
from mock import AsyncMock, patch, MagicMock
from models.domain.events import AirlockNotificationData, AirlockNotificationUserData, StatusChangedData, \
AirlockNotificationRequestData, AirlockNotificationWorkspaceData, AirlockFile
@ -17,7 +17,7 @@ from models.domain.workspace import Workspace
from tests_ma.test_api.conftest import create_test_user, create_workspace_airlock_manager_user
from azure.eventgrid import EventGridEvent
from api.routes.airlock import create_airlock_review, create_cancel_request, create_submit_request
from services.aad_authentication import AzureADAuthorization
WORKSPACE_ID = "abc000d3-82da-4bfc-b6e9-9a7853ef753e"
AIRLOCK_REQUEST_ID = "5dbc15ae-40e1-49a5-834b-595f59d626b7"
@ -25,6 +25,7 @@ AIRLOCK_REVIEW_ID = "96d909c5-e913-4c05-ae53-668a702ba2e5"
USER_RESOURCE_ID = "cce59042-1dee-42dc-9388-6db846feeb3b"
WORKSPACE_SERVICE_ID = "30f2fefa-e7bb-4e5b-93aa-e50bb037502a"
CURRENT_TIME = time.time()
ALL_ROLES = AzureADAuthorization.WORKSPACE_ROLES_DICT.keys()
@pytest_asyncio.fixture
@ -138,19 +139,12 @@ def sample_airlock_review(review_decision=AirlockReviewDecision.Approved):
return airlock_review
def get_required_roles(endpoint):
dependencies = list(filter(lambda x: hasattr(x.dependency, 'require_one_of_roles'), endpoint.__defaults__))
required_roles = dependencies[0].dependency.require_one_of_roles
return required_roles
def test_validate_user_is_allowed_to_access_sa_blocks_access_as_expected():
# Workspace owner can access only in review
ws_owner_user = create_workspace_owner_user()
airlock_manager_user = create_workspace_airlock_manager_user()
draft_airlock_request = sample_airlock_request()
with pytest.raises(HTTPException) as ex:
validate_user_allowed_to_access_storage_account(
user=ws_owner_user,
user=airlock_manager_user,
airlock_request=draft_airlock_request
)
@ -168,7 +162,6 @@ def test_validate_user_is_allowed_to_access_sa_blocks_access_as_expected():
def test_validate_user_is_allowed_to_access_grants_access_to_user_with_a_valid_role():
# Workspace owner can access only in review
ws_owner_user = create_workspace_owner_user()
draft_airlock_request = sample_airlock_request(AirlockRequestStatus.InReview)
@ -466,6 +459,21 @@ async def test_get_allowed_actions_requires_same_roles_as_endpoint(action, requi
assert action in allowed_actions
@pytest.mark.asyncio
@pytest.mark.parametrize("action, endpoint_roles, airlock_request_repo_mock", [
(AirlockActions.Review, get_required_roles(endpoint=create_airlock_review), airlock_request_repo_mock),
(AirlockActions.Cancel, get_required_roles(endpoint=create_cancel_request), airlock_request_repo_mock),
(AirlockActions.Submit, get_required_roles(endpoint=create_submit_request), airlock_request_repo_mock)])
async def test_get_allowed_actions_does_not_return_actions_that_are_forbidden_to_the_user_role(action, endpoint_roles, airlock_request_repo_mock):
airlock_request_repo_mock.validate_status_update = MagicMock(return_value=True)
user = create_test_user()
forbidden_roles = [role for role in ALL_ROLES if role not in endpoint_roles]
for forbidden_role in forbidden_roles:
user.roles = [forbidden_role]
allowed_actions = get_allowed_actions(request=sample_airlock_request(), user=user, airlock_request_repo=airlock_request_repo_mock)
assert action not in allowed_actions
@pytest.mark.asyncio
@patch("services.airlock.delete_review_user_resource")
@patch("services.airlock.update_and_publish_event_airlock_request")