Mirror of https://github.com/mozilla/DSAlign.git
Parent: ed6a2a15c3
Commit: a7025d5ab6

149 changed lines in align/align.py
align/align.py

@@ -143,10 +143,10 @@ def main(args):
     with open(args.transcript, 'r') as transcript_file:
         original_transcript = transcript_file.read()
     tc = TextCleaner(original_transcript,
-                     alphabet,
-                     dashes_to_ws=not args.text_keep_dashes,
-                     normalize_space=not args.text_keep_ws,
-                     to_lower=not args.text_keep_casing)
+                     alphabet,
+                     dashes_to_ws=not args.text_keep_dashes,
+                     normalize_space=not args.text_keep_ws,
+                     to_lower=not args.text_keep_casing)
     clean_text_path = args.transcript + '.clean'
     with open(clean_text_path, 'w') as clean_text_file:
         clean_text_file.write(tc.clean_text)
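Note: TextCleaner normalizes the raw transcript into tc.clean_text for fuzzy matching, and get_original_offset() projects clean-text offsets back onto the raw transcript (this is how 'text-start'/'text-length' are filled further down). A minimal sketch of that round trip, assuming DSAlign's text module exposes Alphabet and TextCleaner as align.py imports them; the file paths and the sample interval are placeholders:

    # Sketch only (not part of this commit): clean-text offsets -> raw offsets.
    from text import Alphabet, TextCleaner  # assumed DSAlign-internal imports

    alphabet = Alphabet('data/alphabet.txt')      # hypothetical alphabet file
    with open('transcript.txt', 'r') as transcript_file:
        original_transcript = transcript_file.read()

    tc = TextCleaner(original_transcript,
                     alphabet,
                     dashes_to_ws=True,
                     normalize_space=True,
                     to_lower=True)

    clean_start, clean_end = 0, 20                # placeholder match interval
    original_start = tc.get_original_offset(clean_start)
    original_end = tc.get_original_offset(clean_end)
    print(original_transcript[original_start:original_end])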
@@ -241,15 +241,9 @@ def main(args):
     search = FuzzySearch(tc.clean_text,
                          max_candidates=args.align_max_candidates,
                          candidate_threshold=args.align_candidate_threshold,
-                         snap_to_word=not args.align_no_snap,
-                         snap_radius=args.align_snap_radius,
                          match_score=args.align_match_score,
                          mismatch_score=args.align_mismatch_score,
-                         gap_score=args.align_gap_score,
-                         min_ngram_size=args.align_min_ngram_size,
-                         max_ngram_size=args.align_max_ngram_size,
-                         size_factor=args.align_ngram_size_factor,
-                         position_factor=args.align_ngram_position_factor)
+                         gap_score=args.align_gap_score)
     result_fragments = []
     substitutions = Counter()
     statistics = Counter()
@@ -283,28 +277,107 @@ def main(args):
             return True
         return False
 
+    for index, fragment in enumerate(fragments):
+        fragment['index'] = index
+
     def split_match(fragments, start=0, end=-1):
         n = len(fragments)
-        if n == 0:
-            return
-        weighted_fragments = enweight(enumerate(fragments))
-        # assigns high values to long statements near the center of the list
-        weighted_fragments = map(lambda fw: (fw[0], (1 - fw[1]) * len(fw[0][1]['transcript'])), weighted_fragments)
-        # highest first
-        weighted_fragments = sorted(weighted_fragments, key=lambda fw: fw[1], reverse=True)
-        for fragment, _ in weighted_fragments:
-            index, fragment = fragment
+        if n < 1:
+            raise StopIteration
+        elif n == 1:
+            weighted_fragments = [(0, fragments[0])]
+        else:
+            # so we later know the original index of each fragment
+            weighted_fragments = enumerate(fragments)
+            # assigns high values to long statements near the center of the list
+            weighted_fragments = enweight(weighted_fragments)
+            weighted_fragments = map(lambda fw: (fw[0], (1 - fw[1]) * len(fw[0][1]['transcript'])), weighted_fragments)
+            # fragments with highest weights first
+            weighted_fragments = sorted(weighted_fragments, key=lambda fw: fw[1], reverse=True)
+            # strip weights
+            weighted_fragments = map(lambda fw: fw[0], weighted_fragments)
+        for index, fragment in weighted_fragments:
             match = search.find_best(fragment['transcript'], start=start, end=end)
-            match_interval, sws_score, match_substitutions = match
+            match_start, match_end, sws_score, match_substitutions = match
             if sws_score > 0.1:
-                fragment['match'] = match
-                split_match(fragments[0:index], start=start, end=match_interval.start)
-                split_match(fragments[index + 1:], start=match_interval.end, end=end)
-                return
+                fragment['match_start'] = match_start
+                fragment['match_end'] = match_end
+                fragment['sws_score'] = sws_score
+                fragment['substitutions'] = match_substitutions
+                for f in split_match(fragments[0:index], start=start, end=match_start):
+                    yield f
+                yield fragment
+                for f in split_match(fragments[index + 1:], start=match_end, end=end):
+                    yield f
+                raise StopIteration
 
-    split_match(fragments)
+    matched_fragments = list(split_match(fragments))
 
-    for index, fragment in enumerate(fragments):
+    def phrase_similarity(a, b, direction):
+        return similarity(a,
+                          b,
+                          direction=direction,
+                          min_ngram_size=args.align_min_ngram_size,
+                          max_ngram_size=args.align_max_ngram_size,
+                          size_factor=args.align_ngram_size_factor,
+                          position_factor=args.align_ngram_position_factor)
+
+    def stretch(a, b, c, direction):
+        best_s = 0
+        best_i = 0
+        for i in range(len(c)):
+            s = phrase_similarity(a, b + c[:i], 1) if direction > 0 else phrase_similarity(a, c[-i:] + b, -1)
+            if s > best_s:
+                best_s = s
+                best_i = i
+        return best_i
+
+    for index in range(len(matched_fragments) - 1):
+        a, b = matched_fragments[index], matched_fragments[index + 1]
+        a_start, a_end = a['match_start'], a['match_end']
+        b_start, b_end = b['match_start'], b['match_end']
+        assert a_end <= b_start
+        gap_text = search.text[a_end:b_start]
+        if a_end == b_start or len(gap_text.strip()) == 0:
+            continue
+        a_trans, b_trans = a['transcript'], b['transcript']
+        a_match, b_match = search.text[a_start:a_end], search.text[b_start:b_end]
+
+        new_a_end = a_end + stretch(a_trans, a_match, gap_text, 1)
+        new_b_start = b_start - stretch(a_trans, a_match, gap_text, -1)
+
+
+        print(a['transcript'], '-', b['transcript'])
+        print('{}<{}>{}'.format(a_match[-10:], gap_text, b_match[:10]))
+        print('{}<{}>{}'.format(search.text[new_a_end-10:new_a_end][-10:],
+                                search.text[new_a_end:new_b_start],
+                                search.text[new_b_start:new_b_start+10]))
+        print()
+
+
+    # min_ngram_size = args.align_min_ngram_size
+    # max_ngram_size = args.align_max_ngram_size
+    # stretch_factor = args.align_stretch_factor
+    # size_factor = args.align_ngram_size_factor
+    # position_factor = args.align_ngram_position_factor
+    # snap_to_word = not args.align_no_snap
+
+    #start_token = TextRange.token_at(self.text, start)
+    #if len(start_token) == 0:
+    #    start_token = TextRange.token_at(self.text, start + 1)
+    #end_token = TextRange.token_at(self.text, max(0, end - 1))
+    #if self.snap_radius > 0:
+    #    look_for = look_for.split(' ')
+    #    lf_start, lf_end = look_for[0], look_for[-1]
+    #    start_token = self.extend(lf_start, start_token, -1)
+    #    end_token = self.extend(lf_end, end_token, 1)
+    #snap_range = start_token + end_token
+    #return snap_range.start, snap_range.end
+
+    return
+
+    for fragment in fragments:
+        index = fragment['index']
         time_start = fragment['time-start']
         time_length = fragment['time-length']
         fragment_transcript = fragment['transcript']
@@ -319,25 +392,25 @@ def main(args):
         if args.output_stt:
             result_fragment['stt'] = fragment_transcript
 
-        if 'match' not in fragment:
+        if 'match_start' not in fragment:
             skip(index, 'No match for transcript')
             continue
-        match_range, sws_score, match_substitutions = fragment['match']
-        original_start = tc.get_original_offset(match_range.start)
-        original_end = tc.get_original_offset(match_range.end)
+        match_start, match_end = fragment['match_start'], fragment['match_end']
+        original_start = tc.get_original_offset(match_start)
+        original_end = tc.get_original_offset(match_end)
         result_fragment['text-start'] = original_start
         result_fragment['text-length'] = original_end - original_start
 
         if args.output_aligned_raw:
             result_fragment['aligned-raw'] = original_transcript[original_start:original_end]
-        substitutions += match_substitutions
-        fragment_matched = tc.clean_text[match_range.start:match_range.end]
+
+        fragment_matched = tc.clean_text[match_start:match_end]
         if should_skip('mlen', index, sample_numbers, lambda: len(fragment_matched)):
             continue
         if args.output_aligned:
             result_fragment['aligned'] = fragment_matched
 
-        if should_skip('SWS', index, sample_numbers, lambda: 100 * sws_score):
+        if should_skip('SWS', index, sample_numbers, lambda: 100 * fragment['sws_score']):
             continue
 
         if should_skip('WNG', index, sample_numbers,
@@ -359,14 +432,16 @@ def main(args):
                        len(fragment_matched.split())):
             continue
 
+        substitutions += fragment['substitutions']
+
         result_fragments.append(result_fragment)
         logging.debug('Fragment %d aligned with %s' % (index, ' '.join(sample_numbers)))
         logging.debug('- T: ' + args.text_context * ' ' + '"%s"' % fragment_transcript)
         logging.debug('- O: %s|%s|%s' % (
-            tc.clean_text[match_range.start-args.text_context:match_range.start],
+            tc.clean_text[match_start-args.text_context:match_start],
             fragment_matched,
-            tc.clean_text[match_range.end:match_range.end+args.text_context]))
-        start = match_range.end
+            tc.clean_text[match_end:match_end+args.text_context]))
+        start = match_end
         if args.play:
             subprocess.check_call(['play',
                                    '--no-show-progress',
align/search.py

@@ -1,5 +1,5 @@
 from collections import Counter
-from text import TextRange, ngrams, similarity
+from text import ngrams, similarity
 
 
 class FuzzySearch(object):
@@ -7,12 +7,6 @@ class FuzzySearch(object):
                  text,
                  max_candidates=10,
                  candidate_threshold=0.92,
-                 snap_to_word=True,
-                 snap_radius=0,
-                 min_ngram_size=1,
-                 max_ngram_size=3,
-                 size_factor=1,
-                 position_factor=3,
                  match_score=100,
                  mismatch_score=-100,
                  gap_score=-100,
@@ -20,12 +14,6 @@ class FuzzySearch(object):
         self.text = text
         self.max_candidates = max_candidates
         self.candidate_threshold = candidate_threshold
-        self.snap_to_word = snap_to_word
-        self.snap_radius = snap_radius
-        self.min_ngram_size = min_ngram_size
-        self.max_ngram_size = max_ngram_size
-        self.size_factor = size_factor
-        self.position_factor = position_factor
         self.match_score = match_score
         self.mismatch_score = mismatch_score
         self.gap_score = gap_score
@@ -86,56 +74,15 @@ class FuzzySearch(object):
                 j -= 1
             else:
                 raise Exception('Smith–Waterman failure')
-        return start + j - 1, start + start_j, f[start_i][start_j], substitutions
-
-    def phrase_similarity(self, a, b, direction):
-        return similarity(a,
-                          b,
-                          direction=direction,
-                          min_ngram_size=self.min_ngram_size,
-                          max_ngram_size=self.max_ngram_size,
-                          size_factor=self.size_factor,
-                          position_factor=self.position_factor)
-
-    def extend(self, target, start_token, direction):
-        best_similarity = 0
-        current_token = best_token = start_token
-        for i in range(self.snap_radius + 1):
-            # current_similarity = self.phrase_similarity(current_token.get_text(), target, direction)
-            # if current_similarity > best_similarity:
-            #     best_similarity = current_similarity
-            #     best_token = current_token
-            current_similarity = self.phrase_similarity((current_token + start_token).get_text(), target, direction)
-            if current_similarity > best_similarity:
-                best_similarity = current_similarity
-                best_token = current_token
-            current_token = current_token.neighbour_token(direction)
-        return best_token
-
-    def snap(self, look_for, start, end):
-        start_token = TextRange.token_at(self.text, start)
-        if len(start_token) == 0:
-            start_token = TextRange.token_at(self.text, start + 1)
-        end_token = TextRange.token_at(self.text, max(0, end - 1))
-        if self.snap_radius > 0:
-            look_for = look_for.split(' ')
-            lf_start, lf_end = look_for[0], look_for[-1]
-            start_token = self.extend(lf_start, start_token, -1)
-            end_token = self.extend(lf_end, end_token, 1)
-        snap_range = start_token + end_token
-        return snap_range.start, snap_range.end
-
-    def find_best_in_interval(self, look_for, start, end):
-        interval_start, interval_end, score, substitutions = self.sw_align(look_for, start, end)
-        score = score / (self.match_score * max(interval_end - interval_start, len(look_for)))
-        if self.snap_to_word:
-            interval_start, interval_end = self.snap(look_for, interval_start, interval_end)
-        return TextRange(self.text, interval_start, interval_end), score, substitutions
+        align_start = max(start, start + j - 1)
+        align_end = min(end, start + start_j)
+        score = f[start_i][start_j] / (self.match_score * max(align_end - align_start, n))
+        return align_start, align_end, score, substitutions
 
     def find_best(self, look_for, start=0, end=-1):
         end = len(self.text) if end < 0 else end
         if end - start < 2 * len(look_for):
-            return self.find_best_in_interval(look_for, start, end)
+            return self.sw_align(look_for, start, end)
         window_size = len(look_for)
         windows = {}
         for i, ngram in enumerate(ngrams(' ' + look_for + ' ', 3)):
@@ -147,9 +94,7 @@ class FuzzySearch(object):
                 window = occurrence // window_size
                 windows[window] = (windows[window] + 1) if window in windows else 1
         candidate_windows = sorted(windows.keys(), key=lambda w: windows[w], reverse=True)
-        best_interval = None
-        best_substitutions = None
-        best_score = 0
+        best = (-1, -1, 0, None)
         last_window_grams = 0.1
         for window in candidate_windows[:self.max_candidates]:
             ngram_factor = (windows[window] / last_window_grams)
@@ -158,10 +103,7 @@ class FuzzySearch(object):
             last_window_grams = windows[window]
             interval_start = max(start, int((window - 1) * window_size))
             interval_end = min(end, int((window + 2) * window_size))
-            interval, score, substitutions = self.find_best_in_interval(look_for, interval_start, interval_end)
-            if score > best_score:
-                best_interval = interval
-                best_score = score
-                best_substitutions = substitutions
-
-        return best_interval, best_score, best_substitutions
+            search_result = self.sw_align(look_for, interval_start, interval_end)
+            if search_result[2] > best[2]:
+                best = search_result
+        return best
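Note: the net effect of this file's changes is that find_best (and sw_align) now return a plain (start, end, sws_score, substitutions) tuple instead of a TextRange-based triple, with (-1, -1, 0, None) when no candidate window matched, and the word-snapping machinery is gone. A sketch of a call site under the new contract; the import path is assumed to mirror align.py, and the sample text and query are placeholders:

    # Sketch only: consuming the post-commit FuzzySearch.find_best result.
    from search import FuzzySearch  # assumed import, as used by align/align.py

    search = FuzzySearch('the quick brown fox jumps over the lazy dog')

    # old: match_interval, sws_score, subs = ...; offsets via match_interval.start/.end
    # new: plain offsets come back directly
    match_start, match_end, sws_score, substitutions = search.find_best('quick brown')
    if match_start >= 0:  # (-1, -1, 0, None) means no candidate window matched
        print(search.text[match_start:match_end], sws_score)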