This commit is contained in:
Maxime 2015-03-07 22:24:11 +00:00
Родитель 1097a03f6e
Коммит 2deae88b0b
12 изменённых файлов: 135 добавлений и 34 удалений

Просмотреть файл

@ -420,6 +420,9 @@ class BackfillJob(BaseJob):
del tasks_to_run[key]
executor.end()
session.close()
if failed:
raise Exception(
"Some tasks instances failed, here's the list:\n"+str(failed))
class LocalTaskJob(BaseJob):

Просмотреть файл

@ -93,8 +93,8 @@ class DagBag(object):
dttm = datetime.fromtimestamp(os.path.getmtime(filepath))
mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1])
# Skip file if no obvious references to airflow or DAG are found.
if safe_mode:
# Skip file if no obvious references to airflow or DAG are found.
f = open(filepath, 'r')
content = f.read()
f.close()
@ -111,19 +111,30 @@ class DagBag(object):
except:
logging.error("Failed to import: " + filepath)
self.file_last_changed[filepath] = dttm
# logging.error("Exception: " + str(sys.exc_info()))
# traceback.print_exc(file=sys.stdout)
return
for dag in m.__dict__.values():
if type(dag) == DAG:
dag.full_filepath = filepath
self.dags[dag.dag_id] = dag
dag.resolve_template_files()
logging.info('Loaded DAG {dag}'.format(**locals()))
self.bag_dag(dag)
self.file_last_changed[filepath] = dttm
def bag_dag(self, dag):
'''
Adds the DAG into the bag, recurses into sub dags.
'''
self.dags[dag.dag_id] = dag
dag.resolve_template_files()
for task in dag.tasks:
# Late import to prevent circular imports
from airflow.operators import SubDagOperator
if isinstance(task, SubDagOperator):
task.subdag.full_filepath = dag.full_filepath
task.subdag.parent_dag = dag
self.bag_dag(task.subdag)
logging.info('Loaded DAG {dag}'.format(**locals()))
def collect_dags(
self,
dag_folder=DAGS_FOLDER,
@ -819,6 +830,7 @@ class BaseOperator(Base):
template_ext = []
# Defines the color in the UI
ui_color = '#fff'
ui_fgcolor = '#000'
__tablename__ = "task"
@ -1170,6 +1182,7 @@ class DAG(Base):
self.user_defined_macros = user_defined_macros
self.default_args = default_args or {}
self.params = params
self.parent_dag = None # Gets set when DAGs are loaded
def __repr__(self):
return "<DAG: {self.dag_id}>".format(self=self)
@ -1419,11 +1432,11 @@ class DAG(Base):
def run(
self, start_date=None, end_date=None, mark_success=False,
include_adhoc=False, local=False):
include_adhoc=False, local=False, executor=None):
from airflow.jobs import BackfillJob
if local:
if not executor and local:
executor = LocalExecutor()
else:
elif not executor:
executor = DEFAULT_EXECUTOR
job = BackfillJob(
self,

Просмотреть файл

@ -16,3 +16,4 @@ from sensors import TimeSensor
from email_operator import EmailOperator
from dummy_operator import DummyOperator
from hive2samba_operator import Hive2SambaOperator
from subdag_operator import SubDagOperator

Просмотреть файл

@ -0,0 +1,41 @@
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
class SubDagOperator(BaseOperator):
template_fields = tuple()
ui_color = '#555'
ui_fgcolor = '#fff'
__mapper_args__ = {
'polymorphic_identity': 'SubDagOperator'
}
@apply_defaults
def __init__(
self,
subdag,
*args, **kwargs):
"""
Yo dawg. This runs a sub dag. By convention, a sub dag's dag_id
should be prefixed by its parent and a dot. As in `parent.child`.
:param subdag: the DAG object to run as a subdag of the current DAG.
:type subdag: airflow.DAG
:param dag: the parent DAG
:type subdag: airflow.DAG
"""
if 'dag' not in kwargs:
raise Exception("Please pass in the `dag` param")
dag = kwargs['dag']
super(SubDagOperator, self).__init__(*args, **kwargs)
if dag.dag_id + '.' + kwargs['task_id'] != subdag.dag_id:
raise Exception(
"The subdag's dag_id should correspond to the parent's "
"'dag_id.task_id'")
self.subdag = subdag
def execute(self, context):
ed = context['execution_date']
self.subdag.run(start_date=ed, end_date=ed)

Просмотреть файл

@ -144,10 +144,10 @@ def validate_key(k, max_length=250):
elif len(k) > max_length:
raise Exception("The key has to be less than {0} characters".format(
max_length))
elif not re.match(r'^[A-Za-z0-9_-]+$', k):
elif not re.match(r'^[A-Za-z0-9_\-\.]+$', k):
raise Exception(
"The key ({k}) has to be made of alphanumeric characters, dashes "
"and underscores exclusively".format(**locals()))
"The key ({k}) has to be made of alphanumeric characters, dashes, "
"dots and underscores exclusively".format(**locals()))
else:
return True

Просмотреть файл

