Fixing the conn_ids
This commit is contained in:
Родитель
9e6ee42772
Коммит
1bfbd752f4
|
@ -26,14 +26,16 @@ class DbApiHook(BaseHook):
|
||||||
"""
|
"""
|
||||||
supports_autocommit = False
|
supports_autocommit = False
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
try:
|
if not self.conn_name_attr:
|
||||||
self.conn_id_name = kwargs[self.conn_name_attr]
|
|
||||||
except NameError:
|
|
||||||
raise AirflowException("conn_name_attr is not defined")
|
raise AirflowException("conn_name_attr is not defined")
|
||||||
except KeyError:
|
elif len(args) == 1:
|
||||||
raise AirflowException(
|
setattr(self, self.conn_name_attr, args[0])
|
||||||
self.conn_name_attr + " was not passed in the kwargs")
|
elif self.conn_name_attr not in kwargs:
|
||||||
|
setattr(self, self.conn_name_attr, self.default_conn_name)
|
||||||
|
else:
|
||||||
|
setattr(self, self.conn_name_attr, kwargs[self.conn_name_attr])
|
||||||
|
|
||||||
|
|
||||||
def get_pandas_df(self, sql, parameters=None):
|
def get_pandas_df(self, sql, parameters=None):
|
||||||
'''
|
'''
|
||||||
|
|
|
@ -16,7 +16,7 @@ class MySqlHook(DbApiHook):
|
||||||
"""
|
"""
|
||||||
Returns a mysql connection object
|
Returns a mysql connection object
|
||||||
"""
|
"""
|
||||||
conn = self.get_connection(self.conn_id_name)
|
conn = self.get_connection(self.mysql_conn_id)
|
||||||
conn = MySQLdb.connect(
|
conn = MySQLdb.connect(
|
||||||
conn.host,
|
conn.host,
|
||||||
conn.login,
|
conn.login,
|
||||||
|
|
|
@ -13,7 +13,7 @@ class PostgresHook(DbApiHook):
|
||||||
supports_autocommit = True
|
supports_autocommit = True
|
||||||
|
|
||||||
def get_conn(self):
|
def get_conn(self):
|
||||||
conn = self.get_connection(self.conn_id_name)
|
conn = self.get_connection(self.postgres_conn_id)
|
||||||
return psycopg2.connect(
|
return psycopg2.connect(
|
||||||
host=conn.host,
|
host=conn.host,
|
||||||
user=conn.login,
|
user=conn.login,
|
||||||
|
|
|
@ -26,7 +26,7 @@ class PrestoHook(DbApiHook):
|
||||||
|
|
||||||
def get_conn(self):
|
def get_conn(self):
|
||||||
"""Returns a connection object"""
|
"""Returns a connection object"""
|
||||||
db = self.get_connection(self.conn_id_name)
|
db = self.get_connection(self.presto_conn_id)
|
||||||
return presto.connect(
|
return presto.connect(
|
||||||
host=db.host,
|
host=db.host,
|
||||||
port=db.port,
|
port=db.port,
|
||||||
|
|
|
@ -17,6 +17,6 @@ class SqliteHook(DbApiHook):
|
||||||
"""
|
"""
|
||||||
Returns a sqlite connection object
|
Returns a sqlite connection object
|
||||||
"""
|
"""
|
||||||
conn = self.get_connection(self.conn_id_name)
|
conn = self.get_connection(self.sqlite_conn_id)
|
||||||
conn = sqlite3.connect(conn.host)
|
conn = sqlite3.connect(conn.host)
|
||||||
return conn
|
return conn
|
||||||
|
|
Загрузка…
Ссылка в новой задаче