[AIRFLOW-1036] Randomize exponential backoff

This prevents the thundering herd problem. Using a
combination of
dag_run, task_id, and execution_date makes this
random with respect to
task instances, while still being deterministic
across machines. The
retry delay is within a range that doubles in
size.

Closes #2262 from saguziel/aguziel-random-
exponential-backoff
This commit is contained in:
Alex Guziel 2017-04-29 17:11:58 +02:00 коммит произвёл Bolke de Bruin
Родитель 2fa6905f41
Коммит 66168efa12
2 изменённых файлов: 18 добавлений и 5 удалений

Просмотреть файл

@ -1183,13 +1183,19 @@ class TaskInstance(Base):
""" """
delay = self.task.retry_delay delay = self.task.retry_delay
if self.task.retry_exponential_backoff: if self.task.retry_exponential_backoff:
min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2)))
# deterministic per task instance
hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, self.task_id,
self.execution_date, self.try_number).encode('utf-8')).hexdigest(), 16)
# between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number)
modded_hash = min_backoff + hash % min_backoff
# timedelta has a maximum representable value. The exponentiation # timedelta has a maximum representable value. The exponentiation
# here means this value can be exceeded after a certain number # here means this value can be exceeded after a certain number
# of tries (around 50 if the initial delay is 1s, even fewer if # of tries (around 50 if the initial delay is 1s, even fewer if
# the delay is larger). Cap the value here before creating a # the delay is larger). Cap the value here before creating a
# timedelta object so the operation doesn't fail. # timedelta object so the operation doesn't fail.
delay_backoff_in_seconds = min( delay_backoff_in_seconds = min(
delay.total_seconds() * (2 ** (self.try_number - 1)), modded_hash,
timedelta.max.total_seconds() - 1 timedelta.max.total_seconds() - 1
) )
delay = timedelta(seconds=delay_backoff_in_seconds) delay = timedelta(seconds=delay_backoff_in_seconds)

Просмотреть файл

@ -838,18 +838,25 @@ class TaskInstanceTest(unittest.TestCase):
owner='airflow', owner='airflow',
start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
ti = TI( ti = TI(
task=task, execution_date=datetime.datetime.now()) task=task, execution_date=DEFAULT_DATE)
ti.end_date = datetime.datetime.now() ti.end_date = datetime.datetime.now()
ti.try_number = 1 ti.try_number = 1
dt = ti.next_retry_datetime() dt = ti.next_retry_datetime()
self.assertEqual(dt, ti.end_date + delay) # between 30 * 2^0.5 and 30 * 2^1 (15 and 30)
self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=20.0))
ti.try_number = 4
dt = ti.next_retry_datetime()
# between 30 * 2^2 and 30 * 2^3 (120 and 240)
self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=181.0))
ti.try_number = 6 ti.try_number = 6
dt = ti.next_retry_datetime() dt = ti.next_retry_datetime()
self.assertEqual(dt, ti.end_date + (2 ** 5) * delay) # between 30 * 2^4 and 30 * 2^5 (480 and 960)
self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=825.0))
ti.try_number = 8 ti.try_number = 9
dt = ti.next_retry_datetime() dt = ti.next_retry_datetime()
self.assertEqual(dt, ti.end_date+max_delay) self.assertEqual(dt, ti.end_date+max_delay)