@ -114,7 +114,9 @@ class HomeView(AdminIndexView):
"""
@expose("/")
def index(self):
dags = sorted(dagbag.dags.values(), key=lambda dag: dag.dag_id)
dags = dagbag.dags.values()
dags = [dag for dag in dags if not dag.parent_dag]
dags = sorted(dags, key=lambda dag: dag.dag_id)
return self.render('airflow/dags.html', dags=dags)
admin = Admin(
@ -982,6 +984,7 @@ class Airflow(BaseView):
'id': task.task_id,
'value': {
'label': task.task_id,
'labelStyle': "fill:{0};".format(task.ui_fgcolor),
'style': "fill:{0};".format(task.ui_color),
}
})

Просмотреть файл

@ -39,7 +39,7 @@ div.linenodiv {
padding-right: 1px !important;
}
.linenos {
width: 43px;
width: 50px;
border: none;
}
div.linenodiv pre {

Просмотреть файл

@ -7,8 +7,18 @@
{% endblock %}
{% block body %}
<h3><span style='color:#AAA;'>DAG:</span> <span>{{ dag.dag_id }}</span></h3>
{% if dag.parent_dag %}
<h3><span style='color:#AAA;'>SUBDAG:</span> <span>{{ dag.dag_id }}</span></h3>
{% else %}
<h3><span style='color:#AAA;'>DAG:</span> <span>{{ dag.dag_id }}</span></h3>
{% endif %}
<ul class="nav nav-pills">
{% if dag.parent_dag %}
<li class="never_active"><a href="{{ url_for("airflow.graph", dag_id=dag.parent_dag.dag_id) }}">
<span class="glyphicon glyphicon-arrow-left" aria-hidden="true"></span>
Back to {{ dag.parent_dag.dag_id }}</a>
</li>
{% endif %}
<li><a href="{{ url_for("airflow.graph", dag_id=dag.dag_id) }}">
<span class="glyphicon glyphicon-certificate" aria-hidden="true"></span>
Graph View</a></li>
@ -55,6 +65,12 @@
</h4>
</div>
<div class="modal-body">
<div id="div_btn_subdag">
<button id="btn_subdag" type="button" class="btn btn-primary">
Zoom into Sub DAG
</button>
<hr/>
</div>
<button id="btn_task" type="button" class="btn btn-primary">
Task Details
</button>
@ -108,19 +124,27 @@
// Pills highlighting
$(document).ready(function () {
$('a[href*="' + this.location.pathname + '"]').parent().addClass('active');
$('.never_active').removeClass('active');
});
var dag_id = '{{ dag.dag_id }}';
var task_id = '';
var exection_date = '';
function call_modal(t, d) {
var subdag_id = '';
function call_modal(t, d, sd) {
task_id = t;
subdag_id = sd;
execution_date = d;
$('#task_id').html(t);
$('#execution_date').html(d);
$('#myModal').modal({});
// TRick to fix a problem with Bootstrap 2.3.1 that ships with flask-admin
$("#myModal").css("margin-top","0px")
if (subdag_id===undefined)
$("#div_btn_subdag").hide();
else {
$("#div_btn_subdag").show();
subdag_id = "{{ dag.dag_id }}."+t;
}
}
$("#btn_rendered").click(function(){
@ -131,6 +155,13 @@
window.location = url;
});
$("#btn_subdag").click(function(){
url = "{{ url_for('airflow.graph') }}" +
"?dag_id=" + subdag_id +
"&execution_date=" + execution_date;
window.location = url;
});
$("#btn_log").click(function(){
url = "{{ url_for('airflow.log') }}" +
"?task_id=" + task_id +

Просмотреть файл

@ -80,7 +80,6 @@
d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) {
for(var dag_id in json) {
states = json[dag_id];
console.log(states);
g = d3.select('svg#' + dag_id)
.attr('height', diameter + (stroke_width_hover * 2))
.attr('width', 4 * (diameter + stroke_width_hover))

Просмотреть файл

@ -72,7 +72,11 @@
});
d3.selectAll("g.node").on("click", function(d){
call_modal(d, execution_date);
task = tasks[d];
if (task.task_type == "SubDagOperator")
call_modal(d, execution_date, true);
else
call_modal(d, execution_date);
});
$("g.node").tooltip({

Просмотреть файл

@ -23,7 +23,6 @@
$('span.status_square').tooltip({html: true});
var data = {{ data|safe }};
console.log(data);
var barHeight = 20;
var axisHeight = 20;
var square_x = 300;
@ -39,6 +38,11 @@ var i = 0,
var tree = d3.layout.tree().nodeSize([0, 25]);
var nodes = tree.nodes(data);
var nodeobj = {};
for (i=0; i<nodes.length; i++) {
node = nodes[i];
nodeobj[node.name] = node;
}
var diagonal = d3.svg.diagonal()
.projection(function(d) { return [d.y, d.x]; });
@ -160,7 +164,10 @@ function update(source) {
.enter()
.append('rect')
.on("click", function(d){
call_modal(d.task_id, d.execution_date);
if(nodeobj[d.task_id].operator=='SubDagOperator')
call_modal(d.task_id, d.execution_date, true);
else
call_modal(d.task_id, d.execution_date);
})
.attr("class", function(d) {return "state " + d.state})
.attr("data-toggle", "tooltip")
@ -280,8 +287,6 @@ function toggles(clicked_d) {
}
// Toggle children on click.
function click(d) {
console.log(d);
if (d.children || d._children){
if (d.children) {
d._children = d.children;

Просмотреть файл

@ -11,22 +11,23 @@ BaseOperator documentation for more details.
.. automodule:: airflow.operators
:show-inheritance:
:members:
HiveOperator,
BashOperator,
DummyOperator,
EmailOperator,
ExternalTaskSensor,
HdfsSensor,
Hive2SambaOperator,
HiveOperator,
HivePartitionSensor,
MySqlOperator,
PostgresOperator,
BashOperator,
PythonOperator,
ExternalTaskSensor,
SqlSensor,
HivePartitionSensor,
HdfsSensor,
TimeSensor,
PrestoCheckOperator,
PrestoIntervalCheckOperator,
PrestoValueCheckOperator,
Hive2SambaOperator,
DummyOperator,
EmailOperator
PythonOperator,
SqlSensor,
SubDagOperator,
TimeSensor
Macros
---------