Remote run won't pickle by default anymore

This commit is contained in:
Maxime 2015-05-21 05:19:20 +00:00
Родитель 1fb575a593
Коммит 015d977304
3 изменённых файлов: 39 добавлений и 26 удалений

Просмотреть файл

@@ -12,7 +12,6 @@ import dateutil.parser
from datetime import datetime
import logging
import os
import signal
import subprocess
import sys
@@ -74,12 +73,17 @@ def run(args):
args.execution_date = dateutil.parser.parse(args.execution_date)
iso = args.execution_date.isoformat()
filename = "{directory}/{iso}".format(**locals())
subdir = None
if args.subdir:
subdir = args.subdir.replace(
"DAGS_FOLDER", conf.get("core", "DAGS_FOLDER"))
subdir = os.path.expanduser(subdir)
logging.basicConfig(
filename=filename,
level=logging.INFO,
format=settings.LOG_FORMAT)
if not args.pickle:
dagbag = DagBag(args.subdir)
dagbag = DagBag(subdir)
if args.dag_id not in dagbag.dags:
msg = 'DAG [{0}] could not be found'.format(args.dag_id)
logging.error(msg)
@@ -115,33 +119,36 @@ def run(args):
job_id=args.job_id,
)
else:
try:
# Running remotely, so pickling the DAG
session = settings.Session()
pickle = DagPickle(dag)
session.add(pickle)
session.commit()
pickle_id = pickle.id
print(
'Pickled dag {dag} '
'as pickle_id:{pickle_id}').format(**locals())
except Exception as e:
print('Could not pickle the DAG')
print(e)
pickle_id = None
pickle_id = None
if args.ship_dag:
try:
# Running remotely, so pickling the DAG
session = settings.Session()
pickle = DagPickle(dag)
session.add(pickle)
session.commit()
pickle_id = pickle.id
print(
'Pickled dag {dag} '
'as pickle_id:{pickle_id}').format(**locals())
except Exception as e:
print('Could not pickle the DAG')
print(e)
raise e
executor = DEFAULT_EXECUTOR
executor.start()
cmd = ti.command(
force=args.force,
local=True,
print("Sending to executor.")
executor.queue_task_instance(
ti,
mark_success=args.mark_success,
pickle_id=pickle_id,
ignore_dependencies=args.ignore_dependencies,
pickle_id=pickle_id)
print("Sending run command to executor:\n" + cmd)
executor.queue_task_instance(ti)
force=args.force)
executor.heartbeat()
executor.end()
def task_state(args):
"""
Returns the state of a TaskInstance at the command line.
@@ -151,7 +158,6 @@ def task_state(args):
"""
log_to_stdout()
args.execution_date = dateutil.parser.parse(args.execution_date)
iso = args.execution_date.isoformat()
dagbag = DagBag(args.subdir)
if args.dag_id not in dagbag.dags:
raise Exception('dag_id could not be found')
@@ -406,6 +412,10 @@ def get_parser():
"-i", "--ignore_dependencies",
help="Ignore upstream and depends_on_past dependencies",
action="store_true")
parser_run.add_argument(
"--ship_dag",
help="Pickles (serializes) the DAG and ships it to the worker",
action="store_true")
parser_run.add_argument(
"-p", "--pickle",
help="Serialized pickle object of the entire dag (used internally)")

Просмотреть файл

@@ -35,10 +35,13 @@ class BaseExecutor(object):
self.queued_tasks[key] = (command, priority)
def queue_task_instance(
self, task_instance, mark_success=False, pickle_id=None):
self, task_instance, mark_success=False, pickle_id=None,
force=False, ignore_dependencies=False):
command = task_instance.command(
local=True,
mark_success=mark_success,
force=force,
ignore_dependencies=ignore_dependencies,
pickle_id=pickle_id)
self.queue_command(
task_instance.key,

Просмотреть файл

@@ -415,7 +415,7 @@ class TaskInstance(Base):
raw = "--raw" if raw else ""
subdir = ""
if not pickle and self.task.dag and self.task.dag.full_filepath:
subdir = "-sd {0}".format(self.task.dag.full_filepath)
subdir = "-sd DAGS_FOLDER/{0}".format(self.task.dag.filepath)
return (
"airflow run "
"{self.dag_id} {self.task_id} {iso} "
@@ -997,7 +997,7 @@ class BaseOperator(Base):
def priority_weight_total(self):
return sum([
t.priority_weight
for t in self.get_flat_relatives(upstream=False)
for t in self.get_flat_relatives(upstream=False)
]) + self.priority_weight
def __cmp__(self, other):