ongoing state, fixes #35, r=@twobraids

This commit is contained in:
Peter Bengtsson 2014-06-13 09:03:21 -07:00
Родитель 5db3e11c22
Коммит e39aaad464
6 изменённых файлов: 227 добавлений и 104 удалений

Просмотреть файл

@ -26,7 +26,7 @@ from dbapi2_util import (
)
from generic_app import App, main
from datetimeutil import utc_now
from datetimeutil import utc_now, timesince
from base import (
convert_frequency,
FrequencyDefinitionError,
@ -50,6 +50,7 @@ CREATE_CRONTABBER_SQL = """
first_run timestamp with time zone,
last_run timestamp with time zone,
last_success timestamp with time zone,
ongoing timestamp with time zone,
error_count integer DEFAULT 0,
depends_on text[],
last_error json
@ -145,6 +146,25 @@ class JobStateDatabase(RequiredConfig):
execute_no_results,
CREATE_CRONTABBER_SQL
)
else:
# Check that it has the new `ongoing` column.
try:
self.transaction_executor(
single_value_sql,
"SELECT column_name FROM information_schema.columns "
"WHERE table_name='crontabber' AND column_name='ongoing'"
)
except SQLDidNotReturnSingleValue:
# So that's why then!
# We have to do a quick migration.
self.config.logger.info(
"Have to do a migration and add the `ongoing` field"
)
self.transaction_executor(
execute_no_results,
"ALTER TABLE crontabber ADD ongoing TIMESTAMP "
"WITH TIME ZONE"
)
found = self.transaction_executor(
execute_query_fetchall,
@ -243,13 +263,14 @@ class JobStateDatabase(RequiredConfig):
last_success,
depends_on,
error_count,
last_error
last_error,
ongoing
FROM crontabber
WHERE
app_name = %s"""
columns = (
'next_run', 'first_run', 'last_run', 'last_success',
'depends_on', 'error_count', 'last_error'
'depends_on', 'error_count', 'last_error', 'ongoing'
)
try:
record = self.transaction_executor(single_row_sql, sql, (key,))
@ -278,7 +299,8 @@ class JobStateDatabase(RequiredConfig):
(key,)
)
# the key exists, do an update
next_sql = """UPDATE crontabber
next_sql = """
UPDATE crontabber
SET
next_run = %(next_run)s,
first_run = %(first_run)s,
@ -286,31 +308,36 @@ class JobStateDatabase(RequiredConfig):
last_success = %(last_success)s,
depends_on = %(depends_on)s,
error_count = %(error_count)s,
last_error = %(last_error)s
last_error = %(last_error)s,
ongoing = %(ongoing)s
WHERE
app_name = %(app_name)s"""
app_name = %(app_name)s
"""
except SQLDidNotReturnSingleValue:
# the key does not exist, do an insert
next_sql = """
INSERT INTO crontabber (
app_name,
next_run,
first_run,
last_run,
last_success,
depends_on,
error_count,
last_error
) VALUES (
%(app_name)s,
%(next_run)s,
%(first_run)s,
%(last_run)s,
%(last_success)s,
%(depends_on)s,
%(error_count)s,
%(last_error)s
)"""
INSERT INTO crontabber (
app_name,
next_run,
first_run,
last_run,
last_success,
depends_on,
error_count,
last_error,
ongoing
) VALUES (
%(app_name)s,
%(next_run)s,
%(first_run)s,
%(last_run)s,
%(last_success)s,
%(depends_on)s,
%(error_count)s,
%(last_error)s,
%(ongoing)s
)
"""
parameters = {
'app_name': key,
'next_run': value['next_run'],
@ -319,8 +346,13 @@ class JobStateDatabase(RequiredConfig):
'last_success': value.get('last_success'),
'depends_on': value['depends_on'],
'error_count': value['error_count'],
'last_error': json.dumps(value['last_error'], cls=LastErrorEncoder)
'last_error': json.dumps(
value['last_error'],
cls=LastErrorEncoder
),
'ongoing': value.get('ongoing'),
}
execute_no_results(
connection,
next_sql,
@ -337,13 +369,14 @@ class JobStateDatabase(RequiredConfig):
last_success,
depends_on,
error_count,
last_error
last_error,
ongoing
FROM crontabber
"""
columns = (
'app_name',
'next_run', 'first_run', 'last_run', 'last_success',
'depends_on', 'error_count', 'last_error'
'depends_on', 'error_count', 'last_error', 'ongoing'
)
all = {}
for record in execute_query_iter(connection, sql):
@ -403,62 +436,6 @@ class JobStateDatabase(RequiredConfig):
)
def timesince(d, now): # pragma: no cover
"""
Taken from django.utils.timesince
"""
def ungettext(a, b, n):
if n == 1:
return a
return b
def ugettext(s):
return s
def is_aware(v):
return v.tzinfo is not None and v.tzinfo.utcoffset(v) is not None
chunks = (
(60 * 60 * 24 * 365, lambda n: ungettext('year', 'years', n)),
(60 * 60 * 24 * 30, lambda n: ungettext('month', 'months', n)),
(60 * 60 * 24 * 7, lambda n: ungettext('week', 'weeks', n)),
(60 * 60 * 24, lambda n: ungettext('day', 'days', n)),
(60 * 60, lambda n: ungettext('hour', 'hours', n)),
(60, lambda n: ungettext('minute', 'minutes', n))
)
# Convert datetime.date to datetime.datetime for comparison.
if not isinstance(d, datetime.datetime):
d = datetime.datetime(d.year, d.month, d.day)
if now and not isinstance(now, datetime.datetime):
now = datetime.datetime(now.year, now.month, now.day)
if not now:
now = datetime.datetime.utcnow()
delta = now - d
# ignore microseconds
since = delta.days * 24 * 60 * 60 + delta.seconds
if since <= 0:
# d is in the future compared to now, stop processing.
return u'0 ' + ugettext('minutes')
for i, (seconds, name) in enumerate(chunks):
count = since // seconds
if count != 0:
break
s = ugettext('%(number)d %(type)s') % {
'number': count, 'type': name(count)
}
if i + 1 < len(chunks):
# Now get the second item
seconds2, name2 = chunks[i + 1]
count2 = (since - (seconds * count)) // seconds2
if count2 != 0:
s += ugettext(', %(number)d %(type)s') % {
'number': count2, 'type': name2(count2)
}
return s
# -----------------------------------------------------------------------------
def _default_list_splitter(class_list_str):
return [x.strip() for x in class_list_str.split(',')]
@ -889,10 +866,17 @@ class CronTabber(App):
except KeyError:
print >>stream, '*NO PREVIOUS RUN INFO*'
continue
if info.get('ongoing'):
print >>stream, 'Ongoing now!'.ljust(PAD),
print >>stream, 'Started', '%s ago' % timesince(
_now, info.get('ongoing')
)
print >>stream, 'Last run:'.ljust(PAD),
print >>stream, info['last_run'].strftime(_fmt).ljust(20),
print >>stream, '(%s ago)' % timesince(info['last_run'], _now)
if info['last_run']:
print >>stream, info['last_run'].strftime(_fmt).ljust(20),
print >>stream, '(%s ago)' % timesince(info['last_run'], _now)
else:
print >>stream, 'none'
print >>stream, 'Last success:'.ljust(PAD),
if info.get('last_success'):
print >>stream, info['last_success'].strftime(_fmt).ljust(20),
@ -901,12 +885,18 @@ class CronTabber(App):
else:
print >>stream, 'no previous successful run'
print >>stream, 'Next run:'.ljust(PAD),
print >>stream, info['next_run'].strftime(_fmt).ljust(20),
if _now > info['next_run']:
print >>stream, ('(was %s ago)' %
timesince(info['next_run'], _now))
if info['next_run']:
print >>stream, info['next_run'].strftime(_fmt).ljust(20),
if _now > info['next_run']:
print >>stream, ('(was %s ago)' %
timesince(info['next_run'], _now))
else:
print >>stream, '(in %s)' % timesince(
_now,
info['next_run']
)
else:
print >>stream, '(in %s)' % timesince(_now, info['next_run'])
print >>stream, 'none'
if info.get('last_error'):
print >>stream, 'Error!!'.ljust(PAD),
print >>stream, '(%s times)' % info['error_count']
@ -1135,7 +1125,32 @@ class CronTabber(App):
def _run_job(self, class_, config, info):
# here we go!
instance = class_(config, info)
return instance.main()
self._set_ongoing_job(class_)
result = instance.main()
return result
def _set_ongoing_job(self, class_):
app_name = class_.app_name
info = self.job_state_database.get(app_name)
if info:
info['ongoing'] = datetime.datetime.utcnow()
else:
depends_on = getattr(class_, 'depends_on', [])
if isinstance(depends_on, basestring):
depends_on = [depends_on]
elif not isinstance(depends_on, list):
depends_on = list(depends_on)
info = {
'next_run': None,
'first_run': None,
'last_run': None,
'last_success': None,
'last_error': {},
'error_count': 0,
'depends_on': depends_on,
'ongoing': datetime.datetime.utcnow()
}
self.job_state_database[app_name] = info
def _log_run(self, class_, seconds, time_, last_success, now,
exc_type, exc_value, exc_tb):
@ -1148,7 +1163,7 @@ class CronTabber(App):
elif not isinstance(depends_on, list):
depends_on = list(depends_on)
info['depends_on'] = depends_on
if 'first_run' not in info:
if not info.get('first_run'):
info['first_run'] = now
info['last_run'] = now
if last_success:
@ -1179,6 +1194,10 @@ class CronTabber(App):
info['last_error'] = {}
info['error_count'] = 0
# Clearly it's not "ongoing" any more when it's here, because
# being here means the job has finished.
info['ongoing'] = None
self.job_state_database[app_name] = info
def configtest(self):

