Prepend `DAG:` to dag permissions (#11189)

This adds the prefix DAG: to newly created dag permissions. It supports checking permissions on both prefixed and un-prefixed DAG permission names.

This will make it easier to identify permissions that related to granular dag access.

This PR does not modify existing dag permission names to use the new prefixed naming scheme. That will come in a separate PR.

Related to issue #10469
This commit is contained in:
James Timmins 2020-10-15 16:32:38 -07:00 коммит произвёл GitHub
Родитель 89d4dd8802
Коммит 7ab62100af
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
29 изменённых файлов: 641 добавлений и 257 удалений

Просмотреть файл

@ -50,6 +50,16 @@ assists users migrating to a new version.
## Airflow Master
### Change to Permissions
The DAG-level permission actions, `can_dag_read` and `can_dag_edit` are going away. They are being replaced with `can_read` and `can_edit`. When a role is given DAG-level access, the resource name (or "view menu", in Flask App-Builder parlance) will now be prefixed with `DAG:`. So the action `can_dag_read` on `example_dag_id`, is now represented as `can_read` on `DAG:example_dag_id`.
*As part of running `db upgrade`, existing permissions will be migrated for you.*
When DAGs are initialized with the `access_control` variable set, any usage of the old permission names will automatically be updated in the database, so this won't be a breaking change. A DeprecationWarning will be raised.
### Changes to Airflow Plugins
If you are using Airflow Plugins and were passing `admin_views` & `menu_links` which were used in the
non-RBAC UI (`flask-admin` based UI), upto it to use `flask_appbuilder_views` and `flask_appbuilder_menu_links`.

Просмотреть файл

@ -28,10 +28,11 @@ from airflow.api_connexion.schemas.dag_schema import (
dags_collection_schema,
)
from airflow.models.dag import DagModel
from airflow.security import permissions
from airflow.utils.session import provide_session
@security.requires_access([("can_read", "Dag")])
@security.requires_access([("can_read", permissions.RESOURCE_DAGS)])
@provide_session
def get_dag(dag_id, session):
"""
@ -45,7 +46,7 @@ def get_dag(dag_id, session):
return dag_schema.dump(dag)
@security.requires_access([("can_read", "Dag")])
@security.requires_access([("can_read", permissions.RESOURCE_DAGS)])
def get_dag_details(dag_id):
"""
Get details of DAG.
@ -56,7 +57,7 @@ def get_dag_details(dag_id):
return dag_detail_schema.dump(dag)
@security.requires_access([("can_read", "Dag")])
@security.requires_access([("can_read", permissions.RESOURCE_DAGS)])
@format_parameters({'limit': check_limit})
def get_dags(limit, offset=0):
"""
@ -69,7 +70,7 @@ def get_dags(limit, offset=0):
return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))
@security.requires_access([("can_edit", "Dag")])
@security.requires_access([("can_edit", permissions.RESOURCE_DAGS)])
@provide_session
def patch_dag(session, dag_id, update_mask=None):
"""

Просмотреть файл

@ -28,11 +28,12 @@ from airflow.api_connexion.schemas.dag_run_schema import (
dagruns_batch_form_schema,
)
from airflow.models import DagModel, DagRun
from airflow.security import permissions
from airflow.utils.session import provide_session
from airflow.utils.types import DagRunType
@security.requires_access([("can_read", "Dag"), ("can_delete", "DagRun")])
@security.requires_access([("can_read", permissions.RESOURCE_DAGS), ("can_delete", "DagRun")])
@provide_session
def delete_dag_run(dag_id, dag_run_id, session):
"""
@ -43,7 +44,7 @@ def delete_dag_run(dag_id, dag_run_id, session):
return NoContent, 204
@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun")])
@security.requires_access([("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun")])
@provide_session
def get_dag_run(dag_id, dag_run_id, session):
"""
@ -58,7 +59,7 @@ def get_dag_run(dag_id, dag_run_id, session):
return dagrun_schema.dump(dag_run)
@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun")])
@security.requires_access([("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun")])
@format_parameters(
{
'start_date_gte': format_datetime,
@ -157,7 +158,7 @@ def _apply_date_filters_to_query(
return query
@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun")])
@security.requires_access([("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun")])
@provide_session
def get_dag_runs_batch(session):
"""
@ -193,7 +194,7 @@ def get_dag_runs_batch(session):
return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_runs, total_entries=total_entries))
@security.requires_access([("can_read", "Dag"), ("can_create", "DagRun")])
@security.requires_access([("can_read", permissions.RESOURCE_DAGS), ("can_create", "DagRun")])
@provide_session
def post_dag_run(dag_id, session):
"""

Просмотреть файл

@ -23,12 +23,13 @@ from airflow.api_connexion.exceptions import NotFound
from airflow.exceptions import TaskNotFound
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
from airflow.security import permissions
from airflow.utils.session import provide_session
@security.requires_access(
[
('can_read', 'Dag'),
('can_read', permissions.RESOURCE_DAGS),
('can_read', 'DagRun'),
('can_read', 'Task'),
('can_read', 'TaskInstance'),

Просмотреть файл

@ -23,11 +23,14 @@ from airflow.api_connexion import security
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.schemas.log_schema import LogResponseObject, logs_schema
from airflow.models import DagRun
from airflow.security import permissions
from airflow.utils.log.log_reader import TaskLogReader
from airflow.utils.session import provide_session
@security.requires_access([('can_read', 'Dag'), ('can_read', 'DagRun'), ('can_read', 'Task')])
@security.requires_access(
[('can_read', permissions.RESOURCE_DAGS), ('can_read', 'DagRun'), ('can_read', 'Task')]
)
@provide_session
def get_log(session, dag_id, dag_run_id, task_id, task_try_number, full_content=False, token=None):
"""

Просмотреть файл

@ -21,9 +21,10 @@ from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.schemas.task_schema import TaskCollection, task_collection_schema, task_schema
from airflow.exceptions import TaskNotFound
from airflow.security import permissions
@security.requires_access([("can_read", "Dag"), ("can_read", "Task")])
@security.requires_access([("can_read", permissions.RESOURCE_DAGS), ("can_read", "Task")])
def get_task(dag_id, task_id):
"""
Get simplified representation of a task.
@ -39,7 +40,7 @@ def get_task(dag_id, task_id):
return task_schema.dump(task)
@security.requires_access([("can_read", "Dag"), ("can_read", "Task")])
@security.requires_access([("can_read", permissions.RESOURCE_DAGS), ("can_read", "Task")])
def get_tasks(dag_id):
"""
Get tasks for DAG

Просмотреть файл

@ -37,13 +37,14 @@ from airflow.api_connexion.schemas.task_instance_schema import (
from airflow.models.dagrun import DagRun as DR
from airflow.models.taskinstance import clear_task_instances, TaskInstance as TI
from airflow.models import SlaMiss
from airflow.security import permissions
from airflow.utils.state import State
from airflow.utils.session import provide_session
@security.requires_access(
[
("can_read", "Dag"),
("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun"),
("can_read", "Task"),
]
@ -101,7 +102,7 @@ def _apply_range_filter(query, key, value_range: Tuple[Any, Any]):
)
@security.requires_access(
[
("can_read", "Dag"),
("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun"),
("can_read", "Task"),
]
@ -169,7 +170,9 @@ def get_task_instances(
)
@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun"), ("can_read", "Task")])
@security.requires_access(
[("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun"), ("can_read", "Task")]
)
@provide_session
def get_task_instances_batch(session=None):
"""
@ -223,7 +226,9 @@ def get_task_instances_batch(session=None):
)
@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun"), ("can_edit", "Task")])
@security.requires_access(
[("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun"), ("can_edit", "Task")]
)
@provide_session
def post_clear_task_instances(dag_id: str, session=None):
"""
@ -263,7 +268,9 @@ def post_clear_task_instances(dag_id: str, session=None):
)
@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun"), ("can_edit", "Task")])
@security.requires_access(
[("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun"), ("can_edit", "Task")]
)
@provide_session
def post_set_task_instances_state(dag_id, session):
"""Set a state of task instances."""

Просмотреть файл

@ -31,11 +31,17 @@ from airflow.api_connexion.schemas.xcom_schema import (
xcom_collection_schema,
)
from airflow.models import DagRun as DR, XCom
from airflow.security import permissions
from airflow.utils.session import provide_session
@security.requires_access(
[("can_read", "Dag"), ("can_read", "DagRun"), ("can_read", "Task"), ("can_read", "XCom")]
[
("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun"),
("can_read", "Task"),
("can_read", "XCom"),
]
)
@format_parameters({'limit': check_limit})
@provide_session
@ -71,7 +77,12 @@ def get_xcom_entries(
@security.requires_access(
[("can_read", "Dag"), ("can_read", "DagRun"), ("can_read", "Task"), ("can_read", "XCom")]
[
("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun"),
("can_read", "Task"),
("can_read", "XCom"),
]
)
@provide_session
def get_xcom_entry(

Просмотреть файл

@ -20,6 +20,7 @@ from typing import Callable, Optional, Sequence, Tuple, TypeVar, cast
from flask import Response, current_app, g
from airflow.security.permissions import RESOURCE_DAGS
from airflow.api_connexion.exceptions import PermissionDenied, Unauthenticated
T = TypeVar("T", bound=Callable) # pylint: disable=invalid-name
@ -38,7 +39,7 @@ def can_access_any_dags(action: str, dag_id: Optional[int] = None) -> bool:
"""Checks if user has read or write access to some dags."""
appbuilder = current_app.appbuilder
if dag_id and dag_id != '~':
return appbuilder.sm.has_access(action, dag_id)
return appbuilder.sm.has_access(action, appbuilder.sm.prefixed_dag_id(dag_id))
user = g.user
if action == 'can_read':
@ -54,7 +55,7 @@ def check_authorization(
return
appbuilder = current_app.appbuilder
for permission in permissions:
if permission in (('can_read', 'Dag'), ('can_edit', 'Dag')):
if permission in (('can_read', RESOURCE_DAGS), ('can_edit', RESOURCE_DAGS)):
can_access_all_dags = appbuilder.sm.has_access(*permission)
if can_access_all_dags:
continue
@ -79,7 +80,6 @@ def requires_access(permissions: Optional[Sequence[Tuple[str, str]]] = None) ->
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
check_authentication()
check_authorization(permissions, kwargs.get('dag_id'))

Просмотреть файл

@ -0,0 +1,115 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Prefix DAG permissions.
Revision ID: 849da589634d
Revises: 52d53670a240
Create Date: 2020-10-01 17:25:10.006322
"""
from airflow.security import permissions
from airflow.www.app import cached_app
# revision identifiers, used by Alembic.
revision = '849da589634d'
down_revision = '52d53670a240'
branch_labels = None
depends_on = None
def upgrade(): # noqa: D103
permissions = ['can_dag_read', 'can_dag_edit']
view_menus = cached_app().appbuilder.sm.get_all_view_menu()
convert_permissions(permissions, view_menus, upgrade_action, upgrade_dag_id)
def downgrade(): # noqa: D103
permissions = ['can_read', 'can_edit']
vms = cached_app().appbuilder.sm.get_all_view_menu()
view_menus = [
vm for vm in vms if (vm.name == permissions.RESOURCE_DAGS or vm.name.startswith('DAG:'))
]
convert_permissions(permissions, view_menus, downgrade_action, downgrade_dag_id)
def upgrade_dag_id(dag_id):
"""Adds the 'DAG:' prefix to a DAG view if appropriate."""
if dag_id == 'all_dags':
return permissions.RESOURCE_DAGS
if dag_id.startswith("DAG:"):
return dag_id
return f"DAG:{dag_id}"
def downgrade_dag_id(dag_id):
"""Removes the 'DAG:' prefix from a DAG view name to return the DAG id."""
if dag_id == permissions.RESOURCE_DAGS:
return 'all_dags'
if dag_id.startswith("DAG:"):
return dag_id[len("DAG:"):]
return dag_id
def upgrade_action(action):
"""Converts the a DAG permission name from the old style to the new style."""
if action == 'can_dag_read':
return 'can_read'
return 'can_edit'
def downgrade_action(action):
"""Converts the a DAG permission name from the old style to the new style."""
if action == 'can_read':
return 'can_dag_read'
return 'can_dag_edit'
def convert_permissions(permissions, view_menus, convert_action, convert_dag_id):
"""Creates new empty role in DB"""
appbuilder = cached_app().appbuilder # pylint: disable=no-member
roles = appbuilder.sm.get_all_roles()
views_to_remove = set()
for permission_name in permissions: # pylint: disable=too-many-nested-blocks
for view_menu in view_menus:
view_name = view_menu.name
old_pvm = appbuilder.sm.find_permission_view_menu(permission_name, view_name)
if not old_pvm:
continue
views_to_remove.add(view_name)
new_permission_name = convert_action(permission_name)
new_pvm = appbuilder.sm.add_permission_view_menu(new_permission_name, convert_dag_id(view_name))
for role in roles:
if appbuilder.sm.exist_permission_on_roles(view_name, permission_name, [role.id]):
appbuilder.sm.add_permission_role(role, new_pvm)
appbuilder.sm.del_permission_role(role, old_pvm)
print(f"DELETING: {role.name} ----> {view_name}.{permission_name}")
appbuilder.sm.del_permission_view_menu(permission_name, view_name)
print(f"DELETING: perm_view ----> {view_name}.{permission_name}")
for view_name in views_to_remove:
if appbuilder.sm.find_view_menu(view_name):
appbuilder.sm.del_view_menu(view_name)
print(f"DELETING: view_menu ----> {view_name}")
if 'can_dag_read' in permissions:
for permission_name in permissions:
if appbuilder.sm.find_permission(permission_name):
appbuilder.sm.del_permission(permission_name)
print(f"DELETING: permission ----> {permission_name}")

Просмотреть файл

@ -51,6 +51,7 @@ from airflow.models.dagcode import DagCode
from airflow.models.dagpickle import DagPickle
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import Context, TaskInstance, clear_task_instances
from airflow.security import permissions
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dates import cron_presets, date_range as utils_date_range
@ -176,7 +177,7 @@ class DAG(BaseDag, LoggingMixin):
that it is executed when the dag succeeds.
:type on_success_callback: callable
:param access_control: Specify optional DAG-level permissions, e.g.,
"{'role1': {'can_dag_read'}, 'role2': {'can_dag_read', 'can_dag_edit'}}"
"{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}"
:type access_control: dict
:param is_paused_upon_creation: Specifies if the dag is paused when created for the first time.
If the dag exists already, this flag will be ignored. If this optional parameter
@ -332,7 +333,7 @@ class DAG(BaseDag, LoggingMixin):
self.on_failure_callback = on_failure_callback
self.doc_md = doc_md
self._access_control = access_control
self._access_control = DAG._upgrade_outdated_dag_access_control(access_control)
self.is_paused_upon_creation = is_paused_upon_creation
self.jinja_environment_kwargs = jinja_environment_kwargs
@ -382,6 +383,32 @@ class DAG(BaseDag, LoggingMixin):
# /Context Manager ----------------------------------------------
@staticmethod
def _upgrade_outdated_dag_access_control(access_control=None):
"""
Looks for outdated dag level permissions (can_dag_read and can_dag_edit) in DAG
access_controls (for example, {'role1': {'can_dag_read'}, 'role2': {'can_dag_read', 'can_dag_edit'}})
and replaces them with updated permissions (can_read and can_edit).
"""
if not access_control:
return None
new_perm_mapping = {
permissions.DEPRECATED_ACTION_CAN_DAG_READ: permissions.ACTION_CAN_READ,
permissions.DEPRECATED_ACTION_CAN_DAG_EDIT: permissions.ACTION_CAN_EDIT,
}
updated_access_control = {}
for role, perms in access_control.items():
updated_access_control[role] = {new_perm_mapping.get(perm, perm) for perm in perms}
if access_control != updated_access_control:
warnings.warn(
"The 'can_dag_read' and 'can_dag_edit' permissions are deprecated. "
"Please use 'can_read' and 'can_edit', respectively.",
DeprecationWarning, stacklevel=3
)
return updated_access_control
def date_range(
self,
start_date: datetime,
@ -651,7 +678,7 @@ class DAG(BaseDag, LoggingMixin):
@access_control.setter
def access_control(self, value):
self._access_control = value
self._access_control = DAG._upgrade_outdated_dag_access_control(value)
@property
def description(self) -> Optional[str]:

Просмотреть файл

@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Resource Constants
RESOURCE_DAGS = 'Dags'
RESOURCE_DAG_PREFIX = 'DAG:'
# Action Constants
ACTION_CAN_CREATE = 'can_create'
ACTION_CAN_READ = 'can_read'
ACTION_CAN_EDIT = 'can_edit'
ACTION_CAN_DELETE = 'can_delete'
DEPRECATED_ACTION_CAN_DAG_READ = 'can_dag_read'
DEPRECATED_ACTION_CAN_DAG_EDIT = 'can_dag_edit'

Просмотреть файл

@ -58,6 +58,7 @@ def sync_appbuilder_roles(flask_app):
if conf.getboolean('webserver', 'UPDATE_FAB_PERMS'):
security_manager = flask_app.appbuilder.sm
security_manager.sync_roles()
security_manager.sync_resource_permissions()
def create_app(config=None, testing=False, app_name="Airflow"):

Просмотреть файл

@ -34,6 +34,7 @@ def action_logging(f: T) -> T:
"""
Decorator to log user actions
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
@ -50,11 +51,11 @@ def action_logging(f: T) -> T:
owner=user,
extra=str([(k, v) for k, v in request.values.items() if k not in fields_skip_logging]),
task_id=request.values.get('task_id'),
dag_id=request.values.get('dag_id'))
dag_id=request.values.get('dag_id'),
)
if 'execution_date' in request.values:
log.execution_date = pendulum.parse(
request.values.get('execution_date'), strict=False)
log.execution_date = pendulum.parse(request.values.get('execution_date'), strict=False)
session.add(log)
@ -67,6 +68,7 @@ def gzipped(f: T) -> T:
"""
Decorator to make a view compressed
"""
@functools.wraps(f)
def view_func(*args, **kwargs):
@after_this_request
@ -78,12 +80,14 @@ def gzipped(f: T) -> T:
response.direct_passthrough = False
if (response.status_code < 200 or response.status_code >= 300 or
'Content-Encoding' in response.headers):
if (
response.status_code < 200
or response.status_code >= 300
or 'Content-Encoding' in response.headers
):
return response
gzip_buffer = IO()
gzip_file = gzip.GzipFile(mode='wb',
fileobj=gzip_buffer)
gzip_file = gzip.GzipFile(mode='wb', fileobj=gzip_buffer)
gzip_file.write(response.data)
gzip_file.close()
@ -103,30 +107,22 @@ def has_dag_access(**dag_kwargs) -> Callable[[T], T]:
"""
Decorator to check whether the user has read / write permission on the dag.
"""
def decorator(f: T):
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
has_access = self.appbuilder.sm.has_access
dag_id = request.values.get('dag_id')
# if it is false, we need to check whether user has write access on the dag
can_dag_edit = dag_kwargs.get('can_dag_edit', False)
needs_edit_access = dag_kwargs.get('can_dag_edit', False)
# 1. check whether the user has can_dag_edit permissions on all_dags
# 2. if 1 false, check whether the user
# has can_dag_edit permissions on the dag
# 3. if 2 false, check whether it is can_dag_read view,
# and whether user has the permissions
if (
has_access('can_dag_edit', 'all_dags') or
has_access('can_dag_edit', dag_id) or (not can_dag_edit and
(has_access('can_dag_read',
'all_dags') or
has_access('can_dag_read',
dag_id)))):
return f(self, *args, **kwargs)
if needs_edit_access:
if self.appbuilder.sm.can_edit_dag(dag_id):
return f(self, *args, **kwargs)
else:
flash("Access is Denied", "danger")
return redirect(url_for(self.appbuilder.sm.auth_view.
__class__.__name__ + ".login"))
if self.appbuilder.sm.can_read_dag(dag_id):
return f(self, *args, **kwargs)
flash("Access is Denied", "danger")
return redirect(url_for(self.appbuilder.sm.auth_view.__class__.__name__ + ".login"))
return cast(T, wrapper)
return decorator

Просмотреть файл

@ -29,6 +29,7 @@ from sqlalchemy.orm import joinedload
from airflow import models
from airflow.exceptions import AirflowException
from airflow.models import DagModel
from airflow.security import permissions
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.www.utils import CustomSQLAInterface
@ -41,13 +42,6 @@ EXISTING_ROLES = {
'Public',
}
CAN_CREATE = 'can_create'
CAN_READ = 'can_read'
CAN_DAG_READ = 'can_dag_read'
CAN_EDIT = 'can_edit'
CAN_DAG_EDIT = 'can_dag_edit'
CAN_DELETE = 'can_delete'
class AirflowSecurityManager(SecurityManager, LoggingMixin):
"""Custom security manager, which introduces an permission model adapted to Airflow"""
@ -160,19 +154,10 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
# [END security_op_perms]
# global view-menu for dag-level access
DAG_VMS = {'all_dags'}
DAG_VMS = {permissions.RESOURCE_DAGS}
WRITE_DAG_PERMS = {
'can_dag_edit',
'can_edit',
}
READ_DAG_PERMS = {
'can_dag_read',
'can_read',
}
DAG_PERMS = WRITE_DAG_PERMS | READ_DAG_PERMS
READ_DAG_PERMS = {permissions.ACTION_CAN_READ}
DAG_PERMS = {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}
###########################################################################
# DEFAULT ROLE CONFIGURATIONS
@ -280,11 +265,11 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
def get_readable_dags(self, user):
"""Gets the DAGs readable by authenticated user."""
return self.get_accessible_dags([CAN_READ, CAN_DAG_READ], user)
return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
def get_editable_dags(self, user):
"""Gets the DAGs editable by authenticated user."""
return self.get_accessible_dags([CAN_EDIT, CAN_DAG_EDIT], user)
return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
def get_readable_dag_ids(self, user) -> Set[str]:
"""Gets the DAG IDs readable by authenticated user."""
@ -296,7 +281,9 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
def get_accessible_dag_ids(self, user) -> Set[str]:
"""Gets the DAG IDs editable or readable by authenticated user."""
accessible_dags = self.get_accessible_dags([CAN_EDIT, CAN_DAG_EDIT, CAN_READ, CAN_DAG_READ], user)
accessible_dags = self.get_accessible_dags(
[permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ], user
)
return set(dag.dag_id for dag in accessible_dags)
@provide_session
@ -320,33 +307,83 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
for permission in role.permissions:
resource = permission.view_menu.name
action = permission.permission.name
if action in user_actions:
if action not in user_actions:
continue
if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
else:
resources.add(resource)
if bool({'Dag', 'all_dags'}.intersection(resources)):
if permissions.RESOURCE_DAGS in resources:
return session.query(DagModel)
return session.query(DagModel).filter(DagModel.dag_id.in_(resources))
def has_access(self, permission, view_name, user=None) -> bool:
def can_read_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG read access."""
if not user:
user = g.user
prefixed_dag_id = self.prefixed_dag_id(dag_id)
return self._has_view_access(
user, permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS
) or self._has_view_access(user, permissions.ACTION_CAN_READ, prefixed_dag_id)
def can_edit_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG edit access."""
if not user:
user = g.user
prefixed_dag_id = self.prefixed_dag_id(dag_id)
return self._has_view_access(
user, permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAGS
) or self._has_view_access(user, permissions.ACTION_CAN_EDIT, prefixed_dag_id)
def prefixed_dag_id(self, dag_id):
"""Returns the permission name for a DAG id."""
if dag_id == permissions.RESOURCE_DAGS:
return dag_id
if dag_id.startswith(permissions.RESOURCE_DAG_PREFIX):
return dag_id
return f"{permissions.RESOURCE_DAG_PREFIX}{dag_id}"
def is_dag_resource(self, resource_name):
"""Determines if a permission belongs to a DAG or all DAGs."""
if resource_name == permissions.RESOURCE_DAGS:
return True
return resource_name.startswith(permissions.RESOURCE_DAG_PREFIX)
def has_access(self, permission, resource, user=None) -> bool:
"""
Verify whether a given user could perform certain permission
(e.g can_read, can_write) on the given dag_id.
(e.g can_read, can_write) on the given resource.
:param permission: permission on dag_id(e.g can_read, can_edit).
:param permission: permission on resource (e.g can_read, can_edit).
:type permission: str
:param view_name: name of view-menu(e.g dag id is a view-menu as well).
:type view_name: str
:param resource: name of view-menu or resource.
:type resource: str
:param user: user name
:type user: str
:return: a bool whether user could perform certain permission on the dag_id.
:return: a bool whether user could perform certain permission on the resource.
:rtype bool
"""
if not user:
user = g.user
# breakpoint()
if user.is_anonymous:
return self.is_item_public(permission, view_name)
return self._has_view_access(user, permission, view_name)
return self.is_item_public(permission, resource)
has_access = self._has_view_access(user, permission, resource)
# FAB built-in view access method. Won't work for AllDag access.
if self.is_dag_resource(resource):
if permission == permissions.ACTION_CAN_READ:
has_access |= self.can_read_dag(resource, user)
elif permission == permissions.ACTION_CAN_EDIT:
has_access |= self.can_edit_dag(resource, user)
return has_access
def _get_and_cache_perms(self):
"""
@ -377,13 +414,13 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
"""
Has all the dag access in any of the 3 cases:
1. Role needs to be in (Admin, Viewer, User, Op).
2. Has can_dag_read permission on all_dags view.
3. Has can_dag_edit permission on all_dags view.
2. Has can_read permission on dags view.
3. Has can_edit permission on dags view.
"""
return (
self._has_role(['Admin', 'Viewer', 'Op', 'User'])
or self._has_perm('can_dag_read', 'all_dags')
or self._has_perm('can_dag_edit', 'all_dags')
or self._has_perm(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS)
or self._has_perm(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAGS)
)
def clean_perms(self):
@ -467,10 +504,10 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
.all()
)
# create can_dag_edit and can_dag_read permissions for every dag(vm)
# create can_edit and can_read permissions for every dag(vm)
for dag in all_dags_models:
for perm in self.DAG_PERMS:
merge_pv(perm, dag.dag_id)
merge_pv(perm, self.prefixed_dag_id(dag.dag_id))
# for all the dag-level role, add the permission of viewer
# with the dag view to ab_permission_view
@ -481,7 +518,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
update_perm_views = []
# need to remove all_dag vm from all the existing view-menus
dag_vm = self.find_view_menu('all_dags')
dag_vm = self.find_view_menu(permissions.RESOURCE_DAGS)
ab_perm_view_role = sqla_models.assoc_permissionview_role
perm_view = self.permissionview_model
view_menu = self.viewmenu_model
@ -501,6 +538,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
for role in dag_role:
# pylint: disable=no-member
# Get all the perm-view of the role
existing_perm_view_by_user = self.get_session.query(ab_perm_view_role).filter(
ab_perm_view_role.columns.role_id == role.id
)
@ -520,18 +558,23 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
def update_admin_perm_view(self):
"""
Admin should has all the permission-views, except the dag views.
because Admin have already have all_dags permission.
because Admin have already have Dag permission.
Add the missing ones to the table for admin.
:return: None.
"""
all_dag_view = self.find_view_menu('all_dags')
dag_perm_ids = [self.find_permission('can_dag_edit').id, self.find_permission('can_dag_read').id]
all_dag_view = self.find_view_menu(permissions.RESOURCE_DAGS)
dag_pvs = (
self.get_session.query(sqla_models.ViewMenu)
.filter(sqla_models.ViewMenu.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
.all()
)
pv_ids = [pv.id for pv in dag_pvs]
pvms = (
self.get_session.query(sqla_models.PermissionView)
.filter(
~and_(
sqla_models.PermissionView.permission_id.in_(dag_perm_ids),
sqla_models.PermissionView.view_menu_id.in_(pv_ids),
sqla_models.PermissionView.view_menu_id != all_dag_view.id,
)
)
@ -553,7 +596,6 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
:return: None.
"""
self.log.debug('Start syncing user roles.')
# Create global all-dag VM
self.create_perm_vm_for_all_dag()
@ -564,19 +606,18 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
perms = config['perms']
self.init_role(role, vms, perms)
self.create_custom_dag_permission_view()
# init existing roles, the rest role could be created through UI.
self.update_admin_perm_view()
self.clean_perms()
def sync_resource_permissions(self, permissions=None):
def sync_resource_permissions(self, perms=None):
"""
Populates resource-based permissions.
"""
if not permissions:
if not perms:
return
for action, resource in permissions:
for action, resource in perms:
self.add_view_menu(resource)
self.add_permission_view_menu(action, resource)
@ -589,17 +630,18 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
:type dag_id: str
:param access_control: a dict where each key is a rolename and
each value is a set() of permission names (e.g.,
{'can_dag_read'}
{'can_read'}
:type access_control: dict
:return:
"""
prefixed_dag_id = self.prefixed_dag_id(dag_id)
for dag_perm in self.DAG_PERMS:
perm_on_dag = self.find_permission_view_menu(dag_perm, dag_id)
perm_on_dag = self.find_permission_view_menu(dag_perm, prefixed_dag_id)
if perm_on_dag is None:
self.add_permission_view_menu(dag_perm, dag_id)
self.add_permission_view_menu(dag_perm, prefixed_dag_id)
if access_control:
self._sync_dag_view_permissions(dag_id, access_control)
self._sync_dag_view_permissions(prefixed_dag_id, access_control)
def _sync_dag_view_permissions(self, dag_id, access_control):
"""Set the access policy on the given DAG's ViewModel.
@ -608,15 +650,16 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
:type dag_id: str
:param access_control: a dict where each key is a rolename and
each value is a set() of permission names (e.g.,
{'can_dag_read'}
{'can_read'}
:type access_control: dict
"""
prefixed_dag_id = self.prefixed_dag_id(dag_id)
def _get_or_create_dag_permission(perm_name):
dag_perm = self.find_permission_view_menu(perm_name, dag_id)
dag_perm = self.find_permission_view_menu(perm_name, prefixed_dag_id)
if not dag_perm:
self.log.info("Creating new permission '%s' on view '%s'", perm_name, dag_id)
dag_perm = self.add_permission_view_menu(perm_name, dag_id)
self.log.info("Creating new permission '%s' on view '%s'", perm_name, prefixed_dag_id)
dag_perm = self.add_permission_view_menu(perm_name, prefixed_dag_id)
return dag_perm
@ -628,11 +671,14 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
target_perms_for_role = access_control.get(role.name, {})
if perm.permission.name not in target_perms_for_role:
self.log.info(
"Revoking '%s' on DAG '%s' for role '%s'", perm.permission, dag_id, role.name
"Revoking '%s' on DAG '%s' for role '%s'",
perm.permission,
prefixed_dag_id,
role.name,
)
self.del_permission_role(role, perm)
dag_view = self.find_view_menu(dag_id)
dag_view = self.find_view_menu(prefixed_dag_id)
if dag_view:
_revoke_stale_permissions(dag_view)
@ -641,7 +687,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
if not role:
raise AirflowException(
"The access_control mapping for DAG '{}' includes a role "
"named '{}', but that role does not exist".format(dag_id, rolename)
"named '{}', but that role does not exist".format(prefixed_dag_id, rolename)
)
perms = set(perms)
@ -650,7 +696,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
raise AirflowException(
"The access_control map for DAG '{}' includes the following "
"invalid permissions: {}; The set of valid permissions "
"is: {}".format(dag_id, (perms - self.DAG_PERMS), self.DAG_PERMS)
"is: {}".format(prefixed_dag_id, (perms - self.DAG_PERMS), self.DAG_PERMS)
)
for perm_name in perms:

Просмотреть файл

@ -63,6 +63,7 @@ from airflow.models.baseoperator import BaseOperator
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.taskinstance import TaskInstance
from airflow.security import permissions
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
from airflow.utils import timezone
@ -461,7 +462,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
if arg_tags_filter:
dags_query = dags_query.filter(DagModel.tags.any(DagTag.name.in_(arg_tags_filter)))
if 'all_dags' not in filter_dag_ids:
if permissions.RESOURCE_DAGS not in filter_dag_ids:
dags_query = dags_query.filter(DagModel.dag_id.in_(filter_dag_ids))
# pylint: enable=no-member
@ -545,7 +546,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
dr = models.DagRun
allowed_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
if 'all_dags' in allowed_dag_ids:
if permissions.RESOURCE_DAGS in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]
dag_state_stats = session.query(dr.dag_id, dr.state, sqla.func.count(dr.state))\
@ -594,7 +595,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
if not allowed_dag_ids:
return wwwutils.json_response({})
if 'all_dags' in allowed_dag_ids:
if permissions.RESOURCE_DAGS in allowed_dag_ids:
allowed_dag_ids = {dag_id for dag_id, in session.query(models.DagModel.dag_id)}
# Filter by post parameters
@ -705,7 +706,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
"""Last DAG runs"""
allowed_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
if 'all_dags' in allowed_dag_ids:
if permissions.RESOURCE_DAGS in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]
# Filter by post parameters
@ -1389,7 +1390,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
"""Mark Dag Blocked."""
allowed_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
if 'all_dags' in allowed_dag_ids:
if permissions.RESOURCE_DAGS in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]
# Filter by post parameters
@ -3142,7 +3143,7 @@ class DagModelView(AirflowModelView):
filter_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
# pylint: disable=no-member
if not bool({'all_dags', 'Dag'}.intersection(filter_dag_ids)):
if permissions.RESOURCE_DAGS not in filter_dag_ids:
dag_ids_query = dag_ids_query.filter(DagModel.dag_id.in_(filter_dag_ids))
owners_query = owners_query.filter(DagModel.dag_id.in_(filter_dag_ids))
# pylint: enable=no-member

Просмотреть файл

@ -25,6 +25,7 @@ from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models import DagBag, DagModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.dummy_operator import DummyOperator
from airflow.security import permissions
from airflow.utils.session import provide_session
from airflow.www import app
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
@ -52,10 +53,10 @@ class TestDagEndpoint(unittest.TestCase):
username="test",
role_name="Test",
permissions=[
("can_create", "Dag"),
("can_read", "Dag"),
("can_edit", "Dag"),
("can_delete", "Dag"),
("can_create", permissions.RESOURCE_DAGS),
("can_read", permissions.RESOURCE_DAGS),
("can_edit", permissions.RESOURCE_DAGS),
("can_delete", permissions.RESOURCE_DAGS),
],
)
create_user(cls.app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore

Просмотреть файл

@ -21,6 +21,7 @@ from parameterized import parameterized
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models import DagModel, DagRun
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
from airflow.utils.types import DagRunType
@ -42,7 +43,7 @@ class TestDagRunEndpoint(unittest.TestCase):
username="test",
role_name="Test",
permissions=[
("can_read", "Dag"),
("can_read", permissions.RESOURCE_DAGS),
("can_create", "DagRun"),
("can_read", "DagRun"),
("can_edit", "DagRun"),

Просмотреть файл

@ -27,6 +27,7 @@ from airflow.models.dagrun import DagRun
from airflow.models.xcom import XCom
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from airflow.security import permissions
from airflow.utils.dates import days_ago
from airflow.utils.session import provide_session
from airflow.utils.timezone import datetime
@ -51,7 +52,7 @@ class TestGetExtraLinks(unittest.TestCase):
username="test",
role_name="Test",
permissions=[
('can_read', 'Dag'),
('can_read', permissions.RESOURCE_DAGS),
('can_read', 'DagRun'),
('can_read', 'Task'),
('can_read', 'TaskInstance'),

Просмотреть файл

@ -31,6 +31,7 @@ from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.models import DagRun, TaskInstance
from airflow.operators.dummy_operator import DummyOperator
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
from airflow.utils.types import DagRunType
@ -56,7 +57,11 @@ class TestGetLog(unittest.TestCase):
cls.app,
username="test",
role_name="Test",
permissions=[('can_read', 'Dag'), ('can_read', 'DagRun'), ('can_read', 'Task')],
permissions=[
('can_read', permissions.RESOURCE_DAGS),
('can_read', 'DagRun'),
('can_read', 'Task'),
],
)
create_user(cls.app, username="test_no_permissions", role_name="TestNoPermissions")

Просмотреть файл

@ -22,6 +22,7 @@ from airflow import DAG
from airflow.models import DagBag
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.dummy_operator import DummyOperator
from airflow.security import permissions
from airflow.www import app
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
from tests.test_utils.config import conf_vars
@ -47,7 +48,11 @@ class TestTaskEndpoint(unittest.TestCase):
cls.app, # type: ignore
username="test",
role_name="Test",
permissions=[('can_read', 'Dag'), ('can_read', 'DagRun'), ('can_read', 'Task')],
permissions=[
('can_read', permissions.RESOURCE_DAGS),
('can_read', 'DagRun'),
('can_read', 'Task'),
],
)
create_user(cls.app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore

Просмотреть файл

@ -21,6 +21,7 @@ from unittest import mock
from parameterized import parameterized
from airflow.models import DagBag, DagRun, TaskInstance, SlaMiss
from airflow.security import permissions
from airflow.utils.types import DagRunType
from airflow.utils.session import provide_session
from airflow.utils.state import State
@ -46,7 +47,7 @@ class TestTaskInstanceEndpoint(unittest.TestCase):
username="test",
role_name="Test",
permissions=[
('can_read', 'Dag'),
('can_read', permissions.RESOURCE_DAGS),
('can_read', 'DagRun'),
('can_read', 'Task'),
('can_edit', 'Task'),

Просмотреть файл

@ -19,6 +19,7 @@ import unittest
from parameterized import parameterized
from airflow.models import DagModel, DagRun as DR, XCom
from airflow.security import permissions
from airflow.utils.dates import parse_execution_date
from airflow.utils.session import provide_session
from airflow.utils.types import DagRunType
@ -40,7 +41,7 @@ class TestXComEndpoint(unittest.TestCase):
username="test",
role_name="Test",
permissions=[
("can_read", "Dag"),
("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun"),
("can_read", "Task"),
("can_read", "XCom"),

Просмотреть файл

@ -38,7 +38,7 @@ class TestCliSyncPerm(unittest.TestCase):
self.expect_dagbag_contains([
DAG('has_access_control',
access_control={
'Public': {'can_dag_read'}
'Public': {'can_read'}
}),
DAG('no_access_control')
], dagbag_mock)
@ -56,7 +56,7 @@ class TestCliSyncPerm(unittest.TestCase):
self.assertEqual(2, len(appbuilder.sm.sync_perm_for_dag.mock_calls))
appbuilder.sm.sync_perm_for_dag.assert_any_call(
'has_access_control',
{'Public': {'can_dag_read'}}
{'Public': {'can_read'}}
)
appbuilder.sm.sync_perm_for_dag.assert_any_call(
'no_access_control',

Просмотреть файл

@ -31,6 +31,7 @@ from unittest import mock
from unittest.mock import patch
import pendulum
import pytest
from dateutil.relativedelta import relativedelta
from freezegun import freeze_time
from parameterized import parameterized
@ -1653,6 +1654,18 @@ class TestDag(unittest.TestCase):
next_subdag_date = subdag.next_dagrun_after_date(None)
assert next_subdag_date is None, "SubDags should never have DagRuns created by the scheduler"
def test_replace_outdated_access_control_actions(self):
outdated_permissions = {'role1': {'can_read', 'can_edit'}, 'role2': {'can_dag_read', 'can_dag_edit'}}
updated_permissions = {'role1': {'can_read', 'can_edit'}, 'role2': {'can_read', 'can_edit'}}
with pytest.warns(DeprecationWarning):
dag = DAG(dag_id='dag_with_outdated_perms', access_control=outdated_permissions)
self.assertEqual(dag.access_control, updated_permissions)
with pytest.warns(DeprecationWarning):
dag.access_control = outdated_permissions
self.assertEqual(dag.access_control, updated_permissions)
class TestDagModel:

Просмотреть файл

@ -132,8 +132,8 @@ serialized_simple_dag_ground_truth = {
"test_role": {
"__type": "set",
"__var": [
"can_dag_read",
"can_dag_edit"
"can_read",
"can_edit"
]
}
}
@ -164,7 +164,7 @@ def make_simple_dag():
start_date=datetime(2019, 8, 1),
is_paused_upon_creation=False,
access_control={
"test_role": {"can_dag_read", "can_dag_edit"}
"test_role": {"can_read", "can_edit"}
}
) as dag:
CustomOperator(task_id='custom_task')

Просмотреть файл

@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.www import security
def create_user(app, username, role_name, permissions=None):
appbuilder = app.appbuilder
# Removes user and role so each test has isolated test data.
delete_user(app, username)
delete_role(app, role_name)
role = create_role(app, role_name, permissions)
return appbuilder.sm.add_user(
username=username,
first_name=username,
last_name=username,
email=f"{username}@fab.org",
role=role,
password=username,
)
def create_role(app, name, permissions=None):
appbuilder = app.appbuilder
role = appbuilder.sm.find_role(name)
if not role:
role = appbuilder.sm.add_role(name)
if not permissions:
permissions = []
for permission in permissions:
perm_object = appbuilder.sm.find_permission_view_menu(*permission)
appbuilder.sm.add_permission_role(role, perm_object)
return role
def delete_role(app, name):
if app.appbuilder.sm.find_role(name):
app.appbuilder.sm.delete_role(name)
def delete_roles(app):
for role in app.appbuilder.sm.get_all_roles():
if role.name not in security.EXISTING_ROLES:
app.appbuilder.sm.delete_role(role.name)
def delete_user(app, username):
appbuilder = app.appbuilder
for user in appbuilder.sm.get_all_users():
if user.username == username:
for role in user.roles:
delete_role(app, role.name)
appbuilder.sm.del_register_user(user)
break
def assert_401(response):
assert response.status_code == 401
assert response.json == {
'detail': None,
'status': 401,
'title': 'Unauthorized',
'type': EXCEPTIONS_LINK_MAP[401],
}

Просмотреть файл

@ -28,13 +28,15 @@ from sqlalchemy import Column, Date, Float, Integer, String
from airflow import settings
from airflow.exceptions import AirflowException
from airflow.models import DagModel
from airflow.security import permissions
from airflow.www import app as application
from airflow.www.utils import CustomSQLAInterface
from tests.test_utils.db import clear_db_runs
from tests.test_utils import fab_utils
from tests.test_utils.db import clear_db_dags, clear_db_runs
from tests.test_utils.mock_security_manager import MockSecurityManager
READ_WRITE = {'can_dag_read', 'can_dag_edit'}
READ_ONLY = {'can_dag_read'}
READ_WRITE = {'can_read', 'can_edit'}
READ_ONLY = {'can_read'}
logging.basicConfig(format='%(asctime)s:%(levelname)s:%(name)s:%(message)s')
logging.getLogger().setLevel(logging.DEBUG)
@ -76,18 +78,22 @@ class TestSecurity(unittest.TestCase):
cls.appbuilder = cls.app.appbuilder # pylint: disable=no-member
cls.app.config['WTF_CSRF_ENABLED'] = False
cls.security_manager = cls.appbuilder.sm
cls.role_admin = cls.security_manager.find_role('Admin')
cls.user = cls.appbuilder.sm.add_user(
'admin', 'admin', 'user', 'admin@fab.org', cls.role_admin, 'general'
)
cls.delete_roles()
def setUp(self):
clear_db_runs()
clear_db_dags()
self.db = SQLA(self.app)
self.appbuilder.add_view(SomeBaseView, "SomeBaseView", category="BaseViews")
self.appbuilder.add_view(SomeModelView, "SomeModelView", category="ModelViews")
log.debug("Complete setup!")
@classmethod
def delete_roles(cls):
for role_name in ['team-a', 'MyRole1', 'MyRole5', 'Test_Role', 'MyRole3', 'MyRole2']:
fab_utils.delete_role(cls.app, role_name)
def expect_user_is_in_role(self, user, rolename):
self.security_manager.init_role(rolename, [], [])
role = self.security_manager.find_role(rolename)
@ -97,26 +103,28 @@ class TestSecurity(unittest.TestCase):
user.roles = [role]
self.security_manager.update_user(user)
def assert_user_has_dag_perms(self, perms, dag_id):
def assert_user_has_dag_perms(self, perms, dag_id, user=None):
for perm in perms:
self.assertTrue(
self._has_dag_perm(perm, dag_id),
"User should have '{}' on DAG '{}'".format(perm, dag_id))
self._has_dag_perm(perm, dag_id, user),
"User should have '{}' on DAG '{}'".format(perm, dag_id),
)
def assert_user_does_not_have_dag_perms(self, dag_id, perms):
def assert_user_does_not_have_dag_perms(self, dag_id, perms, user=None):
for perm in perms:
self.assertFalse(
self._has_dag_perm(perm, dag_id),
"User should not have '{}' on DAG '{}'".format(perm, dag_id))
self._has_dag_perm(perm, dag_id, user),
"User should not have '{}' on DAG '{}'".format(perm, dag_id),
)
def _has_dag_perm(self, perm, dag_id):
return self.security_manager.has_access(
perm,
dag_id,
self.user)
def _has_dag_perm(self, perm, dag_id, user):
# if not user:
# user = self.user
return self.security_manager.has_access(perm, self.security_manager.prefixed_dag_id(dag_id), user)
def tearDown(self):
clear_db_runs()
clear_db_dags()
self.appbuilder = None
self.app = None
self.db = None
@ -145,8 +153,7 @@ class TestSecurity(unittest.TestCase):
self.security_manager.init_role(role_name, [], [])
role = self.security_manager.find_role(role_name)
perm = self.security_manager.\
find_permission_view_menu('can_edit', 'RoleModelView')
perm = self.security_manager.find_permission_view_menu('can_edit', 'RoleModelView')
self.security_manager.add_permission_role(role, perm)
role_perms_len = len(role.permissions)
@ -165,43 +172,45 @@ class TestSecurity(unittest.TestCase):
@mock.patch('airflow.www.security.AirflowSecurityManager.get_user_roles')
def test_get_all_permissions_views(self, mock_get_user_roles):
role_name = 'MyRole5'
role_perms = ['can_some_action']
role_vms = ['SomeBaseView']
self.security_manager.init_role(role_name, role_vms, role_perms)
role = self.security_manager.find_role(role_name)
role_perm = 'can_some_action'
role_vm = 'SomeBaseView'
username = 'get_all_permissions_views'
mock_get_user_roles.return_value = [role]
self.assertEqual(self.security_manager
.get_all_permissions_views(),
{('can_some_action', 'SomeBaseView')})
with self.app.app_context():
user = fab_utils.create_user(self.app, username, role_name, permissions=[(role_perm, role_vm),],)
role = user.roles[0]
mock_get_user_roles.return_value = [role]
mock_get_user_roles.return_value = []
self.assertEqual(len(self.security_manager
.get_all_permissions_views()), 0)
self.assertEqual(self.security_manager.get_all_permissions_views(), {(role_perm, role_vm)})
mock_get_user_roles.return_value = []
self.assertEqual(len(self.security_manager.get_all_permissions_views()), 0)
def test_get_accessible_dag_ids(self):
role_name = 'MyRole1'
permission_action = ['can_dag_read']
permission_action = ['can_read']
dag_id = 'dag_id'
username = "Mr. User"
self.security_manager.init_role(role_name, [], [])
username = "ElUser"
user = fab_utils.create_user(
self.app,
username,
role_name,
permissions=[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
],
)
dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *")
self.session.add(dag_model)
self.session.commit()
self.security_manager.sync_perm_for_dag( # type: ignore # pylint: disable=no-member
dag_id, access_control={role_name: permission_action}
)
role = self.security_manager.find_role(role_name)
user = self.security_manager.add_user(
username=username,
first_name=username,
last_name=username,
email=f"{username}@fab.org",
role=role,
password=username,
)
dag_model = DagModel(dag_id="dag_id", fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *")
self.session.add(dag_model)
self.session.commit()
self.assertEqual(self.security_manager
.get_accessible_dag_ids(user), {'dag_id'})
self.assertEqual(self.security_manager.get_accessible_dag_ids(user), {'dag_id'})
@mock.patch('airflow.www.security.AirflowSecurityManager._has_view_access')
def test_has_access(self, mock_has_view_access):
@ -212,10 +221,14 @@ class TestSecurity(unittest.TestCase):
def test_sync_perm_for_dag_creates_permissions_on_view_menus(self):
test_dag_id = 'TEST_DAG'
prefixed_test_dag_id = f'DAG:{test_dag_id}'
self.security_manager.sync_perm_for_dag(test_dag_id, access_control=None)
for dag_perm in self.security_manager.DAG_PERMS:
self.assertIsNotNone(self.security_manager.
find_permission_view_menu(dag_perm, test_dag_id))
self.assertIsNotNone(
self.security_manager.find_permission_view_menu('can_read', prefixed_test_dag_id)
)
self.assertIsNotNone(
self.security_manager.find_permission_view_menu('can_edit', prefixed_test_dag_id)
)
@mock.patch('airflow.www.security.AirflowSecurityManager._has_perm')
@mock.patch('airflow.www.security.AirflowSecurityManager._has_role')
@ -234,74 +247,79 @@ class TestSecurity(unittest.TestCase):
with self.assertRaises(AirflowException) as context:
self.security_manager.sync_perm_for_dag(
dag_id='access-control-test',
access_control={
'this-role-does-not-exist': ['can_dag_edit', 'can_dag_read']
})
access_control={'this-role-does-not-exist': ['can_edit', 'can_read']},
)
self.assertIn("role does not exist", str(context.exception))
def test_all_dag_access_doesnt_give_non_dag_access(self):
username = 'dag_access_user'
role_name = 'dag_access_role'
with self.app.app_context():
user = fab_utils.create_user(
self.app,
username,
role_name,
permissions=[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
],
)
self.assertTrue(
self.security_manager.has_access(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS, user)
)
self.assertFalse(self.security_manager.has_access(permissions.ACTION_CAN_READ, 'Task', user))
def test_access_control_with_invalid_permission(self):
invalid_permissions = [
'can_varimport', # a real permission, but not a member of DAG_PERMS
'can_eat_pudding', # clearly not a real permission
]
username = "Mrs. User"
user = self.security_manager.add_user(
username=username,
first_name=username,
last_name=username,
email=f"{username}@fab.org",
role=self.role_admin,
password=username,
)
username = "LaUser"
user = fab_utils.create_user(self.app, username=username, role_name='team-a',)
for permission in invalid_permissions:
self.expect_user_is_in_role(user, rolename='team-a')
with self.assertRaises(AirflowException) as context:
self.security_manager.sync_perm_for_dag(
'access_control_test',
access_control={
'team-a': {permission}
})
'access_control_test', access_control={'team-a': {permission}}
)
self.assertIn("invalid permissions", str(context.exception))
def test_access_control_is_set_on_init(self):
self.expect_user_is_in_role(self.user, rolename='team-a')
self.security_manager.sync_perm_for_dag(
'access_control_test',
access_control={
'team-a': ['can_dag_edit', 'can_dag_read']
})
self.assert_user_has_dag_perms(
perms=['can_dag_edit', 'can_dag_read'],
dag_id='access_control_test',
)
username = 'access_control_is_set_on_init'
role_name = 'team-a'
with self.app.app_context():
user = fab_utils.create_user(self.app, username, role_name, permissions=[],)
self.expect_user_is_in_role(user, rolename='team-a')
self.security_manager.sync_perm_for_dag(
'access_control_test', access_control={'team-a': ['can_edit', 'can_read']}
)
self.assert_user_has_dag_perms(
perms=['can_edit', 'can_read'], dag_id='access_control_test', user=user
)
self.expect_user_is_in_role(self.user, rolename='NOT-team-a')
self.assert_user_does_not_have_dag_perms(
perms=['can_dag_edit', 'can_dag_read'],
dag_id='access_control_test',
)
self.expect_user_is_in_role(user, rolename='NOT-team-a')
self.assert_user_does_not_have_dag_perms(
perms=['can_edit', 'can_read'], dag_id='access_control_test', user=user
)
def test_access_control_stale_perms_are_revoked(self):
self.expect_user_is_in_role(self.user, rolename='team-a')
self.security_manager.sync_perm_for_dag(
'access_control_test',
access_control={'team-a': READ_WRITE})
self.assert_user_has_dag_perms(
perms=READ_WRITE,
dag_id='access_control_test',
)
username = 'access_control_stale_perms_are_revoked'
role_name = 'team-a'
with self.app.app_context():
user = fab_utils.create_user(self.app, username, role_name, permissions=[],)
self.expect_user_is_in_role(user, rolename='team-a')
self.security_manager.sync_perm_for_dag(
'access_control_test', access_control={'team-a': READ_WRITE}
)
self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id='access_control_test', user=user)
self.security_manager.sync_perm_for_dag(
'access_control_test',
access_control={'team-a': READ_ONLY})
self.assert_user_has_dag_perms(
perms=['can_dag_read'],
dag_id='access_control_test',
)
self.assert_user_does_not_have_dag_perms(
perms=['can_dag_edit'],
dag_id='access_control_test',
)
self.security_manager.sync_perm_for_dag(
'access_control_test', access_control={'team-a': READ_ONLY}
)
self.assert_user_has_dag_perms(perms=['can_read'], dag_id='access_control_test', user=user)
self.assert_user_does_not_have_dag_perms(
perms=['can_edit'], dag_id='access_control_test', user=user
)
def test_no_additional_dag_permission_views_created(self):
ab_perm_view_role = sqla_models.assoc_permissionview_role

Просмотреть файл

@ -52,6 +52,7 @@ from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.security import permissions
from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES
from airflow.utils import dates, timezone
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
@ -141,7 +142,6 @@ class TestBase(unittest.TestCase):
role=self.appbuilder.sm.find_role('Admin'),
password='test',
)
if username == 'test_user' and not self.appbuilder.sm.find_user(username='test_user'):
self.appbuilder.sm.add_user(
username='test_user',
@ -1649,6 +1649,10 @@ class TestDagACLView(TestBase):
super().setUpClass()
dagbag = models.DagBag(include_examples=True)
DAG.bulk_write_to_db(dagbag.dags.values())
for username in ['all_dag_user', 'dag_read_only', 'dag_faker', 'dag_tester']:
user = cls.appbuilder.sm.find_user(username=username)
if user:
cls.appbuilder.sm.del_register_user(user)
def prepare_dagruns(self):
dagbag = models.DagBag(include_examples=True)
@ -1729,21 +1733,28 @@ class TestDagACLView(TestBase):
self.logout()
self.login(username='test',
password='test')
perm_on_dag = self.appbuilder.sm.\
find_permission_view_menu('can_dag_edit', 'example_bash_operator')
dag_tester_role = self.appbuilder.sm.find_role('dag_acl_tester')
self.appbuilder.sm.add_permission_role(dag_tester_role, perm_on_dag)
edit_perm_on_dag = self.appbuilder.sm.\
find_permission_view_menu('can_edit', 'DAG:example_bash_operator')
self.appbuilder.sm.add_permission_role(dag_tester_role, edit_perm_on_dag)
read_perm_on_dag = self.appbuilder.sm.\
find_permission_view_menu('can_read', 'DAG:example_bash_operator')
self.appbuilder.sm.add_permission_role(dag_tester_role, read_perm_on_dag)
perm_on_all_dag = self.appbuilder.sm.\
find_permission_view_menu('can_dag_edit', 'all_dags')
all_dag_role = self.appbuilder.sm.find_role('all_dag_role')
self.appbuilder.sm.add_permission_role(all_dag_role, perm_on_all_dag)
edit_perm_on_all_dag = self.appbuilder.sm.\
find_permission_view_menu('can_edit', permissions.RESOURCE_DAGS)
self.appbuilder.sm.add_permission_role(all_dag_role, edit_perm_on_all_dag)
read_perm_on_all_dag = self.appbuilder.sm.\
find_permission_view_menu('can_read', permissions.RESOURCE_DAGS)
self.appbuilder.sm.add_permission_role(all_dag_role, read_perm_on_all_dag)
role_user = self.appbuilder.sm.find_role('User')
self.appbuilder.sm.add_permission_role(role_user, perm_on_all_dag)
self.appbuilder.sm.add_permission_role(role_user, read_perm_on_all_dag)
self.appbuilder.sm.add_permission_role(role_user, edit_perm_on_all_dag)
read_only_perm_on_dag = self.appbuilder.sm.\
find_permission_view_menu('can_dag_read', 'example_bash_operator')
find_permission_view_menu('can_read', 'DAG:example_bash_operator')
dag_read_only_role = self.appbuilder.sm.find_role('dag_acl_read_only')
self.appbuilder.sm.add_permission_role(dag_read_only_role, read_only_perm_on_dag)
@ -1751,19 +1762,17 @@ class TestDagACLView(TestBase):
self.logout()
self.login(username='test',
password='test')
test_view_menu = self.appbuilder.sm.find_view_menu('example_bash_operator')
test_view_menu = self.appbuilder.sm.find_view_menu('DAG:example_bash_operator')
perms_views = self.appbuilder.sm.find_permissions_view_menu(test_view_menu)
self.assertEqual(len(perms_views), 4)
self.assertEqual(len(perms_views), 2)
permissions = [str(perm) for perm in perms_views]
expected_permissions = [
'can read on example_bash_operator',
'can dag edit on example_bash_operator',
'can dag read on example_bash_operator',
'can edit on example_bash_operator',
perms = [str(perm) for perm in perms_views]
expected_perms = [
'can read on DAG:example_bash_operator',
'can edit on DAG:example_bash_operator',
]
for perm in expected_permissions:
self.assertIn(perm, permissions)
for perm in expected_perms:
self.assertIn(perm, perms)
def test_role_permission_associate(self):
self.logout()
@ -1771,8 +1780,8 @@ class TestDagACLView(TestBase):
password='test')
test_role = self.appbuilder.sm.find_role('dag_acl_tester')
perms = {str(perm) for perm in test_role.permissions}
self.assertIn('can dag edit on example_bash_operator', perms)
self.assertNotIn('can dag read on example_bash_operator', perms)
self.assertIn('can edit on DAG:example_bash_operator', perms)
self.assertIn('can read on DAG:example_bash_operator', perms)
def test_index_success(self):
self.logout()
@ -2175,7 +2184,7 @@ class TestDagACLView(TestBase):
self.check_content_not_in_response('example_bash_operator', resp)
def test_success_fail_for_read_only_role(self):
# success endpoint need can_dag_edit, which read only role can not access
# success endpoint need can_edit, which read only role can not access
self.logout()
self.login(username='dag_read_only',
password='dag_read_only')
@ -2193,7 +2202,7 @@ class TestDagACLView(TestBase):
self.check_content_not_in_response('Wait a minute', resp, resp_code=302)
def test_tree_success_for_read_only_role(self):
# tree view only allows can_dag_read, which read only role could access
# tree view only allows can_read, which read only role could access
self.logout()
self.login(username='dag_read_only',
password='dag_read_only')