# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import time

# Import the service module, rather than the TUIDService class directly, to
# prevent a circular dependency: TUIDService makes use of the Clogger.
import tuid.service
from jx_python import jx
from mo_dots import Null, coalesce
from mo_hg.hg_mozilla_org import HgMozillaOrg
from mo_http import http
from mo_logs import Log
from mo_threads import Till, Thread, Lock, Queue, Signal
from mo_times.durations import DAY
from pyLibrary.sql import sql_list, quote_set
from tuid import sql

RETRY = {"times": 3, "sleep": 5}
SQL_CSET_BATCH_SIZE = 500
CSET_TIP_WAIT_TIME = 5 * 60  # seconds
CSET_BACKFILL_WAIT_TIME = 1 * 60  # seconds
CSET_MAINTENANCE_WAIT_TIME = 30 * 60  # seconds
CSET_DELETION_WAIT_TIME = 1 * 60  # seconds
TUID_EXISTENCE_WAIT_TIME = 1 * 60  # seconds
TIME_TO_KEEP_ANNOTATIONS = 5 * DAY
MAX_TIPFILL_CLOGS = 60  # changeset logs
MAX_BACKFILL_CLOGS = 200  # changeset logs
CHANGESETS_PER_CLOG = 20  # changesets
BACKFILL_REVNUM_TIMEOUT = int(MAX_BACKFILL_CLOGS * 2.5)  # assume 2.5 seconds per clog
MINIMUM_PERMANENT_CSETS = 1000  # changesets
MAXIMUM_NONPERMANENT_CSETS = 20000  # changesets
SIGNAL_MAINTENANCE_CSETS = MAXIMUM_NONPERMANENT_CSETS + (0.1 * MAXIMUM_NONPERMANENT_CSETS)
UPDATE_VERY_OLD_FRONTIERS = False


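# A high-level map of this class, derived from the code below: the Clogger
# maintains the csetLog table of (revnum, revision, timestamp) rows and runs
# four worker threads to keep it current. 'clogger-tip' extends the log toward
# the newest changesets, 'clogger-backfill' extends it backwards on request,
# 'clogger-maintenance' timestamps old entries and flags overflow, and
# 'clogger-deleter' removes whatever maintenance has scheduled for deletion.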
class Clogger:
    def __init__(self, conn=None, tuid_service=None, kwargs=None):
        try:
            self.config = kwargs

            self.conn = conn if conn else sql.Sql(self.config.database.name)
            self.hg_cache = HgMozillaOrg(kwargs=self.config.hg_cache, use_cache=True) if self.config.hg_cache else Null

            self.tuid_service = tuid_service if tuid_service else tuid.service.TUIDService(
                database=None, hg=None, kwargs=self.config, conn=self.conn, clogger=self
            )
            self.rev_locker = Lock()
            self.working_locker = Lock()

            self.init_db()
            self.next_revnum = coalesce(self.conn.get_one("SELECT max(revnum)+1 FROM csetLog")[0], 1)
            self.csets_todo_backwards = Queue(name="Clogger.csets_todo_backwards")
            self.deletions_todo = Queue(name="Clogger.deletions_todo")
            self.maintenance_signal = Signal(name="Clogger.maintenance_signal")
            self.config = self.config.tuid

            self.disable_backfilling = False
            self.disable_tipfilling = False
            self.disable_deletion = False
            self.disable_maintenance = False

            # Make sure we are filled before allowing queries
            numrevs = self.conn.get_one("SELECT count(revnum) FROM csetLog")[0]
            if numrevs < MINIMUM_PERMANENT_CSETS:
                Log.note("Filling in csets to hold {{minim}} csets.", minim=MINIMUM_PERMANENT_CSETS)
                oldest_rev = 'tip'
                with self.conn.transaction() as t:
                    tmp = t.query("SELECT min(revnum), revision FROM csetLog").data[0][1]
                    if tmp:
                        oldest_rev = tmp
                self._fill_in_range(
                    MINIMUM_PERMANENT_CSETS - numrevs,
                    oldest_rev,
                    timestamp=False
                )

            Log.note(
                "Table is filled with at least {{minim}} entries. Starting workers...",
                minim=MINIMUM_PERMANENT_CSETS
            )

            Thread.run('clogger-tip', self.fill_forward_continuous)
            Thread.run('clogger-backfill', self.fill_backward_with_list)
            Thread.run('clogger-maintenance', self.csetLog_maintenance)
            Thread.run('clogger-deleter', self.csetLog_deleter)

            Log.note("Started clogger workers.")
        except Exception as e:
            Log.warning("Cannot setup clogger: {{cause}}", cause=str(e))

    def init_db(self):
        with self.conn.transaction() as t:
            t.execute('''
            CREATE TABLE IF NOT EXISTS csetLog (
                revnum INTEGER PRIMARY KEY,
                revision CHAR(12) NOT NULL,
                timestamp INTEGER
            );''')

    def revnum(self):
        """
        :return: max revnum that was added
        """
        return coalesce(self.conn.get_one("SELECT max(revnum) as revnum FROM csetLog")[0], 0)

    def get_tip(self, transaction):
        return transaction.get_one(
            "SELECT max(revnum) as revnum, revision FROM csetLog"
        )

    def get_tail(self, transaction):
        return transaction.get_one(
            "SELECT min(revnum) as revnum, revision FROM csetLog"
        )

    def _get_clog(self, clog_url):
        try:
            Log.note("Searching through changelog {{url}}", url=clog_url)
            clog_obj = http.get_json(clog_url, retry=RETRY)
            return clog_obj
        except Exception as e:
            Log.error(
                "Unexpected error getting changeset-log for {{url}}: {{error}}",
                url=clog_url,
                error=e
            )

    def _get_one_revision(self, transaction, cset_entry):
        # Returns a single revision if it exists
        _, rev, _ = cset_entry
        return transaction.get_one("SELECT revision FROM csetLog WHERE revision=?", (rev,))

    def _get_one_revnum(self, transaction, rev):
        # Returns a single revnum if it exists
        return transaction.get_one("SELECT revnum FROM csetLog WHERE revision=?", (rev,))

    def _get_revnum_range(self, transaction, revnum1, revnum2):
        # Returns all (revnum, revision) rows in the given range, inclusive
        high_num = max(revnum1, revnum2)
        low_num = min(revnum1, revnum2)

        return transaction.query(
            "SELECT revnum, revision FROM csetLog WHERE "
            "revnum >= " + str(low_num) + " AND revnum <= " + str(high_num)
        ).data

    def recompute_table_revnums(self):
        '''
        Recomputes the revnums for the csetLog table by creating a new
        table and copying csetLog into it. The INTEGER PRIMARY KEY in the
        temp table auto-increments as rows are added.

        IMPORTANT: Only call this after acquiring the
        lock `self.working_locker`.
        :return:
        '''
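        # For example, rows with revnums (5, 9, 12) come back as (1, 2, 3):
        # when an INTEGER PRIMARY KEY is left unspecified, SQLite assigns
        # 1, 2, 3, ... as the rows are inserted in revnum order.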
        with self.conn.transaction() as t:
            t.execute('''
            CREATE TABLE temp (
                revnum INTEGER PRIMARY KEY,
                revision CHAR(12) NOT NULL,
                timestamp INTEGER
            );''')

            t.execute(
                "INSERT INTO temp (revision, timestamp) "
                "SELECT revision, timestamp FROM csetLog ORDER BY revnum ASC"
            )

            t.execute("DROP TABLE csetLog;")
            t.execute("ALTER TABLE temp RENAME TO csetLog;")

    def check_for_maintenance(self):
        '''
        Returns True if the maintenance worker should be run now,
        and False otherwise.
        :return:
        '''
        numrevs = self.conn.get_one("SELECT count(revnum) FROM csetLog")[0]
        return numrevs >= SIGNAL_MAINTENANCE_CSETS

    def add_cset_entries(self, ordered_rev_list, timestamp=False, number_forward=True):
        '''
        Adds a list of revisions to the table. Assumes ordered_rev_list is
        ordered based on how changesets are found in the changelog. Going
        forwards or backwards is dealt with by flipping the list.
        :param ordered_rev_list: Order given from changeset log searching.
        :param timestamp: If False, records are kept indefinitely,
                          but if holes exist: (delete, None, delete, None),
                          those deletes with None's around them
                          will not be deleted.
        :param number_forward: If True, number the revision list going
                               forward from max(revnum); otherwise number
                               backwards from min(revnum). Revnums are
                               normalized afterwards by
                               recompute_table_revnums().
        :return:
        '''
        with self.conn.transaction() as t:
            current_min = t.get_one("SELECT min(revnum) FROM csetLog")[0]
            current_max = t.get_one("SELECT max(revnum) FROM csetLog")[0]
            if not current_min or not current_max:
                current_min = 0
                current_max = 0

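            # The changelog lists newest changesets first. When numbering
            # forward, flip the list so the oldest changeset gets the lowest
            # new revnum, counting up from just above the current maximum;
            # when numbering backward, keep the order and count down from
            # just below the current minimum.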
            direction = -1
            start = current_min - 1
            if number_forward:
                direction = 1
                start = current_max + 1
                ordered_rev_list = ordered_rev_list[::-1]

            insert_list = [
                (
                    start + direction * count,
                    rev,
                    int(time.time()) if timestamp else -1
                )
                for count, rev in enumerate(ordered_rev_list)
            ]

            # In case of overlapping requests, skip revisions already in the table
            fmt_insert_list = []
            for cset_entry in insert_list:
                tmp = self._get_one_revision(t, cset_entry)
                if not tmp:
                    fmt_insert_list.append(cset_entry)

            for _, tmp_insert_list in jx.chunk(fmt_insert_list, size=SQL_CSET_BATCH_SIZE):
                t.execute(
                    "INSERT INTO csetLog (revnum, revision, timestamp)" +
                    " VALUES " +
                    sql_list(
                        quote_set((revnum, revision, timestamp))
                        for revnum, revision, timestamp in tmp_insert_list
                    )
                )

        # Move the revision numbers forward if needed
        self.recompute_table_revnums()

        # Start a maintenance run if needed
        if self.check_for_maintenance():
            self.maintenance_signal.go()

    def _fill_in_range(self, parent_cset, child_cset, timestamp=False, number_forward=True):
        '''
        Fills cset logs in a certain range. 'parent_cset' can be an int, in
        which case we get that many changesets instead. When parent_cset is
        an int we are going backwards: the first changeset of the first log
        is skipped, and the number_forward setting is ignored. Otherwise, we
        continue until we find the given 'parent_cset'.
        :param parent_cset:
        :param child_cset:
        :param timestamp:
        :param number_forward:
        :return:
        '''
        csets_to_add = []
        found_parent = False
        find_parent = False
        if type(parent_cset) != int:
            find_parent = True
        elif parent_cset >= MAX_BACKFILL_CLOGS * CHANGESETS_PER_CLOG:
            Log.warning(
                "Requested number of new changesets {{num}} is too high. "
                "Max number that can be requested is {{maxnum}}.",
                num=parent_cset,
                maxnum=MAX_BACKFILL_CLOGS * CHANGESETS_PER_CLOG
            )
            return None

        csets_found = 0
        clogs_seen = 0
        final_rev = child_cset
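        # Page backwards through the changelog, CHANGESETS_PER_CLOG entries
        # at a time. The last entry of each page becomes the starting
        # revision of the next request, so it is excluded from the per-page
        # scan to avoid processing it twice.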
        while not found_parent and clogs_seen < MAX_BACKFILL_CLOGS:
            clog_url = self.tuid_service.hg_url / self.config.hg.branch / 'json-log' / final_rev
            clog_obj = self._get_clog(clog_url)
            clog_csets_list = list(clog_obj['changesets'])
            for clog_cset in clog_csets_list[:-1]:
                if not number_forward and csets_found <= 0:
                    # Skip this entry, it already exists
                    csets_found += 1
                    continue

                nodes_cset = clog_cset['node'][:12]
                if find_parent:
                    if nodes_cset == parent_cset:
                        found_parent = True
                        if not number_forward:
                            # When going backwards, this entry is
                            # the given parent
                            csets_to_add.append(nodes_cset)
                        break
                else:
                    if csets_found + 1 > parent_cset:
                        found_parent = True
                        if not number_forward:
                            # When going backwards, this entry is
                            # the given parent (which is supposed
                            # to already exist)
                            csets_to_add.append(nodes_cset)
                        break
                csets_found += 1
                csets_to_add.append(nodes_cset)
            if found_parent:
                break

            clogs_seen += 1
            final_rev = clog_csets_list[-1]['node'][:12]

        if found_parent:
            self.add_cset_entries(csets_to_add, timestamp=timestamp, number_forward=number_forward)
        else:
            Log.warning(
                "Couldn't find the end of the request for {{request}}. "
                "Max number that can be requested through _fill_in_range is {{maxnum}}.",
                request={
                    'parent_cset': parent_cset,
                    'child_cset': child_cset,
                    'number_forward': number_forward
                },
                maxnum=MAX_BACKFILL_CLOGS * CHANGESETS_PER_CLOG
            )
            return None
        return csets_to_add

    def fill_backward_with_list(self, please_stop=None):
        '''
        Expects requests of the tuple form: (parent_cset, timestamp)
        parent_cset can be an int X to go back by X changesets, or
        a string to search for going backwards in time. If timestamp
        is False, no timestamps will be added to the entries.
        :param please_stop:
        :return:
        '''
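        # For example (illustrative values): a request of (100, True) extends
        # the log 100 changesets further back with timestamped entries, while
        # ('abcd1234beef', False) extends it until that revision is found,
        # leaving the new entries untimestamped.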
        while not please_stop:
            try:
                request = self.csets_todo_backwards.pop(till=please_stop)
                if please_stop:
                    break

                # If backfilling is disabled, all requests
                # are ignored.
                if self.disable_backfilling:
                    Till(seconds=CSET_BACKFILL_WAIT_TIME).wait()
                    continue

                if request:
                    parent_cset, timestamp = request
                else:
                    continue

                with self.working_locker:
                    with self.conn.transaction() as t:
                        parent_revnum = self._get_one_revnum(t, parent_cset)
                    if parent_revnum:
                        continue

                    with self.conn.transaction() as t:
                        _, oldest_revision = self.get_tail(t)

                    self._fill_in_range(
                        parent_cset,
                        oldest_revision,
                        timestamp=timestamp,
                        number_forward=False
                    )
                Log.note("Finished {{cset}}", cset=parent_cset)
            except Exception as e:
                Log.warning("Unknown error occurred during backfill: ", cause=e)

    def update_tip(self):
        '''
        Returns False if the tip is already at the newest, or True
        if an update has taken place.
        :return:
        '''
        clog_obj = self._get_clog(self.tuid_service.hg_url / self.config.hg.branch / 'json-log' / 'tip')

        # Get current tip in DB
        with self.conn.transaction() as t:
            _, newest_known_rev = self.get_tip(t)

        # If we are still at the newest, wait for CSET_TIP_WAIT_TIME seconds
        # before checking again.
        first_clog_entry = clog_obj['changesets'][0]['node'][:12]
        if newest_known_rev == first_clog_entry:
            return False

        csets_to_gather = None
        if not newest_known_rev:
            Log.note(
                "No revisions found in table, adding {{minim}} entries...",
                minim=MINIMUM_PERMANENT_CSETS
            )
            csets_to_gather = MINIMUM_PERMANENT_CSETS

        found_newest_known = False
        csets_to_add = []
        csets_found = 0
        clogs_seen = 0
        Log.note("Found new revisions. Updating csetLog tip to {{rev}}...", rev=first_clog_entry)
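        # Walk the changelog from 'tip' backwards until we hit the newest
        # revision already in the table (or, when the table is empty, until
        # csets_to_gather entries have been collected).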
        while not found_newest_known and clogs_seen < MAX_TIPFILL_CLOGS:
            clog_csets_list = list(clog_obj['changesets'])
            for clog_cset in clog_csets_list[:-1]:
                nodes_cset = clog_cset['node'][:12]
                if not csets_to_gather:
                    if nodes_cset == newest_known_rev:
                        found_newest_known = True
                        break
                else:
                    if csets_found >= csets_to_gather:
                        found_newest_known = True
                        break
                csets_found += 1
                csets_to_add.append(nodes_cset)
            if not found_newest_known:
                # Get the next page
                clogs_seen += 1
                final_rev = clog_csets_list[-1]['node'][:12]
                clog_url = self.tuid_service.hg_url / self.config.hg.branch / 'json-log' / final_rev
                clog_obj = self._get_clog(clog_url)

        if clogs_seen >= MAX_TIPFILL_CLOGS:
            Log.error(
                "Too many changesets, can't find last tip or the number is too high: {{rev}}. "
                "Maximum possible to request is {{maxnum}}",
                rev=coalesce(newest_known_rev, csets_to_gather),
                maxnum=MAX_TIPFILL_CLOGS * CHANGESETS_PER_CLOG
            )
            return False

        with self.working_locker:
            Log.note("Adding {{csets}}", csets=csets_to_add)
            self.add_cset_entries(csets_to_add, timestamp=False)
        return True

    def fill_forward_continuous(self, please_stop=None):
        while not please_stop:
            try:
                waiting_a_bit = False
                if self.disable_tipfilling:
                    waiting_a_bit = True

                if not waiting_a_bit:
                    # If an update was done, check immediately for
                    # more changesets that may have arrived in the
                    # meantime; otherwise, we wait.
                    did_an_update = self.update_tip()
                    if not did_an_update:
                        waiting_a_bit = True

                if waiting_a_bit:
                    (please_stop | Till(seconds=CSET_TIP_WAIT_TIME)).wait()
                    continue
            except Exception as e:
                Log.warning("Unknown error occurred during tip maintenance:", cause=e)

    def csetLog_maintenance(self, please_stop=None):
        '''
        Handles timestamping revisions once they fall outside the
        permanent-storage window, and scheduling old csetLog entries
        for deletion later.
        :param please_stop:
        :return:
        '''
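        # Each pass below, as implemented, has three phases: (1) reset
        # timestamps on the newest MINIMUM_PERMANENT_CSETS entries and stamp
        # everything older, (2) delete annotations older than
        # TIME_TO_KEEP_ANNOTATIONS, and (3) schedule entries beyond
        # MAXIMUM_NONPERMANENT_CSETS for deletion by the deleter worker.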
        while not please_stop:
            try:
                # Wait until something signals the maintenance cycle
                # to begin (or end).
                (self.maintenance_signal | please_stop).wait()

                if please_stop:
                    break
                if self.disable_maintenance:
                    continue

                # Reset signal so we don't request
                # maintenance infinitely.
                with self.maintenance_signal.lock:
                    self.maintenance_signal._go = False

                with self.working_locker:
                    all_data = None
                    with self.conn.transaction() as t:
                        all_data = sorted(
                            t.get("SELECT revnum, revision, timestamp FROM csetLog"),
                            key=lambda x: int(x[0])
                        )

                    # Restore maximum permanents (if overflowing)
                    new_data = []
                    modified = False
                    for count, (revnum, revision, timestamp) in enumerate(all_data[::-1]):
                        if count < MINIMUM_PERMANENT_CSETS:
                            if timestamp != -1:
                                modified = True
                                new_data.append((revnum, revision, -1))
                            else:
                                new_data.append((revnum, revision, timestamp))
                        elif type(timestamp) != int or timestamp == -1:
                            modified = True
                            new_data.append((revnum, revision, int(time.time())))
                        else:
                            new_data.append((revnum, revision, timestamp))

                    # Delete annotations at revisions with timestamps
                    # that are too old. The csetLog entries will have
                    # their timestamps reset here.
                    new_data1 = []
                    annrevs_to_del = []
                    current_time = time.time()
                    for count, (revnum, revision, timestamp) in enumerate(new_data[::-1]):
                        new_timestamp = timestamp
                        if timestamp != -1:
                            if current_time >= timestamp + TIME_TO_KEEP_ANNOTATIONS.seconds:
                                modified = True
                                new_timestamp = current_time
                                annrevs_to_del.append(revision)
                        new_data1.append((revnum, revision, new_timestamp))

                    if len(annrevs_to_del) > 0:
                        # Delete any latestFileMod and annotation entries
                        # that are too old.
                        Log.note(
                            "Deleting annotations and latestFileMod entries for revisions "
                            "older than {{oldest}}: {{revisions}}",
                            oldest=TIME_TO_KEEP_ANNOTATIONS,
                            revisions=annrevs_to_del
                        )
                        with self.conn.transaction() as t:
                            t.execute(
                                "DELETE FROM latestFileMod WHERE revision IN " +
                                quote_set(annrevs_to_del)
                            )
                            t.execute(
                                "DELETE FROM annotations WHERE revision IN " +
                                quote_set(annrevs_to_del)
                            )

                    # Delete any overflowing entries
                    new_data2 = new_data1
                    reved_all_data = all_data[::-1]
                    deleted_data = reved_all_data[MAXIMUM_NONPERMANENT_CSETS:]
                    delete_overflowing_revstart = None
                    if len(deleted_data) > 0:
                        _, delete_overflowing_revstart, _ = deleted_data[0]
                        new_data2 = set(all_data) - set(deleted_data)

                    # Update old frontiers if requested, otherwise
                    # they will all get deleted by the csetLog_deleter
                    # worker
                    if UPDATE_VERY_OLD_FRONTIERS:
                        _, max_revision, _ = all_data[-1]
                        for _, revision, _ in deleted_data:
                            with self.conn.transaction() as t:
                                old_files = t.get(
                                    "SELECT file FROM latestFileMod WHERE revision=?",
                                    (revision,)
                                )
                            if old_files is None or len(old_files) <= 0:
                                continue

                            self.tuid_service.get_tuids_from_files(
                                old_files,
                                max_revision,
                                going_forward=True,
                            )

                            still_exist = True
                            while still_exist and not please_stop:
                                Till(seconds=TUID_EXISTENCE_WAIT_TIME).wait()
                                with self.conn.transaction() as t:
                                    old_files = t.get(
                                        "SELECT file FROM latestFileMod WHERE revision=?",
                                        (revision,)
                                    )
                                if old_files is None or len(old_files) <= 0:
                                    still_exist = False

                    # Update table and schedule a deletion
                    if modified:
                        with self.conn.transaction() as t:
                            t.execute(
                                "INSERT OR REPLACE INTO csetLog (revnum, revision, timestamp) VALUES " +
                                sql_list(
                                    quote_set(cset_entry)
                                    for cset_entry in new_data2
                                )
                            )
                    if not deleted_data:
                        continue

                    Log.note("Scheduling {{num_csets}} for deletion", num_csets=len(deleted_data))
                    self.deletions_todo.add(delete_overflowing_revstart)
            except Exception as e:
                Log.warning("Unexpected error occurred while maintaining csetLog, continuing to try: ", cause=e)
        return

    def csetLog_deleter(self, please_stop=None):
        '''
        Deletes changesets from the csetLog table,
        along with any entries in the annotation table
        that have revisions matching the given changesets.
        Accepts requests (a starting revision) from self.deletions_todo.
        :param please_stop:
        :return:
        '''
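        # Each request names the revision where deletion starts: every
        # csetLog entry with a revnum at or below that revision's revnum is
        # removed, along with its annotations and latestFileMod frontiers.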
        while not please_stop:
            try:
                request = self.deletions_todo.pop(till=please_stop)
                if please_stop:
                    break

                # If deletion is disabled, ignore the current
                # request - it will need to be re-requested.
                if self.disable_deletion:
                    Till(seconds=CSET_DELETION_WAIT_TIME).wait()
                    continue

                with self.working_locker:
                    first_cset = request

                    # Since we are deleting and moving stuff around in the
                    # TUID tables, we need everything to be contained in
                    # one transaction with no interruptions.
                    with self.conn.transaction() as t:
                        revnum = self._get_one_revnum(t, first_cset)[0]
                        csets_to_del = t.get(
                            "SELECT revnum, revision FROM csetLog WHERE revnum <= ?", (revnum,)
                        )
                        csets_to_del = [cset for _, cset in csets_to_del]
                        existing_frontiers = t.query(
                            "SELECT revision FROM latestFileMod WHERE revision IN " +
                            quote_set(csets_to_del)
                        ).data

                        existing_frontiers = [r[0] for r in existing_frontiers]
                        Log.note(
                            "Deleting all annotations and changeset log entries with revisions in the list: {{csets}}",
                            csets=csets_to_del
                        )

                        if len(existing_frontiers) > 0:
                            # This handles files which no longer exist in
                            # the main branch.
                            Log.note(
                                "Deleting existing frontiers for revisions: {{revisions}}",
                                revisions=existing_frontiers
                            )
                            t.execute(
                                "DELETE FROM latestFileMod WHERE revision IN " +
                                quote_set(existing_frontiers)
                            )

                        Log.note("Deleting annotations...")
                        t.execute(
                            "DELETE FROM annotations WHERE revision IN " +
                            quote_set(csets_to_del)
                        )

                        Log.note(
                            "Deleting {{num_entries}} csetLog entries...",
                            num_entries=len(csets_to_del)
                        )
                        t.execute(
                            "DELETE FROM csetLog WHERE revision IN " +
                            quote_set(csets_to_del)
                        )

                    # Recalculate the revnums
                    self.recompute_table_revnums()
            except Exception as e:
                Log.warning("Unexpected error occurred while deleting from csetLog:", cause=e)
                Till(seconds=CSET_DELETION_WAIT_TIME).wait()
        return

    def get_old_cset_revnum(self, revision):
        self.csets_todo_backwards.add((revision, True))

        revnum = None
        timeout = Till(seconds=BACKFILL_REVNUM_TIMEOUT)
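        # Poll until the backfill worker has inserted the revision. A
        # negative revnum means the revision is inserted but the table has
        # not yet been renumbered by recompute_table_revnums().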
        while not timeout:
            with self.conn.transaction() as t:
                revnum = self._get_one_revnum(t, revision)

            if revnum and revnum[0] >= 0:
                break
            elif revnum and revnum[0] < 0:
                Log.note("Waiting for table to recompute...")
            else:
                Log.note("Waiting for backfill to complete...")
            Till(seconds=CSET_BACKFILL_WAIT_TIME).wait()

        if timeout:
            Log.error(
                "Cannot find revision {{rev}} after waiting {{timeout}} seconds",
                rev=revision,
                timeout=BACKFILL_REVNUM_TIMEOUT
            )
        return revnum

    def get_revnnums_from_range(self, revision1, revision2):
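        # Resolution ladder: try the table as-is, then a tip update, then a
        # backfill for each missing revision. A backfill can renumber the
        # whole table, so the other revnum is re-fetched afterwards.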
        with self.conn.transaction() as t:
            revnum1 = self._get_one_revnum(t, revision1)
            revnum2 = self._get_one_revnum(t, revision2)
        if not revnum1 or not revnum2:
            did_an_update = self.update_tip()
            if did_an_update:
                with self.conn.transaction() as t:
                    revnum1 = self._get_one_revnum(t, revision1)
                    revnum2 = self._get_one_revnum(t, revision2)

            if not revnum1:
                revnum1 = self.get_old_cset_revnum(revision1)
                # Refresh the second entry
                with self.conn.transaction() as t:
                    revnum2 = self._get_one_revnum(t, revision2)

            if not revnum2:
                revnum2 = self.get_old_cset_revnum(revision2)

                # The first revnum might change also
                with self.conn.transaction() as t:
                    revnum1 = self._get_one_revnum(t, revision1)

        with self.conn.transaction() as t:
            result = self._get_revnum_range(t, revnum1[0], revnum2[0])
        return sorted(
            result,
            key=lambda x: int(x[0])
        )