Ensure that downloaded file is terminated by \n + skip invalid lines

This commit is contained in:
Marco Rossi 2019-08-29 15:31:11 -07:00
Родитель 615b28232a
Коммит 3c9b69f9ab
3 изменённых файлов: 21 добавлений и 4 удалений

Просмотреть файл

@ -13,7 +13,7 @@ except ImportError as e:
pip.main(['install', 'azure.storage.blob'])
print('Please re-run script.')
sys.exit()
def valid_date(s):
try:
@ -42,6 +42,19 @@ def cmp_files(f1, f2, start_range_f1=0, start_range_f2=0, erase_checkpoint_line=
return True
prev_b1 = b1
def erase_invalid_end_line(fp):
    """Truncate the file at path *fp* right after its last newline byte.

    A trailing line that is not terminated by ``\\n`` is considered
    incomplete and is removed in place. After the call the file is either
    empty or ends with ``\\n``.

    Edge cases:
      * an empty file is left untouched (no seek before the start of file);
      * a file containing no ``\\n`` at all is truncated to zero length,
        since its only line is unterminated;
      * a file already ending with ``\\n`` is not modified.

    Raises whatever ``open`` raises if *fp* cannot be opened read/write.
    """
    chunk_size = 4096  # scan backwards in blocks instead of byte by byte
    with open(fp, 'rb+') as f:
        f.seek(0, os.SEEK_END)
        end = f.tell()

        keep = 0   # bytes to keep; stays 0 when no newline exists
        pos = end  # exclusive upper bound of the region still to scan
        while pos > 0:
            start = max(0, pos - chunk_size)
            f.seek(start, os.SEEK_SET)
            idx = f.read(pos - start).rfind(b'\n')
            if idx != -1:
                # Keep everything up to and including the last newline.
                keep = start + idx + 1
                break
            pos = start

        # Only touch the file when there is an unterminated tail to drop.
        if keep < end:
            f.truncate(keep)
def add_parser_args(parser):
parser.add_argument('-a','--app_id', help="app id (i.e., Azure storage blob container name)", required=True)
parser.add_argument('-l','--log_dir', help="base dir to download data (a subfolder will be created)", required=True)
@ -70,6 +83,8 @@ def add_parser_args(parser):
parser.add_argument('-v','--version', type=int, default=2, help='''version of log downloader to use:
1: for uncooked logs (only for backward compatibility) [deprecated]
2: for cooked logs [default]''')
parser.add_argument('--keep_invalid_eof', help="avoid to erase the last line when it is invalid", action='store_true')
def update_progress(current, total):
barLength = 50 # Length of the progress bar
@ -79,7 +94,7 @@ def update_progress(current, total):
sys.stdout.write(text)
sys.stdout.flush()
def download_container(app_id, log_dir, container=None, conn_string=None, account_name=None, sas_token=None, start_date=None, end_date=None, overwrite_mode=0, dry_run=False, version=2, verbose=False, create_gzip_mode=-1, delta_mod_t=3600, max_connections=4, confirm=False, report_progress=True, if_match=None):
def download_container(app_id, log_dir, container=None, conn_string=None, account_name=None, sas_token=None, start_date=None, end_date=None, overwrite_mode=0, dry_run=False, version=2, verbose=False, create_gzip_mode=-1, delta_mod_t=3600, max_connections=4, confirm=False, report_progress=True, if_match=None, keep_invalid_eof=False):
t_start = time.time()
if not container:
container=app_id
@ -240,6 +255,8 @@ def download_container(app_id, log_dir, container=None, conn_string=None, accoun
download_time = time.time()-t0
download_size_MB = os.path.getsize(fp)/(1024**2) # file size in MB
print('\nDownloaded {:.3f} MB in {:.1f} sec. ({:.3f} MB/sec)\n'.format(download_size_MB, download_time, download_size_MB/download_time))
if not keep_invalid_eof:
erase_invalid_end_line(fp)
except Exception as e:
print('Error: {}'.format(e))

Просмотреть файл

@ -141,7 +141,7 @@ def create_stats(log_fp, d=None, predictions_files=None):
else:
ds_parse.update_progress(bytes_count,tot_bytes)
if x.startswith(b'{"_label_cost":'):
if x.startswith(b'{"_label_cost":') and x.strip().endswith(b'}'):
data = ds_parse.json_cooked(x)
# Skip not activated lines or wrongly formated lines

Просмотреть файл

@ -57,7 +57,7 @@ def process_dsjson_file(fp, d=None, e=None):
if x.startswith(b'['): # Ignore checkpoint info line
continue
if not (x.startswith(b'{"') or x.strip().endswith(b'}')):
if not (x.startswith(b'{"') and x.strip().endswith(b'}')):
print('Corrupted line: {}'.format(x))
continue