Publish views in order by dependencies (#3411)
This commit is contained in:
Родитель
bb84cb0eb5
Коммит
ddbabeef05
|
@ -17,6 +17,7 @@ from ..cli.utils import (
|
|||
use_cloud_function_option,
|
||||
)
|
||||
from ..metadata.parse_metadata import METADATA_FILE, Metadata
|
||||
from ..util.string_dag import StringDag
|
||||
from ..view import View, broken_views
|
||||
from .dryrun import dryrun
|
||||
|
||||
|
@ -196,10 +197,23 @@ def publish(
|
|||
|
||||
views = [View.from_file(f) for f in view_files]
|
||||
views = [v for v in views if not user_facing_only or v.is_user_facing]
|
||||
views_by_id = {v.view_identifier: v for v in views}
|
||||
|
||||
dag = StringDag(
|
||||
dependencies={
|
||||
view.view_identifier: {
|
||||
ref for ref in view.table_references if ref in views_by_id
|
||||
}
|
||||
for view in views
|
||||
}
|
||||
)
|
||||
dag.validate()
|
||||
|
||||
with ThreadPool(parallelism) as p:
|
||||
publish_view = functools.partial(_publish_view, target_project, dry_run)
|
||||
result = p.map(publish_view, views, chunksize=1)
|
||||
publish_view = functools.partial(
|
||||
_publish_view, target_project, dry_run, views_by_id
|
||||
)
|
||||
result = p.map(publish_view, [dag for _ in views], chunksize=1)
|
||||
|
||||
if not all(result):
|
||||
sys.exit(1)
|
||||
|
@ -207,8 +221,10 @@ def publish(
|
|||
click.echo("All have been published.")
|
||||
|
||||
|
||||
def _publish_view(target_project, dry_run, view):
|
||||
return view.publish(target_project, dry_run)
|
||||
def _publish_view(target_project, dry_run, views_by_id, dag):
|
||||
with dag.get() as view_id:
|
||||
view = views_by_id[view_id]
|
||||
return view.publish(target_project, dry_run)
|
||||
|
||||
|
||||
@view.command(
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
"""String Dag."""
|
||||
from contextlib import contextmanager
|
||||
from copy import deepcopy
|
||||
from queue import Empty, Queue
|
||||
from threading import Lock
|
||||
from typing import Dict, Set
|
||||
|
||||
|
||||
class StringDag:
|
||||
"""Abstraction for using a queue to iterate in parallel over a set of values.
|
||||
|
||||
All dependencies must be acked before a given value is added to the queue. Values in
|
||||
the queue may be processed in parallel in any order.
|
||||
"""
|
||||
|
||||
def __init__(self, dependencies: Dict[str, Set[str]]):
|
||||
"""Initialize."""
|
||||
self.dependencies = deepcopy(dependencies)
|
||||
self.lock: Lock = Lock()
|
||||
self.q: Queue = Queue()
|
||||
self.ready: Set[str] = set()
|
||||
# ack a nonexisting value to initialize queue
|
||||
self._ack(None)
|
||||
|
||||
def _ack(self, ack_value):
|
||||
with self.lock:
|
||||
for value, deps in self.dependencies.items():
|
||||
deps.discard(ack_value)
|
||||
if not deps and value not in self.ready:
|
||||
self.ready.add(value)
|
||||
self.q.put(value)
|
||||
|
||||
@contextmanager
|
||||
def get(self):
|
||||
"""Context manager for getting a ready value from the queue."""
|
||||
value = self.q.get()
|
||||
try:
|
||||
yield value
|
||||
finally:
|
||||
self._ack(value)
|
||||
|
||||
def validate(self):
|
||||
"""Validate all dependencies eventually resolve."""
|
||||
_dag = StringDag(self.dependencies)
|
||||
for _ in range(len(self.dependencies)):
|
||||
try:
|
||||
value = _dag.q.get_nowait()
|
||||
_dag._ack(value)
|
||||
except Empty:
|
||||
raise ValueError(
|
||||
"DAG not valid, some values will never be ready:"
|
||||
f" {_dag.dependencies.keys() - _dag.ready}"
|
||||
)
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
import string
|
||||
import time
|
||||
from functools import cached_property
|
||||
from pathlib import Path
|
||||
|
||||
import attr
|
||||
|
@ -148,11 +149,16 @@ class View:
|
|||
return True
|
||||
return self._valid_fully_qualified_references() and self._valid_view_naming()
|
||||
|
||||
def _valid_fully_qualified_references(self):
|
||||
"""Check that referenced tables and views are fully qualified."""
|
||||
@cached_property
|
||||
def table_references(self):
|
||||
"""List of table references in this view."""
|
||||
from bigquery_etl.dependency import extract_table_references
|
||||
|
||||
for table in extract_table_references(self.content):
|
||||
return extract_table_references(self.content)
|
||||
|
||||
def _valid_fully_qualified_references(self):
|
||||
"""Check that referenced tables and views are fully qualified."""
|
||||
for table in self.table_references:
|
||||
if len(table.split(".")) < 3:
|
||||
print(f"{self.path} ERROR\n{table} missing project_id qualifier")
|
||||
return False
|
||||
|
|
Загрузка…
Ссылка в новой задаче