Add MesosExecutor for airflow
Parent: 2b7ede4040
Commit: 5516f884b7
@@ -202,6 +202,36 @@ scheduler_heartbeat_sec = 5
# statsd_host = localhost
# statsd_port = 8125
# statsd_prefix = airflow

[mesos]
# Mesos master address which MesosExecutor will connect to.
master = localhost:5050

# The framework name which Airflow scheduler will register itself as on mesos
framework_name = Airflow

# Number of cpu cores required for running one task instance using
# 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
# command on a mesos slave
task_cpu = 1

# Memory in MB required for running one task instance using
# 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
# command on a mesos slave
task_memory = 256

# Enable framework checkpointing for mesos
# See http://mesos.apache.org/documentation/latest/slave-recovery/
checkpoint = False

# Enable framework authentication for mesos
# See http://mesos.apache.org/documentation/latest/configuration/
authenticate = False

# Mesos credentials, if authentication is enabled
# default_principal = admin
# default_secret = admin
"""

TEST_CONFIG = """\
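These options are consumed through Airflow's standard configuration API, exactly as the new executor below does; a minimal sketch of reading them (the accessor calls are the ones used later in this commit):

    # Sketch: reading the [mesos] section via airflow's conf module.
    from airflow.configuration import conf

    master = conf.get('mesos', 'MASTER')                 # e.g. "localhost:5050"
    framework_name = conf.get('mesos', 'FRAMEWORK_NAME')
    task_cpu = conf.getint('mesos', 'TASK_CPU')          # cores per task instance
    task_memory = conf.getint('mesos', 'TASK_MEMORY')    # MB per task instance
    checkpoint = conf.getboolean('mesos', 'CHECKPOINT')
    authenticate = conf.getboolean('mesos', 'AUTHENTICATE')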
@@ -0,0 +1,224 @@
import os
import sys
import time
import logging
from Queue import Queue

import mesos.interface
from mesos.interface import mesos_pb2
import mesos.native

from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.utils import State
from airflow.utils import AirflowException

class AirflowMesosScheduler(mesos.interface.Scheduler):
    """
    AirflowMesosScheduler implements the mesos scheduler interface
    to schedule airflow tasks on mesos.
    Basically, it schedules a command like
    'airflow run <dag_id> <task_instance_id> <start_date> --local -p=<pickle>'
    to run on a mesos slave.
    """

    def __init__(self,
                 task_queue,
                 result_queue,
                 task_cpu=1,
                 task_mem=256):
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.task_cpu = task_cpu
        self.task_mem = task_mem
        # Monotonic counter used to mint mesos task ids; task_key_map
        # translates those ids back to airflow task instance keys.
        self.task_counter = 0
        self.task_key_map = {}

    def registered(self, driver, frameworkId, masterInfo):
        logging.info("AirflowScheduler registered to mesos with framework ID %s",
                     frameworkId.value)

    def reregistered(self, driver, masterInfo):
        logging.info("AirflowScheduler re-registered to mesos")

    def disconnected(self, driver):
        logging.info("AirflowScheduler disconnected from mesos")

    def offerRescinded(self, driver, offerId):
        logging.info("AirflowScheduler offer %s rescinded", str(offerId))

    def frameworkMessage(self, driver, executorId, slaveId, message):
        logging.info("AirflowScheduler received framework message %s", message)

    def executorLost(self, driver, executorId, slaveId, status):
        logging.warning("AirflowScheduler executor %s lost", str(executorId))

    def slaveLost(self, driver, slaveId):
        logging.warning("AirflowScheduler slave %s lost", str(slaveId))

    def error(self, driver, message):
        logging.error("AirflowScheduler driver aborted %s", message)
        raise AirflowException("AirflowScheduler driver aborted %s" % message)
    def resourceOffers(self, driver, offers):
        for offer in offers:
            tasks = []
            offerCpus = 0
            offerMem = 0
            for resource in offer.resources:
                if resource.name == "cpus":
                    offerCpus += resource.scalar.value
                elif resource.name == "mem":
                    offerMem += resource.scalar.value

            logging.info("Received offer %s with cpus: %s and mem: %s",
                         offer.id.value, offerCpus, offerMem)

            remainingCpus = offerCpus
            remainingMem = offerMem

            # Pack as many queued airflow commands into this offer as its
            # remaining cpu/mem resources allow.
            while (not self.task_queue.empty()) and \
                    remainingCpus >= self.task_cpu and \
                    remainingMem >= self.task_mem:
                key, cmd = self.task_queue.get()
                tid = self.task_counter
                self.task_counter += 1
                self.task_key_map[str(tid)] = key

                logging.info("Launching task %d using offer %s", tid, offer.id.value)

                task = mesos_pb2.TaskInfo()
                task.task_id.value = str(tid)
                task.slave_id.value = offer.slave_id.value
                task.name = "AirflowTask %d" % tid

                cpus = task.resources.add()
                cpus.name = "cpus"
                cpus.type = mesos_pb2.Value.SCALAR
                cpus.scalar.value = self.task_cpu

                mem = task.resources.add()
                mem.name = "mem"
                mem.type = mesos_pb2.Value.SCALAR
                mem.scalar.value = self.task_mem

                command = mesos_pb2.CommandInfo()
                command.shell = True
                command.value = cmd
                task.command.MergeFrom(command)

                tasks.append(task)

                remainingCpus -= self.task_cpu
                remainingMem -= self.task_mem

            driver.launchTasks(offer.id, tasks)

    def statusUpdate(self, driver, update):
        logging.info("Task %s is in state %s, data %s",
                     update.task_id.value,
                     mesos_pb2.TaskState.Name(update.state), str(update.data))

        # Translate the mesos task id back to the airflow task instance key.
        key = self.task_key_map[update.task_id.value]

        if update.state == mesos_pb2.TASK_FINISHED:
            self.result_queue.put((key, State.SUCCESS))
            self.task_queue.task_done()

        if update.state == mesos_pb2.TASK_LOST or \
                update.state == mesos_pb2.TASK_KILLED or \
                update.state == mesos_pb2.TASK_FAILED:
            self.result_queue.put((key, State.FAILED))
            self.task_queue.task_done()
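# Illustration (descriptive comments only; not part of the module above):
# how a task round-trips through the two queues shared by
# AirflowMesosScheduler and MesosExecutor, with a hypothetical key/command.
#
#   q_in, q_out = Queue(), Queue()
#   sched = AirflowMesosScheduler(q_in, q_out)
#   key = ('example_dag', 'example_task', '2015-01-01T00:00:00')
#   q_in.put((key, "airflow run example_dag example_task "
#                  "2015-01-01T00:00:00 --local"))
#
# resourceOffers() pops q_in, mints mesos task id '0', records
# sched.task_key_map['0'] = key, and launches the command on an offer.
# When mesos later reports TASK_FINISHED for id '0', statusUpdate() resolves
# the id back to key and enqueues (key, State.SUCCESS) on q_out, which
# MesosExecutor.sync() drains via change_state().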

class MesosExecutor(BaseExecutor):
    """
    MesosExecutor allows distributing the execution of task
    instances to multiple mesos workers.

    Apache Mesos is a distributed systems kernel which abstracts
    CPU, memory, storage, and other compute resources away from
    machines (physical or virtual), enabling fault-tolerant and
    elastic distributed systems to easily be built and run effectively.
    See http://mesos.apache.org/
    """

    def start(self):
        self.task_queue = Queue()
        self.result_queue = Queue()
        framework = mesos_pb2.FrameworkInfo()
        framework.user = ''

        if not conf.get('mesos', 'MASTER'):
            logging.error("Expecting mesos master URL for mesos executor")
            raise AirflowException("mesos.master not provided for mesos executor")

        master = conf.get('mesos', 'MASTER')

        if not conf.get('mesos', 'FRAMEWORK_NAME'):
            framework.name = 'Airflow'
        else:
            framework.name = conf.get('mesos', 'FRAMEWORK_NAME')

        if not conf.get('mesos', 'TASK_CPU'):
            task_cpu = 1
        else:
            task_cpu = conf.getint('mesos', 'TASK_CPU')

        if not conf.get('mesos', 'TASK_MEMORY'):
            task_memory = 256
        else:
            task_memory = conf.getint('mesos', 'TASK_MEMORY')

        if conf.getboolean('mesos', 'CHECKPOINT'):
            framework.checkpoint = True
        else:
            framework.checkpoint = False

        logging.info('MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s',
                     master, framework.name, str(task_cpu), str(task_memory),
                     str(framework.checkpoint))

        implicit_acknowledgements = 1

        if conf.getboolean('mesos', 'AUTHENTICATE'):
            if not conf.get('mesos', 'DEFAULT_PRINCIPAL'):
                logging.error("Expecting authentication principal in the environment")
                raise AirflowException("mesos.default_principal not provided in authenticated mode")
            if not conf.get('mesos', 'DEFAULT_SECRET'):
                logging.error("Expecting authentication secret in the environment")
                raise AirflowException("mesos.default_secret not provided in authenticated mode")

            credential = mesos_pb2.Credential()
            credential.principal = conf.get('mesos', 'DEFAULT_PRINCIPAL')
            credential.secret = conf.get('mesos', 'DEFAULT_SECRET')

            framework.principal = credential.principal

            driver = mesos.native.MesosSchedulerDriver(
                AirflowMesosScheduler(self.task_queue, self.result_queue,
                                      task_cpu, task_memory),
                framework,
                master,
                implicit_acknowledgements,
                credential)
        else:
            framework.principal = 'Airflow'
            driver = mesos.native.MesosSchedulerDriver(
                AirflowMesosScheduler(self.task_queue, self.result_queue,
                                      task_cpu, task_memory),
                framework,
                master,
                implicit_acknowledgements)

        self.mesos_driver = driver
        self.mesos_driver.start()

    def execute_async(self, key, command, queue=None):
        self.task_queue.put((key, command))

    def sync(self):
        while not self.result_queue.empty():
            results = self.result_queue.get()
            self.change_state(*results)

    def end(self):
        self.task_queue.join()
        self.mesos_driver.stop()
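For orientation, a rough sketch of the executor lifecycle as the airflow scheduler drives it (method names are those defined above; the real scheduler loop is more involved, and the key/command values are hypothetical):

    executor = MesosExecutor()
    executor.start()    # registers the framework, starts the mesos driver

    key = ('example_dag', 'example_task', '2015-01-01T00:00:00')
    executor.execute_async(key, "airflow run example_dag example_task "
                                "2015-01-01T00:00:00 --local")

    executor.sync()     # drains result_queue, calling change_state(key, state)
    executor.end()      # joins task_queue, then stops the mesos driver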
@@ -5,6 +5,7 @@ from airflow.executors.base_executor import BaseExecutor
 from airflow.executors.local_executor import LocalExecutor
 from airflow.executors.celery_executor import CeleryExecutor
 from airflow.executors.sequential_executor import SequentialExecutor
+from airflow.contrib.executors.mesos_executor import MesosExecutor
 from airflow.utils import AirflowException

 _EXECUTOR = conf.get('core', 'EXECUTOR')

@@ -15,6 +16,8 @@ elif _EXECUTOR == 'CeleryExecutor':
     DEFAULT_EXECUTOR = CeleryExecutor()
 elif _EXECUTOR == 'SequentialExecutor':
     DEFAULT_EXECUTOR = SequentialExecutor()
+elif _EXECUTOR == 'MesosExecutor':
+    DEFAULT_EXECUTOR = MesosExecutor()
 else:
     # Loading plugins
     from airflow.plugins_manager import executors as _executors
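With this dispatch in place, switching to the new executor is a one-line change in the ``[core]`` section of ``airflow.cfg`` (``executor = MesosExecutor``). A hypothetical sanity check from a Python shell, assuming that setting:

    from airflow.executors import DEFAULT_EXECUTOR
    print(type(DEFAULT_EXECUTOR).__name__)  # expected: 'MesosExecutor'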
@@ -159,4 +159,4 @@ Executors are the mechanism by which task instances get run.

 .. automodule:: airflow.executors
     :show-inheritance:
-    :members: LocalExecutor, CeleryExecutor, SequentialExecutor
+    :members: LocalExecutor, CeleryExecutor, SequentialExecutor, MesosExecutor
@@ -123,8 +123,8 @@ Airflow with the value in a URI format to use the connection properly. Please
 see the :doc:`concepts` documentation for more information on environment
 variables and connections.

-Scaling Out
-'''''''''''
+Scaling Out with Celery
+'''''''''''''''''''''''
 CeleryExecutor is the way you can scale out the number of workers. For this
 to work, you need to set up a Celery backend (**RabbitMQ**, **Redis**, ...) and
 change your ``airflow.cfg`` to point the executor parameter to
@@ -147,6 +147,43 @@ Note that you can also run "Celery Flower", a web UI built on top of Celery,
to monitor your workers.


Scaling Out on Mesos (community contributed)
''''''''''''''''''''''''''''''''''''''''''''
MesosExecutor allows you to schedule airflow tasks on a Mesos cluster.
For this to work, you need a running mesos cluster and the following setup:

1. Install airflow on a machine where the webserver and scheduler will run;
   let's refer to this as the Airflow server.
2. On the Airflow server, install the mesos python eggs from
   `mesos downloads <http://open.mesosphere.com/downloads/mesos/>`_
   (a quick import check is sketched after this section).
3. On the Airflow server, use a database which can be accessed from all mesos
   slave machines, for example mysql, and configure it in ``airflow.cfg``.
4. Change your ``airflow.cfg`` to point the executor parameter to
   MesosExecutor and provide the related Mesos settings.
5. On all mesos slaves, install airflow and copy the ``airflow.cfg`` from the
   Airflow server (so that they use the same sqlalchemy connection).
6. On all mesos slaves, run the following for serving logs:

   .. code-block:: bash

       airflow serve_logs

7. On the Airflow server, run the following to start processing DAGs and
   scheduling them on mesos:

   .. code-block:: bash

       airflow scheduler -p

   The ``-p`` parameter is needed to pickle the DAGs so they can be shipped
   to the mesos slaves.

You can now see the airflow framework and the corresponding tasks in the
mesos UI. The logs for airflow tasks can be seen in the airflow UI as usual.

For more information about mesos, refer to the
`mesos documentation <http://mesos.apache.org/documentation/latest/>`_.
For any queries/bugs on MesosExecutor, please contact
`kapil-malik <https://github.com/kapil-malik>`_.
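A quick way to verify step 2 on the Airflow server is a short import check (hypothetical; it simply confirms the bindings this commit relies on are resolvable):

    import mesos.interface
    import mesos.native
    from mesos.interface import mesos_pb2
    print(mesos_pb2.TaskState.Name(mesos_pb2.TASK_FINISHED))  # "TASK_FINISHED"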


Web Authentication
''''''''''''''''''
@@ -14,7 +14,7 @@ has ended.

 The scheduler starts an instance of the executor specified in your
 ``airflow.cfg``. If it happens to be the LocalExecutor, tasks will be
-executed as subprocesses; in the case of CeleryExecutor, tasks are
+executed as subprocesses; in the case of CeleryExecutor and MesosExecutor, tasks are
 executed remotely.

 To start a scheduler, simply run the command: