[AIRFLOW-4598] Task retries are not exhausted for K8s executor (#5347)

This commit is contained in:
andy-g14 2019-06-04 07:10:02 +05:30 коммит произвёл Daniel Imberman
Родитель e8c5c7a3d8
Коммит 89bc657553
2 изменённых файлов: 37 добавлений и 10 удалений

Просмотреть файл

@ -797,16 +797,6 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
self.log.debug('Could not find key: %s', str(key))
pass
self.event_buffer[key] = state
(dag_id, task_id, ex_time, try_number) = key
with create_session() as session:
item = session.query(TaskInstance).filter_by(
dag_id=dag_id,
task_id=task_id,
execution_date=ex_time
).one()
if state:
item.state = state
session.add(item)
def end(self):
self.log.info('Shutting down Kubernetes executor')

Просмотреть файл

@ -36,6 +36,7 @@ try:
from airflow.kubernetes.worker_configuration import WorkerConfiguration
from airflow.exceptions import AirflowConfigException
from airflow.kubernetes.secret import Secret
from airflow.utils.state import State
except ImportError:
AirflowKubernetesScheduler = None # type: ignore
@ -706,6 +707,42 @@ class TestKubernetesExecutor(unittest.TestCase):
mock.call('executor.running_tasks', mock.ANY)]
mock_stats_gauge.assert_has_calls(calls)
@mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher, mock_kube_config):
executor = KubernetesExecutor()
executor.start()
key = ('dag_id', 'task_id', 'ex_time', 'try_number1')
executor._change_state(key, State.RUNNING, 'pod_id')
self.assertTrue(executor.event_buffer[key] == State.RUNNING)
@mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
mock_kube_config):
executor = KubernetesExecutor()
executor.start()
key = ('dag_id', 'task_id', 'ex_time', 'try_number2')
executor._change_state(key, State.SUCCESS, 'pod_id')
self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
mock_delete_pod.assert_called_with('pod_id')
@mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
mock_kube_config):
executor = KubernetesExecutor()
executor.start()
key = ('dag_id', 'task_id', 'ex_time', 'try_number3')
executor._change_state(key, State.FAILED, 'pod_id')
self.assertTrue(executor.event_buffer[key] == State.FAILED)
mock_delete_pod.assert_called_with('pod_id')
if __name__ == '__main__':
unittest.main()