[AIRFLOW-2525] Fix PostgresHook.copy_expert to work with "COPY FROM"

For now PostgresHook.copy_expert supports
"COPY TO" but not "COPY FROM", because it
opens a file with write mode and doesn't
commit operations. This PR fixes it by
opening a file with read and write mode
and committing operations at last.
This commit is contained in:
Kengo Seki 2018-05-25 00:04:19 -04:00
Родитель e4e7b55ad7
Коммит dabf1b962d
2 изменённых файлов: 10 добавлений и 10 удалений

Просмотреть файл

@ -64,10 +64,11 @@ class PostgresHook(DbApiHook):
Executes SQL using psycopg2 copy_expert method
Necessary to execute COPY command without access to a superuser
"""
f = open(filename, 'w')
with closing(self.get_conn()) as conn:
with closing(conn.cursor()) as cur:
cur.copy_expert(sql, f)
with open(filename, 'w+') as f:
with closing(self.get_conn()) as conn:
with closing(conn.cursor()) as cur:
cur.copy_expert(sql, f)
conn.commit()
@staticmethod
def _serialize_cell(cell, conn):

Просмотреть файл

@ -43,17 +43,16 @@ class TestPostgresHook(unittest.TestCase):
def test_copy_expert(self):
m = mock.mock_open(read_data='{"some": "json"}')
with mock.patch('airflow.hooks.postgres_hook.open', m, create=True) as m:
with mock.patch('airflow.hooks.postgres_hook.open', m):
statement = "SQL"
filename = "filename"
self.cur.fetchall.return_value = None
f = m(filename, 'w')
def test_open(filename, mode):
return f
self.assertEqual(None, self.db_hook.copy_expert(statement, filename, open=test_open))
self.assertEqual(None, self.db_hook.copy_expert(statement, filename, open=m))
self.conn.close.assert_called_once()
self.cur.close.assert_called_once()
self.cur.copy_expert.assert_called_once_with(statement, f)
self.conn.commit.assert_called_once()
self.cur.copy_expert.assert_called_once_with(statement, m.return_value)
m.assert_called_once_with(filename, "w+")