Просмотреть файл

@ -43,3 +43,59 @@ def utc_now():
"""
return datetime.datetime.now(UTC())
def timesince(d, now): # pragma: no cover
"""
Taken from django.utils.timesince
"""
def ungettext(a, b, n):
if n == 1:
return a
return b
def ugettext(s):
return s
def is_aware(v):
return v.tzinfo is not None and v.tzinfo.utcoffset(v) is not None
chunks = (
(60 * 60 * 24 * 365, lambda n: ungettext('year', 'years', n)),
(60 * 60 * 24 * 30, lambda n: ungettext('month', 'months', n)),
(60 * 60 * 24 * 7, lambda n: ungettext('week', 'weeks', n)),
(60 * 60 * 24, lambda n: ungettext('day', 'days', n)),
(60 * 60, lambda n: ungettext('hour', 'hours', n)),
(60, lambda n: ungettext('minute', 'minutes', n))
)
# Convert datetime.date to datetime.datetime for comparison.
if not isinstance(d, datetime.datetime):
d = datetime.datetime(d.year, d.month, d.day)
if now and not isinstance(now, datetime.datetime):
now = datetime.datetime(now.year, now.month, now.day)
if not now:
now = datetime.datetime.utcnow()
delta = now - d
# ignore microseconds
since = delta.days * 24 * 60 * 60 + delta.seconds
if since <= 0:
# d is in the future compared to now, stop processing.
return u'0 ' + ugettext('minutes')
for i, (seconds, name) in enumerate(chunks):
count = since // seconds
if count != 0:
break
s = ugettext('%(number)d %(type)s') % {
'number': count, 'type': name(count)
}
if i + 1 < len(chunks):
# Now get the second item
seconds2, name2 = chunks[i + 1]
count2 = (since - (seconds * count)) // seconds2
if count2 != 0:
s += ugettext(', %(number)d %(type)s') % {
'number': count2, 'type': name2(count2)
}
return s

Просмотреть файл

@ -176,12 +176,13 @@ class IntegrationTestCaseBase(TestCaseBase):
last_success,
error_count,
depends_on,
last_error
last_error,
ongoing
FROM crontabber;
""")
columns = (
'app_name', 'next_run', 'first_run', 'last_run', 'last_success',
'error_count', 'depends_on', 'last_error'
'error_count', 'depends_on', 'last_error', 'ongoing'
)
structure = {}
try:

Просмотреть файл

@ -150,6 +150,22 @@ class TestStateDatabase(IntegrationTestCaseBase):
}
ok_(self.database.has_data())
def test_migration_of_ongoing_field(self):
# this makes sure the table exists
self.database = app.JobStateDatabase(self.config.crontabber)
self.conn.cursor().execute("""
ALTER TABLE crontabber DROP COLUMN ongoing
""")
self.conn.commit()
# this will run the migration
self.database = app.JobStateDatabase(self.config.crontabber)
cursor = self.conn.cursor()
cursor.execute("""
SELECT column_name FROM information_schema.columns
WHERE table_name='crontabber' AND column_name='ongoing'
""")
ok_(cursor.fetchone())
def test_iterate_app_names(self):
app_names = set()
for app_name in self.database:
@ -229,7 +245,8 @@ class TestStateDatabase(IntegrationTestCaseBase):
'last_success': None,
'depends_on': [],
'error_count': 0,
'last_error': {}
'last_error': {},
'ongoing': None,
}
self.database['foo'] = data
eq_(self.database['foo'], data)
@ -242,7 +259,8 @@ class TestStateDatabase(IntegrationTestCaseBase):
'last_success': utc_now(),
'depends_on': ['bar'],
'error_count': 1,
'last_error': {}
'last_error': {},
'ongoing': None
}
bar = {
'next_run': utc_now(),
@ -251,7 +269,8 @@ class TestStateDatabase(IntegrationTestCaseBase):
'last_success': None,
'depends_on': [],
'error_count': 2,
'last_error': {}
'last_error': {},
'ongoing': None
}
self.database['foo'] = foo
self.database['bar'] = bar
@ -274,7 +293,8 @@ class TestStateDatabase(IntegrationTestCaseBase):
'last_success': None,
'depends_on': [],
'error_count': 0,
'last_error': {}
'last_error': {},
'ongoing': None,
}
self.database['foo'] = foo
eq_(self.database.get('foo'), foo)
@ -288,7 +308,8 @@ class TestStateDatabase(IntegrationTestCaseBase):
'last_success': None,
'depends_on': [],
'error_count': 0,
'last_error': {}
'last_error': {},
'ongoing': None,
}
self.database['foo'] = foo
popped_foo = self.database.pop('foo')
@ -359,6 +380,7 @@ class TestCrontabber(IntegrationTestCaseBase):
information['last_run'],
information['last_success']
)
ok_(not information['ongoing'])
# run it again and nothing should happen
count_infos = len([x for x in infos if 'Ran BasicJob' in x])
@ -403,8 +425,8 @@ class TestCrontabber(IntegrationTestCaseBase):
infos = [x[0][0] for x in config.logger.info.call_args_list]
infos = [x for x in infos if x.startswith('Ran ')]
ok_('Ran FooJob' in infos)
ok_('Ran BarJob' in infos)
ok_('Ran FooJob' in infos, infos)
ok_('Ran BarJob' in infos, infos)
ok_(infos.index('Ran FooJob') <
infos.index('Ran BarJob'))
count = len(infos)
@ -445,6 +467,7 @@ class TestCrontabber(IntegrationTestCaseBase):
eq_(information['error_count'], 1)
ok_(information['last_error'])
ok_(not information.get('last_success'), {})
ok_(not information['ongoing'])
today = utc_now()
self.assertAlmostEqual(today, information['last_run'])
_next_run = utc_now() + datetime.timedelta(seconds=100)
@ -1160,6 +1183,7 @@ class TestCrontabber(IntegrationTestCaseBase):
structure = self._load_structure()
information = structure['foo-backfill']
ok_(not information['last_error'])
eq_(information['first_run'], information['last_run'])
# last_success might be a few microseconds off

Просмотреть файл

@ -0,0 +1,23 @@
import datetime
import unittest
from nose.tools import eq_, ok_
from crontabber.datetimeutil import utc_now, timesince
class TestDatetimeUtils(unittest.TestCase):
def test_utc_now(self):
now = utc_now()
ok_(now.tzinfo)
dt = datetime.datetime.utcnow()
eq_(now.tzinfo.tzname(dt), 'UTC')
eq_(now.tzinfo.utcoffset(dt), datetime.timedelta(0))
eq_(now.tzinfo.dst(dt), datetime.timedelta(0))
def test_timesince(self):
now = utc_now()
then = now - datetime.timedelta(days=365)
eq_(timesince(then, now), '1 year')

Просмотреть файл

@ -6,7 +6,7 @@
1. To run this example, you need to first generate a crontabber.ini
file. You do that with::
crontabber --admin.print_config=ini > crontabber.ini
crontabber --admin.print_conf=ini > crontabber.ini
2. Now you need to edit crontabber.ini. It can be quite scary.
The first two things to do are: