[AIRFLOW-XXX] Add task execution process on Celery Execution diagram (#6961)

This commit is contained in:
Kamil Breguła 2020-09-02 13:47:34 +02:00 коммит произвёл GitHub
Родитель 0d76b59c68
Коммит 72b2be71b0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 114 добавлений и 2 удалений

Просмотреть файл

@ -58,8 +58,8 @@ repos:
- --fuzzy-match-generates-todo
files: \.rst$
- id: insert-license
name: Add license for all JS/CSS files
files: \.(js|css)$
name: Add license for all JS/CSS/PUML files
files: \.(js|css|puml)$
exclude: ^\.github/.*$
args:
- --comment-style

Просмотреть файл

@ -158,6 +158,41 @@ The components communicate with each other in many places
* [10] **Scheduler** --> **Celery's result backend** - Gets information about the status of completed tasks
* [11] **Scheduler** --> **Celery's broker** - Put the commands to be executed
Task execution process
----------------------
.. figure:: ../img/run_task_on_celery_executor.png
:scale: 50 %
Sequence diagram - task execution process
Initially, two processes are running:
- SchedulerProcess - process the tasks and run using CeleryExecutor
- WorkerProcess - observes the queue waiting for new tasks to appear
- WorkerChildProcess - waits for new tasks
Two databases are also available:
- QueueBroker
- ResultBackend
During this process, two 2 process are created:
- LocalTaskJobProcess - It logic is described by LocalTaskJob. It is monitoring RawTaskProcess. New processes are started using TaskRunner.
- RawTaskProcess - It is process with the user code e.g. :meth:`~airflow.models.BaseOperator.execute`.
| [1] **SchedulerProcess** processes the tasks and when it finds a task that needs to be done, sends it to the **QueueBroker**.
| [2] **QueueBroker** also begins to periodically query **ResultBackend** for the status of the task.
| [3] **QueueBroker**, when it becomes aware of the task, sends information about it to one WorkerProcess.
| [4] **WorkerProcess** assigns a single task to a one **WorkerChildProcess**.
| [5] **WorkerChildProcess** performs the proper task handling functions - :meth:`~airflow.executor.celery_executor.execute_command`. It creates a new process - **LocalTaskJobProcess**.
| [6] LocalTaskJobProcess logic is described by :class:`~airflow.jobs.local_task_job.LocalTaskJob` class. It starts new process using TaskRunner.
| [7][8] Process **RawTaskProcess** and **LocalTaskJobProcess** is stopped when they have finished their work.
| [10][12] **WorkerChildProcess** notifies the main process - **WorkerProcess** about the end of the task and the availability of subsequent tasks.
| [11] **WorkerProcess** saves status information in **ResultBackend**.
| [13] When **SchedulerProcess** asks **ResultBackend** again about the status, it will get information about the status of the task.
Queues
------

Двоичные данные
docs/img/run_task_on_celery_executor.png Normal file

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 55 KiB

Просмотреть файл

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* This file contains source code of run_task_on_celery_executor.png image.
*
* If you want regenerate this image, you should follow instructions here:
* https://plantuml.com/starting
*/
@startuml
autonumber
box Scheduler
participant SchedulerProcess order 10
endbox
database QueueBroker order 20
database ResultBackend order 30
box Worker
participant WorkerProcess order 40
participant WorkerChildProcess order 50
participant LocalTaskJobProcess order 60
participant RawTaskProcess order 70
endbox
activate SchedulerProcess
activate WorkerChildProcess
SchedulerProcess->>QueueBroker: Send task
activate QueueBroker
SchedulerProcess->ResultBackend: Pool celery \ntask state
deactivate SchedulerProcess
WorkerChildProcess->QueueBroker: Pool task
QueueBroker->WorkerChildProcess: Send task
deactivate QueueBroker
activate WorkerChildProcess
create LocalTaskJobProcess
WorkerChildProcess->LocalTaskJobProcess: Start process
deactivate
create RawTaskProcess
activate LocalTaskJobProcess
LocalTaskJobProcess->RawTaskProcess: Start process
deactivate LocalTaskJobProcess
activate RawTaskProcess
RawTaskProcess->RawTaskProcess: Execute user code
RawTaskProcess-->LocalTaskJobProcess: Finish process
destroy RawTaskProcess
activate LocalTaskJobProcess
LocalTaskJobProcess-->WorkerChildProcess: Finish process
destroy LocalTaskJobProcess
activate WorkerChildProcess
WorkerChildProcess-->WorkerProcess: Report task result
deactivate WorkerChildProcess
activate WorkerProcess
WorkerProcess-->ResultBackend: Save Celery task state
deactivate WorkerProcess
activate ResultBackend
ResultBackend-->SchedulerProcess: Send celery task state
deactivate ResultBackend
@enduml