[AIRFLOW-3343] Update DockerOperator for Docker-py 3.0.0 API changes (#4187)

The API of `wait()` changed to return a dict, not just a number so this
Operator wasn't actually working, but the tests were passing because the
return was mocked in-correctly.

I also removed `shm_size` from kwargs passed to BaseOperator to avoid
the deprecation warning about unknown args.
This commit is contained in:
Ash Berlin-Taylor 2018-11-14 22:06:03 +00:00 коммит произвёл GitHub
Родитель ef633e6f2a
Коммит 7d6e8cd4e6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 16 добавлений и 12 удалений

Просмотреть файл

@ -108,6 +108,9 @@ class DockerOperator(BaseOperator):
:type xcom_all: bool
:param docker_conn_id: ID of the Airflow connection to use
:type docker_conn_id: str
:param shm_size: Size of ``/dev/shm`` in bytes. The size must be
greater than 0. If omitted uses system default.
:type shm_size: int
"""
template_fields = ('command', 'environment',)
template_ext = ('.sh', '.bash',)
@ -139,6 +142,7 @@ class DockerOperator(BaseOperator):
dns=None,
dns_search=None,
auto_remove=False,
shm_size=None,
*args,
**kwargs):
@ -167,7 +171,7 @@ class DockerOperator(BaseOperator):
self.xcom_push_flag = xcom_push
self.xcom_all = xcom_all
self.docker_conn_id = docker_conn_id
self.shm_size = kwargs.get('shm_size')
self.shm_size = shm_size
self.cli = None
self.container = None
@ -197,7 +201,7 @@ class DockerOperator(BaseOperator):
if self.force_pull or len(self.cli.images(name=self.image)) == 0:
self.log.info('Pulling docker image %s', self.image)
for l in self.cli.pull(self.image, stream=True):
output = json.loads(l.decode('utf-8'))
output = json.loads(l.decode('utf-8').strip())
if 'status' in output:
self.log.info("%s", output['status'])
@ -230,9 +234,9 @@ class DockerOperator(BaseOperator):
line = line.decode('utf-8')
self.log.info(line)
exit_code = self.cli.wait(self.container['Id'])
if exit_code != 0:
raise AirflowException('docker container failed')
result = self.cli.wait(self.container['Id'])
if result['StatusCode'] != 0:
raise AirflowException('docker container failed: ' + repr(result))
if self.xcom_push_flag:
return self.cli.logs(container=self.container['Id']) \

Просмотреть файл

@ -174,7 +174,7 @@ doc = [
'sphinx-rtd-theme>=0.1.6',
'Sphinx-PyPI-upload>=0.2.1'
]
docker = ['docker>=3.0.0']
docker = ['docker~=3.0']
druid = ['pydruid>=0.4.1']
elasticsearch = [
'elasticsearch>=5.0.0,<6.0.0',

Просмотреть файл

@ -51,7 +51,7 @@ class DockerOperatorTestCase(unittest.TestCase):
client_mock.images.return_value = []
client_mock.logs.return_value = ['container log']
client_mock.pull.return_value = [b'{"status":"pull log"}']
client_mock.wait.return_value = 0
client_mock.wait.return_value = {"StatusCode": 0}
client_class_mock.return_value = client_mock
@ -97,7 +97,7 @@ class DockerOperatorTestCase(unittest.TestCase):
client_mock.images.return_value = []
client_mock.logs.return_value = []
client_mock.pull.return_value = []
client_mock.wait.return_value = 0
client_mock.wait.return_value = {"StatusCode": 0}
client_class_mock.return_value = client_mock
tls_mock = mock.Mock()
@ -123,7 +123,7 @@ class DockerOperatorTestCase(unittest.TestCase):
client_mock.images.return_value = []
client_mock.logs.return_value = ['unicode container log 😁']
client_mock.pull.return_value = []
client_mock.wait.return_value = 0
client_mock.wait.return_value = {"StatusCode": 0}
client_class_mock.return_value = client_mock
@ -145,7 +145,7 @@ class DockerOperatorTestCase(unittest.TestCase):
client_mock.images.return_value = []
client_mock.logs.return_value = []
client_mock.pull.return_value = []
client_mock.wait.return_value = 1
client_mock.wait.return_value = {"StatusCode": 1}
client_class_mock.return_value = client_mock
@ -174,7 +174,7 @@ class DockerOperatorTestCase(unittest.TestCase):
client_mock.create_container.return_value = {'Id': 'some_id'}
client_mock.logs.return_value = []
client_mock.pull.return_value = []
client_mock.wait.return_value = 0
client_mock.wait.return_value = {"StatusCode": 0}
operator_client_mock.return_value = client_mock
# Create the DockerOperator
@ -209,7 +209,7 @@ class DockerOperatorTestCase(unittest.TestCase):
client_mock.create_container.return_value = {'Id': 'some_id'}
client_mock.logs.return_value = []
client_mock.pull.return_value = []
client_mock.wait.return_value = 0
client_mock.wait.return_value = {"StatusCode": 0}
operator_client_mock.return_value = client_mock
# Create the DockerOperator