[AIRFLOW-1170] DbApiHook insert_rows inserts parameters separately
Instead of creating a sql statement with all values, we send the values separately to prevent sql injection Closes #2270 from NielsZeilemaker/AIRFLOW-1170
This commit is contained in:
Родитель
03704ce026
Коммит
3b589a9f73
|
@ -180,7 +180,7 @@ class DbApiHook(BaseHook):
|
|||
def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
|
||||
"""
|
||||
A generic way to insert a set of tuples into a table,
|
||||
the whole set of inserts is treated as one transaction
|
||||
a new transaction is created every commit_every rows
|
||||
|
||||
:param table: Name of the target table
|
||||
:type table: str
|
||||
|
@ -210,11 +210,12 @@ class DbApiHook(BaseHook):
|
|||
for cell in row:
|
||||
l.append(self._serialize_cell(cell, conn))
|
||||
values = tuple(l)
|
||||
placeholders = ["%s",]*len(values)
|
||||
sql = "INSERT INTO {0} {1} VALUES ({2});".format(
|
||||
table,
|
||||
target_fields,
|
||||
",".join(values))
|
||||
cur.execute(sql)
|
||||
",".join(placeholders))
|
||||
cur.execute(sql, values)
|
||||
if commit_every and i % commit_every == 0:
|
||||
conn.commit()
|
||||
logging.info(
|
||||
|
@ -237,16 +238,11 @@ class DbApiHook(BaseHook):
|
|||
:rtype: str
|
||||
"""
|
||||
|
||||
if isinstance(cell, basestring):
|
||||
return "'" + str(cell).replace("'", "''") + "'"
|
||||
elif cell is None:
|
||||
return 'NULL'
|
||||
elif isinstance(cell, numpy.datetime64):
|
||||
return "'" + str(cell) + "'"
|
||||
elif isinstance(cell, datetime):
|
||||
return "'" + cell.isoformat() + "'"
|
||||
else:
|
||||
return str(cell)
|
||||
if cell is None:
|
||||
return None
|
||||
if isinstance(cell, datetime):
|
||||
return cell.isoformat()
|
||||
return str(cell)
|
||||
|
||||
def bulk_dump(self, table, tmp_file):
|
||||
"""
|
||||
|
|
|
@ -87,14 +87,15 @@ class MySqlHook(DbApiHook):
|
|||
@staticmethod
|
||||
def _serialize_cell(cell, conn):
|
||||
"""
|
||||
Returns the MySQL literal of the cell as a string.
|
||||
MySQLdb converts an argument to a literal when passing those seperately to execute.
|
||||
Hence, this method does nothing.
|
||||
|
||||
:param cell: The cell to insert into the table
|
||||
:type cell: object
|
||||
:param conn: The database connection
|
||||
:type conn: connection object
|
||||
:return: The serialized cell
|
||||
:rtype: str
|
||||
:return: The same cell
|
||||
:rtype: object
|
||||
"""
|
||||
|
||||
return conn.literal(cell)
|
||||
return cell
|
||||
|
|
|
@ -50,14 +50,17 @@ class PostgresHook(DbApiHook):
|
|||
@staticmethod
|
||||
def _serialize_cell(cell, conn):
|
||||
"""
|
||||
Returns the Postgres literal of the cell as a string.
|
||||
Postgresql will adapt all arguments to the execute() method internally,
|
||||
hence we return cell without any conversion.
|
||||
|
||||
See http://initd.org/psycopg/docs/advanced.html#adapting-new-types for
|
||||
more information.
|
||||
|
||||
:param cell: The cell to insert into the table
|
||||
:type cell: object
|
||||
:param conn: The database connection
|
||||
:type conn: connection object
|
||||
:return: The serialized cell
|
||||
:rtype: str
|
||||
:return: The cell
|
||||
:rtype: object
|
||||
"""
|
||||
|
||||
return psycopg2.extensions.adapt(cell).getquoted().decode('utf-8')
|
||||
return cell
|
||||
|
|
Загрузка…
Ссылка в новой задаче