Merge pull request #4 from mistercrunch/config_parser

integrating config parser to read config values from a file
Krishna Puttaswamy 2014-11-07 17:39:04 -08:00
Parent cc94aa007d 3361696cff
Commit fc8f28a261
16 changed files: 166 additions and 89 deletions

31
airflow/airflow.cfg Normal file
View file

@ -0,0 +1,31 @@
[core]
AIRFLOW_HOME: TO_REPLACE_FROM_OS_ENVIRON
BASE_LOG_FOLDER: %(AIRFLOW_HOME)s/logs
DAGS_FOLDER: %(AIRFLOW_HOME)s/dags
BASE_FOLDER: %(AIRFLOW_HOME)s/airflow
[server]
WEB_SERVER_HOST: 0.0.0.0
WEB_SERVER_PORT: 8080
[smtp]
SMTP_HOST: 'localhost'
SMTP_PORT: 25
SMTP_PASSWORD: None
SMTP_MAIL_FROM: 'airflow_alerts@mydomain.com'
[celery]
CELERY_APP_NAME: airflow.executors.celery_worker
CELERY_BROKER: amqp
CELERY_RESULTS_BACKEND: amqp://
[hooks]
HIVE_HOME_PY: '/usr/lib/hive/lib/py'
PRESTO_DEFAULT_DBID: presto_default
HIVE_DEFAULT_DBID: hive_default
[misc]
RUN_AS_MASTER: True
JOB_HEARTBEAT_SEC: 5
# Used for dag_id and task_id VARCHAR length
ID_LEN: 250
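
The [core] paths above rely on ConfigParser's %(...)s interpolation: once the TO_REPLACE_FROM_OS_ENVIRON placeholder has been rewritten with the real AIRFLOW_HOME (the sed line in the shell script at the end of this diff does that), every derived folder expands from that single entry. A minimal sketch of the mechanism, not part of the commit; the path value is made up:

# Sketch only (Python 2, matching this commit): how %(AIRFLOW_HOME)s expands.
from ConfigParser import ConfigParser

conf = ConfigParser()
conf.read('airflow.cfg')  # assumes the file above sits in the working directory
# Stand-in for the sed substitution; '/home/user/airflow' is a hypothetical path.
conf.set('core', 'AIRFLOW_HOME', '/home/user/airflow')

print conf.get('core', 'DAGS_FOLDER')      # /home/user/airflow/dags
print conf.get('core', 'BASE_LOG_FOLDER')  # /home/user/airflow/logs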

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python
from airflow.configuration import getconf
from airflow import settings
from airflow.models import DagBag, TaskInstance, DagPickle, State
@ -18,7 +19,6 @@ from sqlalchemy import func
mark_success_help = "Mark jobs as succeeded without running them"
subdir_help = "File location or directory from which to look for the dag"
def backfill(args):
logging.basicConfig(level=logging.INFO)
dagbag = DagBag(args.subdir)
@ -44,7 +44,7 @@ def backfill(args):
def run(args):
# Setting up logging
directory = settings.BASE_LOG_FOLDER + \
directory = getconf().get('core', 'BASE_LOG_FOLDER') + \
"/{args.dag_id}/{args.task_id}".format(args=args)
if not os.path.exists(directory):
os.makedirs(directory)
@ -52,7 +52,8 @@ def run(args):
iso = args.execution_date.isoformat()
filename = "{directory}/{iso}".format(**locals())
logging.basicConfig(
filename=filename, level=logging.INFO, format=settings.LOG_FORMAT)
filename=filename, level=logging.INFO,
format=settings.LOG_FORMAT)
print("Logging into: " + filename)
if not args.pickle:
@ -106,10 +107,11 @@ def clear(args):
def webserver(args):
logging.basicConfig(level=logging.DEBUG, format=settings.LOG_FORMAT)
logging.basicConfig(level=logging.DEBUG,
format=settings.LOG_FORMAT)
print(settings.HEADER)
from www.app import app
print("Starting the web server on port {0}.".format(args.port))
print("Starting the web server on port {0} and host {1}.".format(args.port, args.hostname))
app.run(debug=True, port=args.port, host=args.hostname)
@ -251,7 +253,7 @@ if __name__ == '__main__':
action="store_true")
parser_backfill.add_argument(
"-sd", "--subdir", help=subdir_help,
default=settings.DAGS_FOLDER)
default=getconf().get('core', 'DAGS_FOLDER'))
parser_backfill.set_defaults(func=backfill)
ht = "Clear a set of task instance, as if they never ran"
@ -272,7 +274,7 @@ if __name__ == '__main__':
"-d", "--downstream", help=ht, action="store_true")
parser_clear.add_argument(
"-sd", "--subdir", help=subdir_help,
default=settings.DAGS_FOLDER)
default=getconf().get('core', 'DAGS_FOLDER'))
parser_clear.set_defaults(func=clear)
ht = "Run a single task instance"
@ -282,7 +284,8 @@ if __name__ == '__main__':
parser_run.add_argument(
"execution_date", help="The execution date to run")
parser_run.add_argument(
"-sd", "--subdir", help=subdir_help, default=settings.DAGS_FOLDER)
"-sd", "--subdir", help=subdir_help,
default=getconf().get('core', 'DAGS_FOLDER'))
parser_run.add_argument(
"-m", "--mark_success", help=mark_success_help, action="store_true")
parser_run.add_argument(
@ -298,18 +301,18 @@ if __name__ == '__main__':
help="Serialized pickle object of the entire dag (used internally)")
parser_run.set_defaults(func=run)
ht = "Start a Flux webserver instance"
ht = "Start an Airflow webserver instance"
parser_webserver = subparsers.add_parser('webserver', help=ht)
parser_webserver.add_argument(
"-p", "--port",
default=settings.WEB_SERVER_PORT,
default=getconf().get('server', 'WEB_SERVER_PORT'),
type=int,
help="Set the port on which to run the web server")
parser_webserver.set_defaults(func=webserver)
parser_webserver.add_argument(
"-hn", "--hostname",
default=settings.WEB_SERVER_HOST,
default=getconf().get('server', 'WEB_SERVER_HOST'),
help="Set the hostname on which to run the web server")
parser_webserver.set_defaults(func=webserver)
@ -318,7 +321,8 @@ if __name__ == '__main__':
parser_master.add_argument(
"-d", "--dag_id", help="The id of the dag to run")
parser_master.add_argument(
"-sd", "--subdir", help=subdir_help, default=settings.DAGS_FOLDER)
"-sd", "--subdir", help=subdir_help,
default=getconf().get('core', 'DAGS_FOLDER'))
parser_master.set_defaults(func=master)
ht = "Initialize and reset the metadata database"

57
airflow/configuration.py Normal file
View file

@ -0,0 +1,57 @@
import logging
import os
from ConfigParser import ConfigParser, NoOptionError, NoSectionError
class AirflowConfigParser(ConfigParser):
NO_DEFAULT = object()
_instance = None
_config_paths = ['airflow.cfg']
if 'AIRFLOW_CONFIG_PATH' in os.environ:
_config_paths.append(os.environ['AIRFLOW_CONFIG_PATH'])
logging.info("Config paths is " + str(_config_paths))
print("Config paths is " + str(_config_paths))
@classmethod
def add_config_paths(cls, path):
cls._config_paths.append(path)
cls.reload()
@classmethod
def instance(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = cls(*args, **kwargs)
cls._instance.reload()
return cls._instance
@classmethod
def reload(cls):
loaded_obj = cls.instance().read(cls._config_paths)
logging.info("the config object after loading is " + str(loaded_obj))
return loaded_obj
def get_with_default(self, method, section, option, default):
try:
return method(self, section, option)
except (NoOptionError, NoSectionError):
if default is AirflowConfigParser.NO_DEFAULT:
raise
return default
def get(self, section, option, default=NO_DEFAULT):
return self.get_with_default(ConfigParser.get, section, option, default)
def getint(self, section, option, default=NO_DEFAULT):
return self.get_with_default(ConfigParser.getint, section, option, default)
def getboolean(self, section, option, default=NO_DEFAULT):
return self.get_with_default(ConfigParser.getboolean, section, option, default)
def set(self, section, option, value):
if not ConfigParser.has_section(self, section):
ConfigParser.add_section(self, section)
return ConfigParser.set(self, section, option, value)
def getconf():
return AirflowConfigParser.instance()
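
A usage sketch for the parser above: getconf() returns the singleton, get()/getint()/getboolean() read from airflow.cfg (or AIRFLOW_CONFIG_PATH), and get_with_default() lets callers supply a fallback instead of catching NoOptionError/NoSectionError. Not part of the commit; MADE_UP_OPTION is invented for illustration:

# Sketch only; assumes this module and airflow.cfg are importable/readable.
from airflow.configuration import getconf

dags_folder = getconf().get('core', 'DAGS_FOLDER')         # string from airflow.cfg
heartbeat = getconf().getint('misc', 'JOB_HEARTBEAT_SEC')  # 5, converted to int

# MADE_UP_OPTION does not exist in airflow.cfg, so the default is returned
# instead of a NoOptionError being raised.
retries = getconf().getint('misc', 'MADE_UP_OPTION', default=3)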

View file

@ -2,7 +2,7 @@ import multiprocessing
import time
from airflow.executors.base_executor import BaseExecutor
from airflow import settings
from airflow.configuration import getconf
from airflow.utils import State
from celery_worker import execute_command
@ -52,7 +52,7 @@ class CelerySubmitter(multiprocessing.Process):
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
BASE_FOLDER = settings.BASE_FOLDER
BASE_FOLDER = getconf().get('core', 'BASE_FOLDER')
command = (
"exec bash -c '"
"cd $AIRFLOW_HOME;\n" +

View file

@ -1,18 +1,18 @@
import subprocess
import logging
from celery import Celery
from airflow import settings
from airflow.configuration import getconf
from celery import Celery
# to start the celery worker, run the command:
# "celery -A airflow.executors.celery_worker worker --loglevel=info"
# app = Celery('airflow.executors.celery_worker', backend='amqp', broker='amqp://')
app = Celery(
settings.CELERY_APP_NAME,
backend=settings.CELERY_BROKER,
broker=settings.CELERY_RESULTS_BACKEND)
app = Celery(
getconf().get('celery', 'CELERY_APP_NAME'),
backend=getconf().get('celery', 'CELERY_BROKER'),
broker=getconf().get('celery', 'CELERY_RESULTS_BACKEND'))
@app.task(name='airflow.executors.celery_worker.execute_command')
def execute_command(command):

View file

@ -3,7 +3,7 @@ import multiprocessing
import subprocess
import time
from airflow import settings
from airflow.configuration import getconf
from airflow.utils import State
from airflow.executors.base_executor import BaseExecutor
@ -24,7 +24,7 @@ class LocalWorker(multiprocessing.Process):
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
BASE_FOLDER = settings.BASE_FOLDER
BASE_FOLDER = getconf().get('core', 'BASE_FOLDER')
print command
command = (
"exec bash -c '"

View file

@ -2,10 +2,11 @@ import logging
import subprocess
import sys
from airflow.models import DatabaseConnection
from airflow.configuration import getconf
from airflow import settings
# Adding the Hive python libs to python path
sys.path.insert(0, settings.HIVE_HOME_PY)
sys.path.insert(0, getconf().get('hooks', 'HIVE_HOME_PY'))
from thrift.transport import TSocket
from thrift.transport import TTransport
@ -16,7 +17,8 @@ from airflow.hooks.base_hook import BaseHook
class HiveHook(BaseHook):
def __init__(self, hive_dbid=settings.HIVE_DEFAULT_DBID):
def __init__(self,
hive_dbid=getconf().get('hooks', 'HIVE_DEFAULT_DBID')):
session = settings.Session()
db = session.query(
DatabaseConnection).filter(

View file

@ -1,6 +1,7 @@
import subprocess
from airflow import settings
from airflow.configuration import getconf
from airflow.models import DatabaseConnection
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.presto.presto_client import PrestoClient
@ -12,7 +13,7 @@ class PrestoHook(BaseHook):
"""
Interact with Presto!
"""
def __init__(self, presto_dbid=settings.PRESTO_DEFAULT_DBID):
def __init__(self, presto_dbid=getconf().get('hooks', 'PRESTO_DEFAULT_DBID')):
session = settings.Session()
db = session.query(
DatabaseConnection).filter(

View file

@ -1,8 +1,8 @@
from airflow import settings
from airflow.configuration import getconf
def max_partition(
table, schema="default", hive_dbid=settings.HIVE_DEFAULT_DBID):
table, schema="default",
hive_dbid=getconf().get('hooks', 'HIVE_DEFAULT_DBID')):
from airflow.hooks.hive_hook import HiveHook
hh = HiveHook(hive_dbid=hive_dbid)
return hh.max_partition(schema=schema, table=table)

View file

@ -5,7 +5,7 @@ import inspect
import jinja2
import logging
import os
import sys
import sys, traceback
import pickle
import re
from time import sleep
@ -18,14 +18,14 @@ from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.serializer import loads, dumps
from sqlalchemy.orm import relationship
from airflow.executors import DEFAULT_EXECUTOR
from airflow.configuration import getconf
from airflow import settings
from airflow import utils
from settings import ID_LEN
import socket
from utils import State
Base = declarative_base()
ID_LEN = getconf().getint('misc', 'ID_LEN')
class DagBag(object):
"""
@ -39,7 +39,7 @@ class DagBag(object):
"""
def __init__(
self,
dag_folder=settings.DAGS_FOLDER,
dag_folder=getconf().get('core', 'DAGS_FOLDER'),
executor=DEFAULT_EXECUTOR):
self.dags = {}
logging.info("Filling up the DagBag from " + dag_folder)
@ -60,17 +60,17 @@ class DagBag(object):
except:
logging.error("Failed to import: " + filepath)
logging.error("Exception: " + str(sys.exc_info()))
traceback.print_exc(file=sys.stdout)
else:
for dag in m.__dict__.values():
if type(dag) == DAG:
dag.full_filepath = filepath
#.replace(settings.AIRFLOW_HOME + '/', '')
if dag.dag_id in self.dags:
raise Exception(
'Two DAGs with the same dag_id. No good.')
self.dags[dag.dag_id] = dag
dag.dagbag = self
if settings.RUN_AS_MASTER:
if getconf().getboolean('misc', 'RUN_AS_MASTER'):
dag.db_merge()
def collect_dags(self, file_location):
@ -229,7 +229,7 @@ class TaskInstance(Base):
@property
def log_filepath(self):
iso = self.execution_date.isoformat()
return settings.BASE_LOG_FOLDER + \
return getconf().get('core', 'BASE_LOG_FOLDER') + \
"/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals())
def current_state(self, main_session=None):
@ -538,12 +538,12 @@ class BaseJob(Base):
def is_alive(self):
return (
(datetime.now() - self.latest_heartbeat).seconds <
(settings.JOB_HEARTBEAT_SEC * 2.1)
(getconf().getint('misc', 'JOB_HEARTBEAT_SEC') * 2.1)
)
def heartbeat(self):
session = settings.Session()
sleep_for = settings.JOB_HEARTBEAT_SEC - (
sleep_for = getconf().getint('misc', 'JOB_HEARTBEAT_SEC') - (
datetime.now() - self.latest_heartbeat).total_seconds()
if sleep_for > 0:
sleep(sleep_for)
@ -946,7 +946,8 @@ class DAG(Base):
@property
def filepath(self):
return self.full_filepath.replace(settings.AIRFLOW_HOME + '/', '')
return self.full_filepath.replace(
getconf().get('core', 'AIRFLOW_HOME') + '/', '')
@property
def folder(self):

View file

@ -1,6 +1,6 @@
import logging
from airflow.models import BaseOperator
from airflow import settings
from airflow.configuration import getconf
from airflow.hooks import HiveHook
@ -21,7 +21,8 @@ class HiveOperator(BaseOperator):
template_ext = ('.hql', '.sql',)
def __init__(
self, hql, hive_dbid=settings.HIVE_DEFAULT_DBID,
self, hql,
hive_dbid=getconf().get('hooks', 'HIVE_DEFAULT_DBID'),
*args, **kwargs):
super(HiveOperator, self).__init__(*args, **kwargs)

View file

@ -3,6 +3,7 @@ import logging
from time import sleep
from airflow import settings
from airflow.configuration import getconf
from airflow.hooks import HiveHook
from airflow.hooks import MySqlHook
from airflow.models import BaseOperator
@ -118,7 +119,7 @@ class HivePartitionSensor(BaseSensorOperator):
def __init__(
self,
table, partition,
hive_dbid=settings.HIVE_DEFAULT_DBID,
hive_dbid=getconf().get('hooks', 'HIVE_DEFAULT_DBID'),
schema='default',
*args, **kwargs):
super(HivePartitionSensor, self).__init__(*args, **kwargs)

View file

@ -2,7 +2,26 @@ import os
import sys
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from configuration import getconf
"""
if 'AIRFLOW_HOME' not in os.environ:
os.environ['AIRFLOW_HOME'] = os.path.join(os.path.dirname(__file__), "..")
AIRFLOW_HOME = os.environ['AIRFLOW_HOME']
"""
BASE_FOLDER = getconf().get('core', 'BASE_FOLDER')
if BASE_FOLDER not in sys.path:
sys.path.append(BASE_FOLDER)
Session = sessionmaker()
#engine = create_engine('mysql://airflow:airflow@localhost/airflow')
engine = create_engine('sqlite:///' + BASE_FOLDER + '/airflow.db' )
Session.configure(bind=engine)
# can't move this to configuration due to ConfigParser interpolation
LOG_FORMAT = \
'[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s'
HEADER = """\
.__ _____.__
@ -11,45 +30,3 @@ _____ |__|_______/ ____\ | ______ _ __
/ __ \| || | \/| | | |_( <_> ) /
(____ /__||__| |__| |____/\____/ \/\_/
\/"""
if 'AIRFLOW_HOME' not in os.environ:
os.environ['AIRFLOW_HOME'] = os.path.join(os.path.dirname(__file__), "..")
AIRFLOW_HOME = os.environ['AIRFLOW_HOME']
BASE_FOLDER = AIRFLOW_HOME + '/airflow'
if BASE_FOLDER not in sys.path:
sys.path.append(BASE_FOLDER)
DAGS_FOLDER = AIRFLOW_HOME + '/dags'
BASE_LOG_FOLDER = AIRFLOW_HOME + "/logs"
HIVE_HOME_PY = '/usr/lib/hive/lib/py'
RUN_AS_MASTER = True
JOB_HEARTBEAT_SEC = 5
ID_LEN = 250 # Used for dag_id and task_id VARCHAR length
LOG_FORMAT = \
'[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s'
PRESTO_DEFAULT_DBID = "presto_default"
HIVE_DEFAULT_DBID = "hive_default"
WEB_SERVER_HOST = '0.0.0.0'
WEB_SERVER_PORT = 8080
Session = sessionmaker()
#engine = create_engine('mysql://airflow:airflow@localhost/airflow')
engine = create_engine('sqlite:///' + BASE_FOLDER + '/airflow.db' )
Session.configure(bind=engine)
CELERY_APP_NAME = "airflow.executors.celery_worker"
CELERY_BROKER = "amqp"
CELERY_RESULTS_BACKEND = "amqp://"
# SMTP settings
SMTP_HOST = 'localhost'
SMTP_PORT = 25
SMTP_PASSWORD = None
SMTP_MAIL_FROM = 'airflow_alerts@mydomain.com'
# Overriding settings defaults with local ones
try:
from airflow.secrets import *
except ImportError as e:
pass
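
The "can't move this to configuration due to ConfigParser interpolation" comment in the new settings.py refers to LOG_FORMAT itself containing %(name)s placeholders (%(asctime)s, %(levelname)s, %(message)s): if the format string lived in airflow.cfg, ConfigParser would try to expand those as config options on read. A minimal sketch of the collision, not part of the commit:

# Sketch only (Python 2): a logging format string trips ConfigParser interpolation.
from ConfigParser import ConfigParser, InterpolationMissingOptionError
from StringIO import StringIO

conf = ConfigParser()
conf.readfp(StringIO(
    "[core]\n"
    "LOG_FORMAT: [%(asctime)s] %(levelname)s - %(message)s\n"))

try:
    conf.get('core', 'LOG_FORMAT')
except InterpolationMissingOptionError:
    # ConfigParser looks for an 'asctime' option to substitute and fails.
    print "interpolation error, as expected"

# Plain ConfigParser.get() accepts raw=True to skip interpolation, but the
# getconf().get() wrapper added in this commit does not expose that flag.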

View file

@ -21,9 +21,10 @@ from airflow.settings import Session
from airflow import models
from airflow.models import State
from airflow import settings
from airflow.configuration import getconf
from airflow import utils
dagbag = models.DagBag(settings.DAGS_FOLDER)
dagbag = models.DagBag(getconf().get('core', 'DAGS_FOLDER'))
session = Session()
app = Flask(__name__)
@ -64,7 +65,7 @@ class HomeView(AdminIndexView):
@expose("/")
def index(self):
md = "".join(
open(settings.AIRFLOW_HOME + '/README.md', 'r').readlines())
open(getconf().get('core', 'AIRFLOW_HOME') + '/README.md', 'r').readlines())
content = Markup(markdown.markdown(md))
return self.render('admin/index.html', content=content)
admin = Admin(app, name="Airflow", index_view=HomeView(name='Home'))
@ -110,7 +111,7 @@ class Airflow(BaseView):
dag_id = request.args.get('dag_id')
dag = dagbag.dags[dag_id]
code = "".join(open(dag.filepath, 'r').readlines())
title = dag.filepath.replace(settings.BASE_FOLDER + '/dags/', '')
title = dag.filepath.replace(getconf().get('core', 'BASE_FOLDER') + '/dags/', '')
html_code = highlight(
code, PythonLexer(), HtmlFormatter(noclasses=True))
return self.render(
@ -122,7 +123,7 @@ class Airflow(BaseView):
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
dag = dagbag.dags[dag_id]
loc = settings.BASE_LOG_FOLDER + "/{dag_id}/{task_id}/{execution_date}"
loc = getconf().get('core', 'BASE_LOG_FOLDER') + "/{dag_id}/{task_id}/{execution_date}"
loc = loc.format(**locals())
try:
f = open(loc)
@ -486,7 +487,7 @@ class ModelViewOnly(ModelView):
def filepath_formatter(view, context, model, name):
url = url_for('airflow.code', dag_id=model.dag_id)
short_fp = model.filepath.replace(settings.BASE_FOLDER + '/dags/', '')
short_fp = model.filepath.replace(getconf().get('core', 'BASE_FOLDER') + '/dags/', '')
link = Markup('<a href="{url}">{short_fp}</a>'.format(**locals()))
return link

View file

@ -5,7 +5,6 @@ default_args = {
'mysql_dbid': 'local_mysql',
}
from airflow.operators import HivePartitionSensor
from airflow import settings
from airflow import DAG
dag = DAG("test_wfh")

View file

@ -1,3 +1,5 @@
#!/bin/bash
source $AIRFLOW_HOME/env/bin/activate
export PYTHONPATH=$AIRFLOW_HOME
export AIRFLOW_CONFIG_PATH=$AIRFLOW_HOME/airflow/airflow.cfg
sed -i .bk "s#TO_REPLACE_FROM_OS_ENVIRON#$AIRFLOW_HOME#" $AIRFLOW_CONFIG_PATH
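
Together with the TO_REPLACE_FROM_OS_ENVIRON placeholder in airflow.cfg above, this sed line writes the shell's $AIRFLOW_HOME into the config file before the Python side reads it. Note that '-i .bk' (with a space) is the BSD/macOS spelling of in-place editing and leaves a .bk backup of the original file; GNU sed expects the suffix attached, as in '-i.bk'.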