Add encoding cache and lazy-train mechanism (#50)

* Add new config about knowledge distillation for query binary classifier

* Remove inference results in knowledge distillation for query binary classifier

* Add AUC.py in tools folder

* Add test_data_path into conf_kdqbc_bilstmattn_cnn.json

* Modify AUC.py

* Rename AUC.py to calculate_AUC.py

* Modify the test & calculate-AUC commands for Knowledge Distillation for Query Binary Classifier

* Add cpu_thread_num parameter in conf.training_params

* Rename cpu_thread_num to cpu_num_workers

* update comments in ModelConf.py

* Add cpu_num_workers in model_zoo/advanced/conf.json

* Add the description of cpu_num_workers in Tutorial.md

* Update inference speed of compressed model

* Add ProcessorsScheduler Class

* Add license in ProcessorsScheduler.py

* use lazy loading instead of one-off loading

* Remove Debug Info in problem.py

* use open instead of codecs.open

* Update the logic of dictionary building for classification

* add md5 function in common_utils.py

* add merge_encode_* function

* Fix typo

* Fix typo

* Reorganize the logical flow in train.py

* remove dummy comments in problem.py

* add encoding cache mechanism

* add lazy-load mechanism for training phase

* enumerate problem types in problem.py

* remove data_encoding.py

* add lazy load train logic

* Modify comment and remove debug code

* Check whether test_path exists

* Fix missing parameter when using char embedding

* merge master

* add file_column_num in problem.py

* merge add_encoding_cache branch

* add SST-2 in .gitignore

* merge master

* use steps_per_validation instead of valid_times_per_epoch

* Fix Learning Rate decay logic bug

* Add logging for calculating the md5 of training data

* Fix multi-GPU char_emb OOM problem & add char-level fixed_lengths

* Modify batch_num_to_show_results in multi-gpu

* Modify batch_num_to_show_results

* delete deepcopy in get_batches

* add new parameters chunk_size and max_building_lines in conf and update tutorials
Flyer Cheng 2019-08-02 20:59:00 +08:00 committed by L.J. SHOU
Parent db26940fb6
Commit 58ad563a23
15 changed files with 590 additions and 362 deletions

.gitignore

@@ -5,4 +5,5 @@
*.vs*
dataset/GloVe/
dataset/20_newsgroups/
models/
dataset/SST-2/
models/


@@ -13,7 +13,7 @@ import codecs
import pickle as pkl
from utils.common_utils import dump_to_pkl, load_from_pkl, get_param_num, get_trainable_param_num, \
transfer_to_gpu, transform_params2tensors, get_layer_class
transfer_to_gpu, transform_params2tensors, get_layer_class, load_from_json, dump_to_json
from utils.philly_utils import HDFSDirectTransferer, open_and_move, convert_to_tmppath, \
convert_to_hdfspath, move_from_local_to_hdfs
from Model import Model
@@ -22,7 +22,7 @@ from metrics.Evaluator import Evaluator
from utils.corpus_utils import get_batches
from core.StreamingRecorder import StreamingRecorder
from core.LRScheduler import LRScheduler
from settings import ProblemTypes
from settings import ProblemTypes, Setting as st
from block_zoo import Linear
from block_zoo import CRF
from losses.CRFLoss import CRFLoss
@@ -93,33 +93,17 @@ class LearningMachine(object):
def train(self, optimizer, loss_fn):
self.model.train()
logging.info("="*100 + '\n' + "*"*15 + 'Prepare data for training' + "*"*15)
if not self.conf.train_data_path.endswith('.pkl'):
train_data, train_length, train_target = self.problem.encode(self.conf.train_data_path, self.conf.file_columns,
self.conf.input_types, self.conf.file_with_col_header, self.conf.object_inputs, self.conf.answer_column_name, max_lengths=self.conf.max_lengths,
min_sentence_len = self.conf.min_sentence_len, extra_feature=self.conf.extra_feature,fixed_lengths=self.conf.fixed_lengths, file_format='tsv',
show_progress=True if self.conf.mode == 'normal' else False, cpu_num_workers=self.conf.cpu_num_workers)
else:
train_pkl_data = load_from_pkl(self.conf.train_data_path)
train_data, train_length, train_target = train_pkl_data['data'], train_pkl_data['length'], train_pkl_data['target']
if not self.conf.valid_data_path.endswith('.pkl'):
valid_data, valid_length, valid_target = self.problem.encode(self.conf.valid_data_path, self.conf.file_columns,
self.conf.input_types, self.conf.file_with_col_header, self.conf.object_inputs, self.conf.answer_column_name, max_lengths=self.conf.max_lengths,
min_sentence_len = self.conf.min_sentence_len, extra_feature = self.conf.extra_feature,fixed_lengths=self.conf.fixed_lengths, file_format='tsv',
show_progress=True if self.conf.mode == 'normal' else False, cpu_num_workers=self.conf.cpu_num_workers)
else:
valid_pkl_data = load_from_pkl(self.conf.valid_data_path)
valid_data, valid_length, valid_target = valid_pkl_data['data'], valid_pkl_data['length'], valid_pkl_data['target']
valid_data, valid_length, valid_target = self.problem.encode(self.conf.valid_data_path, self.conf.file_columns,
self.conf.input_types, self.conf.file_with_col_header, self.conf.object_inputs, self.conf.answer_column_name, max_lengths=self.conf.max_lengths,
min_sentence_len = self.conf.min_sentence_len, extra_feature = self.conf.extra_feature,fixed_lengths=self.conf.fixed_lengths, file_format='tsv',
show_progress=True if self.conf.mode == 'normal' else False, cpu_num_workers=self.conf.cpu_num_workers, chunk_size=self.conf.chunk_size)
if self.conf.test_data_path is not None:
if not self.conf.test_data_path.endswith('.pkl'):
test_data, test_length, test_target = self.problem.encode(self.conf.test_data_path, self.conf.file_columns, self.conf.input_types,
self.conf.file_with_col_header, self.conf.object_inputs, self.conf.answer_column_name, max_lengths=self.conf.max_lengths,
min_sentence_len = self.conf.min_sentence_len, extra_feature = self.conf.extra_feature,fixed_lengths=self.conf.fixed_lengths,
file_format='tsv', show_progress=True if self.conf.mode == 'normal' else False, cpu_num_workers=self.conf.cpu_num_workers)
else:
test_pkl_data = load_from_pkl(self.conf.test_data_path)
test_data, test_length, test_target = test_pkl_data['data'], test_pkl_data['length'], test_pkl_data['target']
test_data, test_length, test_target = self.problem.encode(self.conf.test_data_path, self.conf.file_columns,
self.conf.input_types, self.conf.file_with_col_header, self.conf.object_inputs, self.conf.answer_column_name, max_lengths=self.conf.max_lengths,
min_sentence_len = self.conf.min_sentence_len, extra_feature = self.conf.extra_feature,fixed_lengths=self.conf.fixed_lengths, file_format='tsv',
show_progress=True if self.conf.mode == 'normal' else False, cpu_num_workers=self.conf.cpu_num_workers, chunk_size=self.conf.chunk_size)
stop_training = False
epoch = 1
@@ -139,212 +123,216 @@ class LearningMachine(object):
logging.info("=" * 100 + '\n' + "*" * 15 + 'Start training' + "*" * 15)
while not stop_training and epoch <= self.conf.max_epoch:
logging.info('Training: Epoch ' + str(epoch))
train_data_generator = self._get_training_data_generator()
part_index = 1
for train_data, train_length, train_target in train_data_generator:
logging.debug('Training: Epoch %s Part %s'%(epoch, part_index))
part_index += 1
data_batches, length_batches, target_batches = \
get_batches(self.problem, train_data, train_length, train_target, self.conf.batch_size_total,
self.conf.input_types, None, permutate=True, transform_tensor=True)
data_batches, length_batches, target_batches = \
get_batches(self.problem, train_data, train_length, train_target, self.conf.batch_size_total,
self.conf.input_types, None, permutate=True, transform_tensor=True)
whole_batch_num = len(target_batches)
valid_batch_num = max(len(target_batches) // self.conf.valid_times_per_epoch, 1)
if torch.cuda.device_count() > 1:
small_batch_num = whole_batch_num * torch.cuda.device_count() # total batch num over all the gpus
valid_batch_num_show = valid_batch_num * torch.cuda.device_count() # total batch num over all the gpus to do validation
else:
whole_batch_num = len(target_batches)
valid_batch_num = min(self.conf.steps_per_validation, whole_batch_num)
small_batch_num = whole_batch_num
valid_batch_num_show = valid_batch_num
batch_num_to_show_results = self.conf.batch_num_to_show_results
if torch.cuda.device_count() > 1:
batch_num_to_show_results *= torch.cuda.device_count() # total batch num overall all the gpus to log
small_batch_num *= torch.cuda.device_count() # total batch num over all the gpus
valid_batch_num_show *= torch.cuda.device_count() # total batch num over all the gpus to do validation
streaming_recoder.clear_records()
all_costs = []
streaming_recoder.clear_records()
all_costs = []
logging.info('There are %d batches during current period; validation are conducted every %d batch' % (small_batch_num, valid_batch_num_show))
logging.info('There are %d batches during an epoch; validation are conducted every %d batch' % (small_batch_num, valid_batch_num_show))
if self.conf.mode == 'normal':
progress = tqdm(range(len(target_batches)))
elif self.conf.mode == 'philly':
progress = range(len(target_batches))
for i in progress:
# the result shape: for classification: [batch_size, # of classes]; for sequence tagging: [batch_size, seq_len, # of tags]
param_list, inputs_desc, length_desc = transform_params2tensors(data_batches[i], length_batches[i])
logits = self.model(inputs_desc, length_desc, *param_list)
if self.conf.mode == 'normal':
progress = tqdm(range(len(target_batches)))
elif self.conf.mode == 'philly':
progress = range(len(target_batches))
for i in progress:
# the result shape: for classification: [batch_size, # of classes]; for sequence tagging: [batch_size, seq_len, # of tags]
param_list, inputs_desc, length_desc = transform_params2tensors(data_batches[i], length_batches[i])
logits = self.model(inputs_desc, length_desc, *param_list)
logits_softmax = {}
if isinstance(self.model, nn.DataParallel):
for tmp_output_layer_id in self.model.module.output_layer_id:
if isinstance(self.model.module.layers[tmp_output_layer_id], Linear) and \
(not self.model.module.layers[tmp_output_layer_id].layer_conf.last_hidden_softmax):
logits_softmax[tmp_output_layer_id] = nn.functional.softmax(
logits[tmp_output_layer_id], dim=-1)
elif isinstance(get_layer_class(self.model, tmp_output_layer_id), CRF):
pass
else:
logits_softmax[tmp_output_layer_id] = logits[tmp_output_layer_id]
else:
for tmp_output_layer_id in self.model.output_layer_id:
if isinstance(self.model.layers[tmp_output_layer_id], Linear) and \
(not self.model.layers[tmp_output_layer_id].layer_conf.last_hidden_softmax):
logits_softmax[tmp_output_layer_id] = nn.functional.softmax(
logits[tmp_output_layer_id], dim=-1)
elif isinstance(get_layer_class(self.model, tmp_output_layer_id), CRF):
pass
else:
logits_softmax[tmp_output_layer_id] = logits[tmp_output_layer_id]
# check the output
if ProblemTypes[self.problem.problem_type] == ProblemTypes.classification:
logits = list(logits.values())[0]
logits_softmax = list(logits_softmax.values())[0]
assert len(logits_softmax.shape) == 2, 'The dimension of your output is %s, but we need [batch_size*GPUs, class num]' % (str(list(logits_softmax.shape)))
assert logits_softmax.shape[1] == self.problem.output_target_num(), 'The dimension of your output layer %d is inconsistent with your type number %d!' % (logits_softmax.shape[1], self.problem.output_target_num())
# for auc metric
prediction_scores = logits_softmax[:, self.conf.pos_label].cpu().data.numpy()
if self.evaluator.has_auc_type_specific:
prediction_scores_all = logits_softmax.cpu().data.numpy()
else:
prediction_scores_all = None
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.sequence_tagging:
logits = list(logits.values())[0]
if not isinstance(get_layer_class(self.model, tmp_output_layer_id), CRF):
logits_softmax = list(logits_softmax.values())[0]
assert len(logits_softmax.shape) == 3, 'The dimension of your output is %s, but we need [batch_size*GPUs, sequence length, representation dim]' % (str(list(logits_softmax.shape)), )
prediction_scores = None
prediction_scores_all = None
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.regression:
logits = list(logits.values())[0]
logits_softmax = list(logits_softmax.values())[0]
assert len(logits_softmax.shape) == 2 and logits_softmax.shape[1] == 1, 'The dimension of your output is %s, but we need [batch_size*GPUs, 1]' % (str(list(logits_softmax.shape)))
prediction_scores = None
prediction_scores_all = None
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.mrc:
for single_value in logits_softmax.values():
assert len(single_value.shape) == 3, 'The dimension of your output is %s, but we need [batch_size*GPUs, sequence_len, 1]' % (str(list(single_value.shape)))
prediction_scores = None
prediction_scores_all = None
logits_flat = dict()
if ProblemTypes[self.problem.problem_type] == ProblemTypes.sequence_tagging:
# Transform output shapes for metric evaluation
# for seq_tag_f1 metric
if isinstance(get_layer_class(self.model, tmp_output_layer_id), CRF):
forward_score, scores, masks, tag_seq, transitions, layer_conf = logits
prediction_indices = tag_seq.cpu().numpy()
streaming_recoder.record_one_row([self.problem.decode(prediction_indices, length_batches[i]['target'][self.conf.answer_column_name[0]].numpy()),
prediction_scores, self.problem.decode(
target_batches[i][self.conf.answer_column_name[0]],
length_batches[i]['target'][self.conf.answer_column_name[0]].numpy())], keep_dim=False)
else:
prediction_indices = logits_softmax.data.max(2)[1].cpu().numpy() # [batch_size, seq_len]
# pytorch's CrossEntropyLoss only support this
logits_flat[self.conf.output_layer_id[0]] = logits.view(-1, logits.size(2)) # [batch_size * seq_len, # of tags]
streaming_recoder.record_one_row([self.problem.decode(prediction_indices, length_batches[i]['target'][self.conf.answer_column_name[0]].numpy()),
prediction_scores, self.problem.decode(
target_batches[i][self.conf.answer_column_name[0]],
length_batches[i]['target'][self.conf.answer_column_name[0]].numpy())], keep_dim=False)
target_batches[i][self.conf.answer_column_name[0]] = target_batches[i][
self.conf.answer_column_name[0]].reshape(-1)
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.classification:
prediction_indices = logits_softmax.detach().max(1)[1].cpu().numpy()
# Should not decode!
streaming_recoder.record_one_row([prediction_indices, prediction_scores, prediction_scores_all, target_batches[i][self.conf.answer_column_name[0]].numpy()])
logits_flat[self.conf.output_layer_id[0]] = logits
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.regression:
temp_logits_flat = logits.squeeze(1)
prediction_scores = temp_logits_flat.detach().cpu().numpy()
streaming_recoder.record_one_row([prediction_scores, target_batches[i][self.conf.answer_column_name[0]].numpy()])
logits_flat[self.conf.output_layer_id[0]] = temp_logits_flat
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.mrc:
for key, value in logits.items():
logits[key] = value.squeeze()
for key, value in logits_softmax.items():
logits_softmax[key] = value.squeeze()
passage_identify = None
for type_key in data_batches[i].keys():
if 'p' in type_key.lower():
passage_identify = type_key
break
if not passage_identify:
raise Exception('MRC task need passage information.')
prediction = self.problem.decode(logits_softmax, lengths=length_batches[i][passage_identify],
batch_data=data_batches[i][passage_identify])
logits_flat = logits
mrc_answer_target = None
for single_target in target_batches[i]:
if isinstance(target_batches[i][single_target][0], str):
mrc_answer_target = target_batches[i][single_target]
streaming_recoder.record_one_row([prediction, mrc_answer_target])
if self.use_gpu:
for single_target in self.conf.answer_column_name:
if isinstance(target_batches[i][single_target], torch.Tensor):
target_batches[i][single_target] = transfer_to_gpu(target_batches[i][single_target])
if isinstance(loss_fn.loss_fn[0], CRFLoss):
loss = loss_fn.loss_fn[0](forward_score, scores, masks, list(target_batches[i].values())[0], transitions, layer_conf)
else:
loss = loss_fn(logits_flat, target_batches[i])
all_costs.append(loss.item())
optimizer.zero_grad()
loss.backward()
if self.conf.clip_grad_norm_max_norm != -1:
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.conf.clip_grad_norm_max_norm)
logits_softmax = {}
if isinstance(self.model, nn.DataParallel):
torch.nn.utils.clip_grad_norm_(self.model.module.layers['embedding'].get_parameters(), self.conf.clip_grad_norm_max_norm)
for tmp_output_layer_id in self.model.module.output_layer_id:
if isinstance(self.model.module.layers[tmp_output_layer_id], Linear) and \
(not self.model.module.layers[tmp_output_layer_id].layer_conf.last_hidden_softmax):
logits_softmax[tmp_output_layer_id] = nn.functional.softmax(
logits[tmp_output_layer_id], dim=-1)
elif isinstance(get_layer_class(self.model, tmp_output_layer_id), CRF):
pass
else:
logits_softmax[tmp_output_layer_id] = logits[tmp_output_layer_id]
else:
torch.nn.utils.clip_grad_norm_(self.model.layers['embedding'].get_parameters(), self.conf.clip_grad_norm_max_norm)
optimizer.step()
for tmp_output_layer_id in self.model.output_layer_id:
if isinstance(self.model.layers[tmp_output_layer_id], Linear) and \
(not self.model.layers[tmp_output_layer_id].layer_conf.last_hidden_softmax):
logits_softmax[tmp_output_layer_id] = nn.functional.softmax(
logits[tmp_output_layer_id], dim=-1)
elif isinstance(get_layer_class(self.model, tmp_output_layer_id), CRF):
pass
else:
logits_softmax[tmp_output_layer_id] = logits[tmp_output_layer_id]
del loss, logits, logits_softmax, logits_flat
del prediction_scores
if ProblemTypes[self.problem.problem_type] == ProblemTypes.sequence_tagging \
or ProblemTypes[self.problem.problem_type] == ProblemTypes.classification:
del prediction_indices
if show_result_cnt == self.conf.batch_num_to_show_results:
# check the output
if ProblemTypes[self.problem.problem_type] == ProblemTypes.classification:
result = self.evaluator.evaluate(streaming_recoder.get('target'),
streaming_recoder.get('prediction'), y_pred_pos_score=streaming_recoder.get('pred_scores'),
y_pred_scores_all=streaming_recoder.get('pred_scores_all'), formatting=True)
logits = list(logits.values())[0]
logits_softmax = list(logits_softmax.values())[0]
assert len(logits_softmax.shape) == 2, 'The dimension of your output is %s, but we need [batch_size*GPUs, class num]' % (str(list(logits_softmax.shape)))
assert logits_softmax.shape[1] == self.problem.output_target_num(), 'The dimension of your output layer %d is inconsistent with your type number %d!' % (logits_softmax.shape[1], self.problem.output_target_num())
# for auc metric
prediction_scores = logits_softmax[:, self.conf.pos_label].cpu().data.numpy()
if self.evaluator.has_auc_type_specific:
prediction_scores_all = logits_softmax.cpu().data.numpy()
else:
prediction_scores_all = None
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.sequence_tagging:
result = self.evaluator.evaluate(streaming_recoder.get('target'),
streaming_recoder.get('prediction'), y_pred_pos_score=streaming_recoder.get('pred_scores'),
formatting=True)
logits = list(logits.values())[0]
if not isinstance(get_layer_class(self.model, tmp_output_layer_id), CRF):
logits_softmax = list(logits_softmax.values())[0]
assert len(logits_softmax.shape) == 3, 'The dimension of your output is %s, but we need [batch_size*GPUs, sequence length, representation dim]' % (str(list(logits_softmax.shape)), )
prediction_scores = None
prediction_scores_all = None
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.regression:
result = self.evaluator.evaluate(streaming_recoder.get('target'),
streaming_recoder.get('prediction'), y_pred_pos_score=None, y_pred_scores_all=None, formatting=True)
logits = list(logits.values())[0]
logits_softmax = list(logits_softmax.values())[0]
assert len(logits_softmax.shape) == 2 and logits_softmax.shape[1] == 1, 'The dimension of your output is %s, but we need [batch_size*GPUs, 1]' % (str(list(logits_softmax.shape)))
prediction_scores = None
prediction_scores_all = None
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.mrc:
result = self.evaluator.evaluate(streaming_recoder.get('answer_text'), streaming_recoder.get('prediction'),
y_pred_pos_score=None, y_pred_scores_all=None, formatting=True)
for single_value in logits_softmax.values():
assert len(single_value.shape) == 3, 'The dimension of your output is %s, but we need [batch_size*GPUs, sequence_len, 1]' % (str(list(single_value.shape)))
prediction_scores = None
prediction_scores_all = None
if torch.cuda.device_count() > 1:
logging.info("Epoch %d batch idx: %d; lr: %f; since last log, loss=%f; %s" % \
(epoch, i * torch.cuda.device_count(), lr_scheduler.get_lr(), np.sum(all_costs), result))
logits_flat = dict()
if ProblemTypes[self.problem.problem_type] == ProblemTypes.sequence_tagging:
# Transform output shapes for metric evaluation
# for seq_tag_f1 metric
if isinstance(get_layer_class(self.model, tmp_output_layer_id), CRF):
forward_score, scores, masks, tag_seq, transitions, layer_conf = logits
prediction_indices = tag_seq.cpu().numpy()
streaming_recoder.record_one_row([self.problem.decode(prediction_indices, length_batches[i]['target'][self.conf.answer_column_name[0]].numpy()),
prediction_scores, self.problem.decode(
target_batches[i][self.conf.answer_column_name[0]],
length_batches[i]['target'][self.conf.answer_column_name[0]].numpy())], keep_dim=False)
else:
prediction_indices = logits_softmax.data.max(2)[1].cpu().numpy() # [batch_size, seq_len]
# pytorch's CrossEntropyLoss only support this
logits_flat[self.conf.output_layer_id[0]] = logits.view(-1, logits.size(2)) # [batch_size * seq_len, # of tags]
streaming_recoder.record_one_row([self.problem.decode(prediction_indices, length_batches[i]['target'][self.conf.answer_column_name[0]].numpy()),
prediction_scores, self.problem.decode(
target_batches[i][self.conf.answer_column_name[0]],
length_batches[i]['target'][self.conf.answer_column_name[0]].numpy())], keep_dim=False)
target_batches[i][self.conf.answer_column_name[0]] = target_batches[i][
self.conf.answer_column_name[0]].reshape(-1)
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.classification:
prediction_indices = logits_softmax.detach().max(1)[1].cpu().numpy()
# Should not decode!
streaming_recoder.record_one_row([prediction_indices, prediction_scores, prediction_scores_all, target_batches[i][self.conf.answer_column_name[0]].numpy()])
logits_flat[self.conf.output_layer_id[0]] = logits
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.regression:
temp_logits_flat = logits.squeeze(1)
prediction_scores = temp_logits_flat.detach().cpu().numpy()
streaming_recoder.record_one_row([prediction_scores, target_batches[i][self.conf.answer_column_name[0]].numpy()])
logits_flat[self.conf.output_layer_id[0]] = temp_logits_flat
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.mrc:
for key, value in logits.items():
logits[key] = value.squeeze()
for key, value in logits_softmax.items():
logits_softmax[key] = value.squeeze()
passage_identify = None
for type_key in data_batches[i].keys():
if 'p' in type_key.lower():
passage_identify = type_key
break
if not passage_identify:
raise Exception('MRC task need passage information.')
prediction = self.problem.decode(logits_softmax, lengths=length_batches[i][passage_identify],
batch_data=data_batches[i][passage_identify])
logits_flat = logits
mrc_answer_target = None
for single_target in target_batches[i]:
if isinstance(target_batches[i][single_target][0], str):
mrc_answer_target = target_batches[i][single_target]
streaming_recoder.record_one_row([prediction, mrc_answer_target])
if self.use_gpu:
for single_target in self.conf.answer_column_name:
if isinstance(target_batches[i][single_target], torch.Tensor):
target_batches[i][single_target] = transfer_to_gpu(target_batches[i][single_target])
if isinstance(loss_fn.loss_fn[0], CRFLoss):
loss = loss_fn.loss_fn[0](forward_score, scores, masks, list(target_batches[i].values())[0], transitions, layer_conf)
else:
logging.info("Epoch %d batch idx: %d; lr: %f; since last log, loss=%f; %s" % \
(epoch, i, lr_scheduler.get_lr(), np.mean(all_costs), result))
loss = loss_fn(logits_flat, target_batches[i])
show_result_cnt = 0
# The loss and other metrics printed during a training epoch are just the result of part of the training data.
all_costs = []
streaming_recoder.clear_records()
all_costs.append(loss.item())
optimizer.zero_grad()
loss.backward()
if self.conf.clip_grad_norm_max_norm != -1:
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.conf.clip_grad_norm_max_norm)
if isinstance(self.model, nn.DataParallel):
torch.nn.utils.clip_grad_norm_(self.model.module.layers['embedding'].get_parameters(), self.conf.clip_grad_norm_max_norm)
else:
torch.nn.utils.clip_grad_norm_(self.model.layers['embedding'].get_parameters(), self.conf.clip_grad_norm_max_norm)
optimizer.step()
if (i != 0 and i % valid_batch_num == 0) or i == len(target_batches) - 1:
torch.cuda.empty_cache() # actually useless
logging.info('Valid & Test : Epoch ' + str(epoch))
new_result = self.evaluate(valid_data, valid_length, valid_target,
self.conf.input_types, self.evaluator, loss_fn, pad_ids=None, cur_best_result=best_result,
model_save_path=self.conf.model_save_path, phase="valid", epoch=epoch)
renew_flag = best_result != new_result
best_result = new_result
del loss, logits, logits_softmax, logits_flat
del prediction_scores
if ProblemTypes[self.problem.problem_type] == ProblemTypes.sequence_tagging \
or ProblemTypes[self.problem.problem_type] == ProblemTypes.classification:
del prediction_indices
if renew_flag and self.conf.test_data_path is not None:
self.evaluate(test_data, test_length, test_target,
self.conf.input_types, self.evaluator, loss_fn, pad_ids=None, phase="test", epoch=epoch)
self.model.train()
show_result_cnt += 1
if show_result_cnt == batch_num_to_show_results:
if ProblemTypes[self.problem.problem_type] == ProblemTypes.classification:
result = self.evaluator.evaluate(streaming_recoder.get('target'),
streaming_recoder.get('prediction'), y_pred_pos_score=streaming_recoder.get('pred_scores'),
y_pred_scores_all=streaming_recoder.get('pred_scores_all'), formatting=True)
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.sequence_tagging:
result = self.evaluator.evaluate(streaming_recoder.get('target'),
streaming_recoder.get('prediction'), y_pred_pos_score=streaming_recoder.get('pred_scores'),
formatting=True)
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.regression:
result = self.evaluator.evaluate(streaming_recoder.get('target'),
streaming_recoder.get('prediction'), y_pred_pos_score=None, y_pred_scores_all=None, formatting=True)
elif ProblemTypes[self.problem.problem_type] == ProblemTypes.mrc:
result = self.evaluator.evaluate(streaming_recoder.get('answer_text'), streaming_recoder.get('prediction'),
y_pred_pos_score=None, y_pred_scores_all=None, formatting=True)
del data_batches, length_batches, target_batches
if torch.cuda.device_count() > 1:
logging.info("Epoch %d batch idx: %d; lr: %f; since last log, loss=%f; %s" % \
(epoch, i * torch.cuda.device_count(), lr_scheduler.get_lr(), np.mean(all_costs), result))
else:
logging.info("Epoch %d batch idx: %d; lr: %f; since last log, loss=%f; %s" % \
(epoch, i, lr_scheduler.get_lr(), np.mean(all_costs), result))
show_result_cnt = 0
# The loss and other metrics printed during a training epoch are just the result of part of the training data.
all_costs = []
streaming_recoder.clear_records()
if (i != 0 and i % valid_batch_num == 0) or i == len(target_batches) - 1:
torch.cuda.empty_cache() # actually useless
logging.info('Valid & Test : Epoch ' + str(epoch))
new_result = self.evaluate(valid_data, valid_length, valid_target,
self.conf.input_types, self.evaluator, loss_fn, pad_ids=None, cur_best_result=best_result,
model_save_path=self.conf.model_save_path, phase="valid", epoch=epoch)
renew_flag = best_result != new_result
best_result = new_result
if renew_flag and self.conf.test_data_path is not None:
self.evaluate(test_data, test_length, test_target,
self.conf.input_types, self.evaluator, loss_fn, pad_ids=None, phase="test", epoch=epoch)
self.model.train()
show_result_cnt += 1
del data_batches, length_batches, target_batches
lr_scheduler.step()
epoch += 1
@@ -357,7 +345,7 @@ class LearningMachine(object):
test_data, test_length, test_target = self.problem.encode(test_data_path, self.conf.file_columns, self.conf.input_types,
self.conf.file_with_col_header, self.conf.object_inputs, self.conf.answer_column_name, max_lengths=self.conf.max_lengths,
min_sentence_len = self.conf.min_sentence_len, extra_feature = self.conf.extra_feature,fixed_lengths=self.conf.fixed_lengths, file_format='tsv',
show_progress=True if self.conf.mode == 'normal' else False, cpu_num_workers=self.conf.cpu_num_workers)
show_progress=True if self.conf.mode == 'normal' else False, cpu_num_workers=self.conf.cpu_num_workers, chunk_size=self.conf.chunk_size)
else:
test_pkl_data = load_from_pkl(test_data_path)
test_data, test_length, test_target = test_pkl_data['data'], test_pkl_data['length'], test_pkl_data['target']
@@ -681,7 +669,7 @@ class LearningMachine(object):
self.conf.file_with_col_header,self.conf.object_inputs, None, min_sentence_len=self.conf.min_sentence_len,
extra_feature=self.conf.extra_feature,max_lengths=self.conf.max_lengths, fixed_lengths=self.conf.fixed_lengths,
file_format='tsv', show_progress=True if self.conf.mode == 'normal' else False,
cpu_num_workers=self.conf.cpu_num_workers)
cpu_num_workers=self.conf.cpu_num_workers, chunk_size=self.conf.chunk_size)
logging.info("Starting predict ...")
self.model.eval()
@@ -910,5 +898,19 @@ class LearningMachine(object):
logging.info("Model %s loaded!" % model_path)
logging.info("Total trainable parameters: %d" % (get_trainable_param_num(self.model)))
def _get_training_data_generator(self):
if not self.conf.use_cache:
return self.problem.get_encode_generator(self.conf, build_cache=False)
if not self.conf.encoding_file_index:
return self._get_save_encode_generator()
assert self.conf.load_encoding_cache_generator, 'function conf.load_encoding_cache_generator is not defined'
return self.conf.load_encoding_cache_generator(self.conf.encoding_cache_dir, self.conf.encoding_file_index)
def _get_save_encode_generator(self):
load_save_encode_generator = self.problem.get_encode_generator(self.conf, build_cache=True)
for data, lengths, target in load_save_encode_generator:
yield data, lengths, target
cache_index = load_from_json(self.conf.encoding_cache_index_file_path)
self.conf.encoding_file_index = cache_index[st.cencoding_key_index]
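Note: `conf.load_encoding_cache_generator` is only asserted above, not defined in this diff; `_load_cache_config_from_conf` in ModelConf.py below initializes it to `None`. A minimal sketch of a compatible loader, assuming the `[file_name, md5]` index entries and the `dump_to_pkl((data, lengths, target), file_path)` layout produced by `get_encode_generator` in problem.py:

```python
import os

from utils.common_utils import load_from_pkl, md5

def load_encoding_cache_generator(encoding_cache_dir, encoding_file_index):
    """Sketch: stream cached (data, lengths, target) parts, verifying each file's md5."""
    for file_name, file_md5 in encoding_file_index:
        file_path = os.path.join(encoding_cache_dir, file_name)
        # refuse to reuse a cache part that changed since the index was built
        assert md5([file_path]) == file_md5, 'encoding cache %s is corrupted' % file_name
        yield load_from_pkl(file_path)
```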


@@ -14,8 +14,8 @@ import shutil
from losses.BaseLossConf import BaseLossConf
#import traceback
from settings import LanguageTypes, ProblemTypes, TaggingSchemes, SupportedMetrics, PredictionTypes, DefaultPredictionFields
from utils.common_utils import log_set, prepare_dir
from settings import LanguageTypes, ProblemTypes, TaggingSchemes, SupportedMetrics, PredictionTypes, DefaultPredictionFields, ConstantStatic
from utils.common_utils import log_set, prepare_dir, md5
from utils.exceptions import ConfigurationError
import numpy as np
@@ -219,6 +219,10 @@ class ModelConf(object):
# vocabulary setting
self.max_vocabulary = self.get_item(['training_params', 'vocabulary', 'max_vocabulary'], default=800000, use_default=True)
self.min_word_frequency = self.get_item(['training_params', 'vocabulary', 'min_word_frequency'], default=3, use_default=True)
self.max_building_lines = self.get_item(['training_params', 'vocabulary', 'max_building_lines'], default=1000 * 1000, use_default=True)
# chunk_size
self.chunk_size = self.get_item(['training_params', 'chunk_size'], default=1000 * 1000, use_default=True)
# file column header setting
self.file_with_col_header = self.get_item(['inputs', 'file_with_col_header'], default=False, use_default=True)
@@ -280,6 +284,9 @@
tmp_problem_path = os.path.join(self.save_base_dir, '.necessary_cache', 'problem.pkl')
self.problem_path = tmp_problem_path if os.path.isfile(tmp_problem_path) else os.path.join(self.save_base_dir, 'necessary_cache', 'problem.pkl')
# cache configuration
self._load_cache_config_from_conf()
# training params
self.training_params = self.get_item(['training_params'])
@@ -303,7 +310,9 @@
self.max_epoch = self.params.max_epoch
else:
self.max_epoch = self.get_item(['training_params', 'max_epoch'], default=float('inf'))
self.valid_times_per_epoch = self.get_item(['training_params', 'valid_times_per_epoch'], default=1)
if 'valid_times_per_epoch' in self.conf['training_params']:
logging.info("configuration[training_params][valid_times_per_epoch] is deprecated, please use configuration[training_params][steps_per_validation] instead")
self.steps_per_validation = self.get_item(['training_params', 'steps_per_validation'], default=10)
self.batch_num_to_show_results = self.get_item(['training_params', 'batch_num_to_show_results'], default=10)
self.max_lengths = self.get_item(['training_params', 'max_lengths'], default=None, use_default=True)
self.fixed_lengths = self.get_item(['training_params', 'fixed_lengths'], default=None, use_default=True)
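The switch from `valid_times_per_epoch` to `steps_per_validation` matters for lazy loading: validation now fires every fixed number of batches rather than a number of times per epoch. In the training loop above, the interval is clamped per lazy-loaded part (`valid_batch_num = min(self.conf.steps_per_validation, whole_batch_num)`), so each part validates at least at its last batch. A toy illustration with made-up numbers:

```python
steps_per_validation = 10
whole_batch_num = 7  # batches in the current lazy-loaded part
valid_batch_num = min(steps_per_validation, whole_batch_num)

for i in range(whole_batch_num):
    # same trigger condition as in LearningMachine.train
    if (i != 0 and i % valid_batch_num == 0) or i == whole_batch_num - 1:
        print('validate at batch', i)  # prints: validate at batch 6
```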
@@ -529,3 +538,23 @@
shutil.copy(params.conf_path, self.save_base_dir)
logging.info('Configuration file is backed up to %s' % (self.save_base_dir))
def _load_cache_config_from_conf(self):
# training data
self.train_data_md5 = None
if self.phase == 'train' and self.train_data_path:
logging.info("Calculating the md5 of traing data ...")
self.train_data_md5 = md5([self.train_data_path])
logging.info("the md5 of traing data is %s"%(self.train_data_md5))
# problem
self.problem_md5 = None
# encoding
self.encoding_cache_dir = None
self.encoding_cache_index_file_path = None
self.encoding_cache_index_file_md5_path = None
self.encoding_file_index = None
self.encoding_cache_legal_line_cnt = 0
self.encoding_cache_illegal_line_cnt = 0
self.load_encoding_cache_generator = None
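The `md5` helper comes from the "add md5 function in common_utils.py" commit above, but utils/common_utils.py is not among the files shown here. Judging from its call sites, which pass a list of file paths and use the single returned hex digest, a plausible sketch:

```python
import hashlib

def md5(file_path_list, chunk_size=1024 * 1024):
    # Assumed implementation: hash the concatenated contents of the given files,
    # reading in chunks so large training files do not have to fit in memory.
    hasher = hashlib.md5()
    for file_path in file_path_list:
        with open(file_path, 'rb') as fin:
            for chunk in iter(lambda: fin.read(chunk_size), b''):
                hasher.update(chunk)
    return hasher.hexdigest()
```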


@@ -147,10 +147,12 @@ The architecture of the configuration file is:
CUDA_VISIBLE_DEVICES= python train.py
```
- ***cpu_num_workers***. [default: -1] Define the number of processes used to preprocess the dataset. If the value is negative or 0, the number of processes equals the number of logical CPU cores; otherwise it equals *cpu_num_workers*.
- ***chunk_size***. [default: 1000000] Define how many lines NB reads from a file at a time; this enables lazy loading and avoids running out of memory (see the configuration sketch after this list).
- ***batch_size***. Define the batch size here. If there are multiple GPUs, *batch_size* is the batch size of each GPU.
- ***batch_num_to_show_results***. [necessary for training] During the training process, show the results every batch_num_to_show_results batches.
- ***max_epoch***. [necessary for training] The maximum number of epochs to train.
- ***valid_times_per_epoch***. [optional for training, default: 1] Define how many times to conduct validation per epoch. Usually, we conduct validation after each epoch, but for a very large corpus, we'd better validate multiple times in case to miss the best state of our model. The default value is 1.
- ~~***valid_times_per_epoch***~~. [**deprecated**] Please use steps_per_validation instead.
- ***steps_per_validation***. [default: 10] Define how many training steps pass between two validations.
- ***tokenizer***. [optional] Define tokenizer here. Currently, we support 'nltk' and 'jieba'. By default, 'nltk' for English and 'jieba' for Chinese.
- **architecture**. Define the model architecture. The node is a list of layers (blocks) in block_zoo to represent a model. The supported layers of this toolkit are given in [block_zoo overview](https://microsoft.github.io/NeuronBlocks).
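For orientation, a minimal `training_params` fragment showing where the new keys sit (key names appear in this commit's ModelConf.py and conf.json changes; the values shown are the documented defaults):

```json
{
  "training_params": {
    "vocabulary": {
      "max_vocabulary": 800000,
      "max_building_lines": 1000000
    },
    "chunk_size": 1000000,
    "cpu_num_workers": -1,
    "steps_per_validation": 10,
    "batch_num_to_show_results": 10
  }
}
```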
@@ -729,5 +731,7 @@ To solve the above problems, NeuronBlocks supports *fixing embedding weight* (em
***training_params/vocabulary/max_vocabulary***. [int, optional for training, default: 800,000] The max size of corpus vocabulary. If corpus vocabulary size is larger than *max_vocabulary*, it will be cut according to word frequency.
***training_params/vocabulary/max_building_lines***. [int, optional for training, default: 1,000,000] The maximum number of lines NB reads from each file when building the vocabulary.
## <span id="faq">Frequently Asked Questions</span>


@@ -137,10 +137,12 @@ python predict.py --conf_path=model_zoo/demo/conf.json
CUDA_VISIBLE_DEVICES= python train.py
```
- ***cpu_num_workers***. [default: -1] Define the number of processes used to preprocess the dataset. If the value is negative or 0, the number of processes equals the number of logical CPU cores; otherwise it equals *cpu_num_workers*.
- ***chunk_size***. [default: 1000000] Define how many lines NB reads from a file at a time; this enables lazy loading and avoids running out of memory.
- ***batch_size***. Define the batch size here. If there are multiple GPUs, *batch_size* is the batch size of each GPU.
- ***batch_num_to_show_results***. [necessary for training] During the training process, show the results every batch_num_to_show_results batches.
- ***max_epoch***. [necessary for training] The maximum number of epochs to train.
- ***valid_times_per_epoch***. [optional for training, default: 1] Define how many times to conduct validation per epoch. Usually, we conduct validation after each epoch, but for a very large corpus, we'd better validate multiple times in case to miss the best state of our model. The default value is 1.
- ~~***valid_times_per_epoch***~~. [**deprecated**] Please use steps_per_validation instead.
- ***steps_per_validation***. [default: 10] Define how many training steps pass between two validations.
- ***tokenizer***. [optional] Define tokenizer here. Currently, we support 'nltk' and 'jieba'. By default, 'nltk' for English and 'jieba' for Chinese.
- **architecture**. Define the model architecture. The node is a list of layers (blocks) in block_zoo to represent a model. The supported layers of this toolkit are given in [block_zoo overview](https://microsoft.github.io/NeuronBlocks).
@@ -719,4 +721,6 @@ To solve the above problems, NeuronBlocks supports *fixing embedding weight* (em
***training_params/vocabulary/max_vocabulary***. [int, optional for training, default: 800,000] The max size of corpus vocabulary. If corpus vocabulary size is larger than *max_vocabulary*, it will be cut according to word frequency.
***training_params/vocabulary/max_building_lines***. [int, optional for training, default: 1,000,000] The maximum number of lines NB reads from each file when building the vocabulary.
## <span id="faq">常见问题与答案</span>


@@ -66,7 +66,10 @@ class EmbeddingConf(BaseConf):
for emb_type in self.conf:
if emb_type == 'position':
continue
self.output_dim[2] += self.conf[emb_type]['dim']
if isinstance(self.conf[emb_type]['dim'], list):
self.output_dim[2] += sum(self.conf[emb_type]['dim'])
else:
self.output_dim[2] += self.conf[emb_type]['dim']
super(EmbeddingConf, self).inference()
@@ -113,6 +116,7 @@ class Embedding(BaseLayer):
self.layer_conf = layer_conf
self.embeddings = nn.ModuleDict() if layer_conf.weight_on_gpu else dict()
self.char_embeddings = nn.ModuleDict()
for input_cluster in layer_conf.conf:
if 'type' in layer_conf.conf[input_cluster]:
# char embedding
@@ -122,7 +126,7 @@
char_emb_conf = eval(layer_conf.conf[input_cluster]['type'] + "Conf")(** char_emb_conf_dict)
char_emb_conf.inference()
char_emb_conf.verify()
self.embeddings[input_cluster] = eval(layer_conf.conf[input_cluster]['type'])(char_emb_conf)
self.char_embeddings[input_cluster] = eval(layer_conf.conf[input_cluster]['type'])(char_emb_conf)
else:
# word embedding, postag embedding, and so on
self.embeddings[input_cluster] = nn.Embedding(layer_conf.conf[input_cluster]['vocab_size'], layer_conf.conf[input_cluster]['dim'], padding_idx=0)
@@ -155,14 +159,13 @@
if 'extra' in input_cluster:
continue
input = inputs[input_cluster]
# if 'type' in self.layer_conf.conf[input_cluster]:
# emb = self.embeddings[input_cluster](input, lengths[input]).float()
# else:
# emb = self.embeddings[input_cluster](input).float()
if list(self.embeddings[input_cluster].parameters())[0].device.type == 'cpu':
emb = self.embeddings[input_cluster](input.cpu()).float()
if input_cluster == 'char':
emb = self.char_embeddings[input_cluster](input).float()
else:
emb = self.embeddings[input_cluster](input).float()
if list(self.embeddings[input_cluster].parameters())[0].device.type == 'cpu':
emb = self.embeddings[input_cluster](input.cpu()).float()
else:
emb = self.embeddings[input_cluster](input).float()
if use_gpu is True:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
emb = emb.to(device)


@@ -29,7 +29,7 @@ class Pooling2DConf(BaseConf):
self.pool_type = 'max' # Supported: ['max', 'mean']
self.stride = 1
self.padding = 0
self.window_size = 3
# self.window_size = [self.input_dims[0][1], self.input_dims[0][2]]
@DocInherit
def declare(self):
@@ -38,7 +38,7 @@
def check_size(self, value, attr):
res = value
if isinstance(value,int):
if isinstance(value, int):
res = [value, value]
elif (isinstance(self.window_size, tuple) or isinstance(self.window_size, list)) and len(value)==2:
res = list(value)
@@ -48,6 +48,9 @@
@DocInherit
def inference(self):
if not hasattr(self, "window_size"):
self.window_size = [self.input_dims[0][1], self.input_dims[0][2]]
self.window_size = self.check_size(self.window_size, "window_size")
self.stride = self.check_size(self.stride, "stride")


@@ -28,11 +28,11 @@ class CNNCharEmbeddingConf(BaseConf):
@DocInherit
def default(self):
self.dim = 30 # cnn's output channel dim
self.dim = [30] # cnn's output channel dim
self.embedding_matrix_dim = 30 #
self.stride = 1
self.stride = [1]
self.padding = 0
self.window_size = 3
self.window_size = [3]
self.activation = 'ReLU'
@DocInherit
@@ -41,8 +41,14 @@
self.num_of_inputs = 1
self.input_ranks = [3]
def change_to_list(self, attribute):
for single in attribute:
if not isinstance(getattr(self, single), list):
setattr(self, single, [getattr(self, single)])
@DocInherit
def inference(self):
self.change_to_list(['dim', 'stride', 'window_size'])
self.output_channel_num = self.dim
self.output_rank = 3
@@ -65,20 +71,24 @@
super(CNNCharEmbedding, self).__init__(layer_conf)
self.layer_conf = layer_conf
assert len(layer_conf.dim) == len(layer_conf.window_size) == len(layer_conf.stride), "The attribute dim/window_size/stride must have the same length."
self.char_embeddings = nn.Embedding(layer_conf.vocab_size, layer_conf.embedding_matrix_dim, padding_idx=self.layer_conf.padding)
nn.init.uniform_(self.char_embeddings.weight, -0.001, 0.001)
self.char_cnn = nn.Conv2d(1, layer_conf.output_channel_num, (layer_conf.window_size, layer_conf.embedding_matrix_dim),
stride=self.layer_conf.stride, padding=self.layer_conf.padding)
self.char_cnn = nn.ModuleList()
for i in range(len(layer_conf.output_channel_num)):
self.char_cnn.append(nn.Conv2d(1, layer_conf.output_channel_num[i], (layer_conf.window_size[i], layer_conf.embedding_matrix_dim),
stride=self.layer_conf.stride[i], padding=self.layer_conf.padding))
if layer_conf.activation:
self.activation = eval("nn." + self.layer_conf.activation)()
else:
self.activation = None
if self.is_cuda():
self.char_embeddings = self.char_embeddings.cuda()
self.char_cnn = self.char_cnn.cuda()
if self.activation and hasattr(self.activation, 'weight'):
self.activation.weight = torch.nn.Parameter(self.activation.weight.cuda())
# if self.is_cuda():
# self.char_embeddings = self.char_embeddings.cuda()
# self.char_cnn = self.char_cnn.cuda()
# if self.activation and hasattr(self.activation, 'weight'):
# self.activation.weight = torch.nn.Parameter(self.activation.weight.cuda())
def forward(self, string):
"""
@@ -102,14 +112,24 @@
char_embs_lookup = char_embs_lookup.view(-1, string.size()[2], self.layer_conf.embedding_matrix_dim) #[batch_size * seq_len, char num in words, embedding_dim]
string_input = torch.unsqueeze(char_embs_lookup, 1) # [batch_size * seq_len, input_channel_num=1, char num in words, embedding_dim]
string_conv = self.char_cnn(string_input).squeeze()
if self.activation:
string_conv = self.activation(string_conv)
string_maxpooling = F.max_pool1d(string_conv, string_conv.size(2)).squeeze()
string_out = string_maxpooling.view(string.size()[0], -1, self.layer_conf.output_channel_num)
outputs = []
for index, single_cnn in enumerate(self.char_cnn):
string_conv = single_cnn(string_input).squeeze(3)
if self.activation:
string_conv = self.activation(string_conv)
return string_out
string_maxpooling = F.max_pool1d(string_conv, string_conv.size(2)).squeeze()
string_out = string_maxpooling.view(string.size()[0], -1, self.layer_conf.output_channel_num[index])
outputs.append(string_out)
if len(outputs) > 1:
string_output = torch.cat(outputs, 2)
else:
string_output = outputs[0]
return string_output
if __name__ == '__main__':
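Since the forward pass now concatenates the pooled outputs along the channel axis (`torch.cat(outputs, 2)`), the layer contributes `sum(dim)` output channels, which is what the `sum(self.conf[emb_type]['dim'])` change in EmbeddingConf.inference above accounts for. A quick sanity check with the values from model_zoo/advanced/conf.json later in this commit:

```python
dim = [30, 20, 100]      # output_channel_num of each CNN kernel
window_size = [3, 3, 5]
stride = [1, 2, 3]

# the constructor asserts these three lists have the same length
assert len(dim) == len(window_size) == len(stride)
print(sum(dim))  # 150 channels after concatenation
```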


@@ -49,7 +49,8 @@
"training_params": {
"vocabulary": {
"min_word_frequency": 1,
"max_vocabulary": 100000
"max_vocabulary": 100000,
"max_building_lines": 1000000
},
"optimizer": {
"name": "Adam",
@@ -57,6 +58,7 @@
"lr": 0.001
}
},
"chunk_size": 1000000,
"lr_decay": 0.95,
"minimum_lr": 0.0001,
"epoch_start_lr_decay": 1,
@@ -65,7 +67,7 @@
"batch_size": 30,
"batch_num_to_show_results": 10,
"max_epoch": 3,
"valid_times_per_epoch": 1,
"steps_per_validation": 10,
"text_preprocessing": ["DBC2SBC"],
"max_lengths":{
"question": 30,
@@ -90,10 +92,10 @@
"cols": ["question_char", "answer_char"],
"type": "CNNCharEmbedding",
"dropout": 0.2,
"dim": 30,
"embedding_matrix_dim": 8,
"stride":1,
"window_size": 5,
"dim": [30, 20, 100],
"embedding_matrix_dim": 50,
"stride":[1, 2, 3],
"window_size": [3,3,5],
"activation": "ReLU"
}
}


@@ -53,7 +53,7 @@
"batch_size": 256,
"batch_num_to_show_results": 10,
"max_epoch": 30,
"valid_times_per_epoch": 10,
"steps_per_validation": 10,
"fixed_lengths":{
"query": 30
}


@@ -12,9 +12,9 @@ nltk.download('stopwords', quiet=True)
from utils.BPEEncoder import BPEEncoder
import os
import pickle as pkl
from utils.common_utils import load_from_pkl, dump_to_pkl
from utils.common_utils import load_from_pkl, dump_to_pkl, load_from_json, dump_to_json, prepare_dir, md5
from settings import ProblemTypes
from settings import ProblemTypes, Setting as st
import math
from utils.ProcessorsScheduler import ProcessorsScheduler
@@ -112,24 +112,21 @@ class Problem():
else:
return None
def get_data_generator_from_file(self, data_path_list, file_with_col_header, chunk_size=1000000):
# NOTE: file_path is a list type
for single_path in data_path_list:
data_list = list()
if single_path is not None:
with open(single_path, "r", encoding='utf-8') as f:
if file_with_col_header:
f.readline()
for index, line in enumerate(f):
line = line.rstrip()
if not line:
break
data_list.append(line)
if (index + 1) % chunk_size == 0:
yield data_list
data_list = list()
if len(data_list) > 0:
yield data_list
def get_data_generator_from_file(self, data_path, file_with_col_header, chunk_size=1000000):
data_list = list()
with open(data_path, "r", encoding='utf-8') as f:
if file_with_col_header:
f.readline()
for index, line in enumerate(f):
line = line.rstrip()
if not line:
break
data_list.append(line)
if (index + 1) % chunk_size == 0:
yield data_list
data_list = list()
if len(data_list) > 0:
yield data_list
def build_training_data_list(self, training_data_list, file_columns, input_types, answer_column_name, bpe_encoder=None):
docs = dict() # docs of each type of input
@@ -226,11 +223,11 @@
def build(self, data_path_list, file_columns, input_types, file_with_col_header, answer_column_name, word2vec_path=None, word_emb_dim=None,
format=None, file_type=None, involve_all_words=None, file_format="tsv", show_progress=True,
cpu_num_workers=-1, max_vocabulary=800000, word_frequency=3):
cpu_num_workers=-1, max_vocabulary=800000, word_frequency=3, max_building_lines=1000*1000):
"""
Args:
training_data_path:
data_path_list:
file_columns: {
"word1": 0,
"word2": 1,
@@ -268,39 +265,29 @@
"""
# parameter check
if not word2vec_path:
word_emb_dim, format, file_type, involve_all_words = None, None, None, None
if 'bpe' in input_types:
try:
bpe_encoder = BPEEncoder(input_types['bpe']['bpe_path'])
except KeyError:
raise Exception('Please define a bpe path at the embedding layer.')
else:
bpe_encoder = None
bpe_encoder = self._check_bpe_encoder(input_types)
self.file_column_num = len(file_columns)
progress = self.get_data_generator_from_file(data_path_list, file_with_col_header)
preprocessed_data_generator = self.build_training_multi_processor(progress, cpu_num_workers, file_columns, input_types, answer_column_name, bpe_encoder=bpe_encoder)
# update symbol universe
total_cnt_legal, total_cnt_illegal = 0, 0
for docs, target_docs, cnt_legal, cnt_illegal in tqdm(preprocessed_data_generator):
total_cnt_legal += cnt_legal
total_cnt_illegal += cnt_illegal
# input_type
for input_type in input_types:
self.input_dicts[input_type].update(docs[input_type])
# problem_type
if ProblemTypes[self.problem_type] == ProblemTypes.classification or \
ProblemTypes[self.problem_type] == ProblemTypes.sequence_tagging:
self.output_dict.update(list(target_docs.values())[0])
elif ProblemTypes[self.problem_type] == ProblemTypes.regression or \
ProblemTypes[self.problem_type] == ProblemTypes.mrc:
pass
logging.info("Corpus imported: %d legal lines, %d illegal lines." % (total_cnt_legal, total_cnt_illegal))
for data_path in data_path_list:
if data_path:
progress = self.get_data_generator_from_file(data_path, file_with_col_header, chunk_size=max_building_lines)
preprocessed_data_generator = self.build_training_multi_processor(progress, cpu_num_workers, file_columns, input_types, answer_column_name, bpe_encoder=bpe_encoder)
# update symbol universe
docs, target_docs, cnt_legal, cnt_illegal = next(preprocessed_data_generator)
# input_type
for input_type in input_types:
self.input_dicts[input_type].update(docs[input_type])
# problem_type
if ProblemTypes[self.problem_type] == ProblemTypes.classification or \
ProblemTypes[self.problem_type] == ProblemTypes.sequence_tagging:
self.output_dict.update(list(target_docs.values())[0])
elif ProblemTypes[self.problem_type] == ProblemTypes.regression or \
ProblemTypes[self.problem_type] == ProblemTypes.mrc:
pass
logging.info("[Building Dictionary] in %s at most %d lines imported: %d legal lines, %d illegal lines." % (data_path, max_building_lines, cnt_legal, cnt_illegal))
# build dictionary
for input_type in input_types:
@@ -404,8 +391,6 @@
def encode_data_multi_processor(self, data_generator, cpu_num_workers, file_columns, input_types, object_inputs,
answer_column_name, min_sentence_len, extra_feature, max_lengths=None, fixed_lengths=None, file_format="tsv", bpe_encoder=None):
for data in data_generator:
scheduler = ProcessorsScheduler(cpu_num_workers)
func_args = (data, file_columns, input_types, object_inputs,
@@ -445,6 +430,9 @@
type_branches = dict() # branch of input type, e.g. type_branches['query_index'] = 'query'
# for char: don't split these word
word_no_split = ['<start>', '<pad>', '<eos>', '<unk>']
for branch in object_inputs:
data[branch] = dict()
lengths[branch] = dict()
@@ -594,8 +582,18 @@
temp_word_char = []
temp_word_length = []
for single_token in tokens:
temp_word_char.append(self.input_dicts[type2cluster[single_input_type]].lookup(single_token))
temp_word_length.append(len(single_token))
if single_token in word_no_split:
# temp_word_length.append(1)
temp_id = [self.input_dicts[type2cluster[single_input_type]].id(single_token)]
else:
temp_id = self.input_dicts[type2cluster[single_input_type]].lookup(single_token)
if fixed_lengths and 'word' in fixed_lengths:
if len(temp_id) >= fixed_lengths['word']:
temp_id = temp_id[:fixed_lengths['word']]
else:
temp_id = temp_id + [self.input_dicts[type2cluster[single_input_type]].id('<pad>')] * (fixed_lengths['word'] - len(temp_id))
temp_word_char.append(temp_id)
temp_word_length.append(len(temp_id))
data[branch][single_input_type].append(temp_word_char)
lengths[branch]['word_length'].append(temp_word_length)
else:
@@ -675,7 +673,7 @@
def encode(self, data_path, file_columns, input_types, file_with_col_header, object_inputs, answer_column_name,
min_sentence_len, extra_feature, max_lengths=None, fixed_lengths=None, file_format="tsv", show_progress=True,
cpu_num_workers = -1):
cpu_num_workers=-1, chunk_size=1000*1000):
"""
Args:
@@ -751,22 +749,16 @@
target: [...]
"""
if 'bpe' in input_types:
try:
bpe_encoder = BPEEncoder(input_types['bpe']['bpe_path'])
except KeyError:
raise Exception('Please define a bpe path at the embedding layer.')
else:
bpe_encoder = None
bpe_encoder = self._check_bpe_encoder(input_types)
progress = self.get_data_generator_from_file([data_path], file_with_col_header)
encoder_generator = self.encode_data_multi_processor(progress, cpu_num_workers,
progress = self.get_data_generator_from_file(data_path, file_with_col_header, chunk_size=chunk_size)
encode_generator = self.encode_data_multi_processor(progress, cpu_num_workers,
file_columns, input_types, object_inputs, answer_column_name, min_sentence_len, extra_feature, max_lengths,
fixed_lengths, file_format, bpe_encoder=bpe_encoder)
data, lengths, target = dict(), dict(), dict()
cnt_legal, cnt_illegal = 0, 0
for temp_data, temp_lengths, temp_target, temp_cnt_legal, temp_cnt_illegal in tqdm(encoder_generator):
for temp_data, temp_lengths, temp_target, temp_cnt_legal, temp_cnt_illegal in tqdm(encode_generator):
data = self._merge_encode_data(data, temp_data)
lengths = self._merge_encode_lengths(lengths, temp_lengths)
target = self._merge_target(target, temp_target)
@@ -776,6 +768,59 @@
logging.info("%s: %d legal samples, %d illegal samples" % (data_path, cnt_legal, cnt_illegal))
return data, lengths, target
def build_encode_cache(self, conf, file_format="tsv"):
logging.info("[Cache] building encoding cache")
build_encode_cache_generator = self.get_encode_generator(conf, build_cache=True, file_format=file_format)
for _ in build_encode_cache_generator:
continue
logging.info("[Cache] encoding is saved to %s" % conf.encoding_cache_dir)
def get_encode_generator(self, conf, build_cache=True, file_format="tsv"):
# parameter check
if build_cache:
assert conf.encoding_cache_dir, 'There is no property encoding_cache_dir in object conf'
assert conf.encoding_cache_index_file_path, 'There is no property encoding_cache_index_file_path in object conf'
assert conf.encoding_cache_index_file_md5_path, 'There is no property encoding_cache_index_file_md5_path in object conf'
bpe_encoder = self._check_bpe_encoder(conf.input_types)
data_generator = self.get_data_generator_from_file(conf.train_data_path, conf.file_with_col_header, chunk_size=conf.chunk_size)
encode_generator = self.encode_data_multi_processor(data_generator, conf.cpu_num_workers,
conf.file_columns, conf.input_types, conf.object_inputs, conf.answer_column_name,
conf.min_sentence_len, conf.extra_feature, conf.max_lengths,
conf.fixed_lengths, file_format, bpe_encoder=bpe_encoder)
file_index = []
total_cnt_legal, total_cnt_illegal = 0, 0
for part_number, encode_data in enumerate(encode_generator):
data, lengths, target, cnt_legal, cnt_illegal = encode_data
if build_cache:
total_cnt_legal = total_cnt_legal + cnt_legal
total_cnt_illegal = total_cnt_illegal + cnt_illegal
file_name = st.cencoding_file_name_pattern % (part_number)
file_path = os.path.join(conf.encoding_cache_dir, file_name)
dump_to_pkl((data, lengths, target), file_path)
file_index.append([file_name, md5([file_path])])
logging.info("Up to now, in %s: %d legal samples, %d illegal samples" % (conf.train_data_path, total_cnt_legal, total_cnt_illegal))
yield data, lengths, target
if build_cache:
cache_index = dict()
cache_index[st.cencoding_key_index] = file_index
cache_index[st.cencoding_key_legal_cnt] = total_cnt_legal
cache_index[st.cencoding_key_illegal_cnt] = total_cnt_illegal
dump_to_json(cache_index, conf.encoding_cache_index_file_path)
dump_to_json(md5([conf.encoding_cache_index_file_path]), conf.encoding_cache_index_file_md5_path)
@staticmethod
def _check_bpe_encoder(input_types):
bpe_encoder = None
if 'bpe' in input_types:
try:
bpe_encoder = BPEEncoder(input_types['bpe']['bpe_path'])
except KeyError:
raise Exception('Please define a bpe path at the embedding layer.')
return bpe_encoder
def decode(self, model_output, lengths=None, batch_data=None):
""" decode the model output, either a batch of output or a single output


@@ -53,3 +53,27 @@ DefaultPredictionFields = {
# nltk's models
nltk.data.path.append(os.path.join(os.getcwd(), 'dataset', 'nltk_data'))
class Constant(type):
    def __setattr__(self, name, value):
        raise AttributeError("Class %s cannot be modified" % self.__name__)

class ConstantStatic(metaclass=Constant):
    def __init__(self, *args, **kwargs):
        raise Exception("Class %s cannot be instantiated" % self.__class__.__name__)
class Setting(ConstantStatic):
# cache
## cencoding (cache_encoding)
cencoding_index_file_name = 'index.json'
cencoding_index_md5_file_name = 'index_md5.json'
cencoding_file_name_pattern = 'encoding_cache_%s.pkl'
cencoding_key_finish = 'finish'
cencoding_key_index = 'index'
cencoding_key_legal_cnt = 'legal_line_cnt'
cencoding_key_illegal_cnt = 'illegal_line_cnt'
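The two helper classes make every attribute of Setting effectively read-only. A quick illustration of the intended behaviour (assumed usage, not in the commit):

    from settings import Setting as st

    print(st.cencoding_file_name_pattern % 0)   # -> encoding_cache_0.pkl

    # Both of the following raise, because the Constant metaclass intercepts
    # attribute assignment and ConstantStatic refuses instantiation:
    #   st.cencoding_key_index = 'other'        # AttributeError
    #   st()                                    # Exception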

train.py

@@ -1,7 +1,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT license.
from settings import ProblemTypes, version, Setting as st
import os
import argparse
@@ -15,7 +15,7 @@ import torch
import torch.nn as nn
from ModelConf import ModelConf
from problem import Problem
from utils.common_utils import dump_to_pkl, load_from_pkl, load_from_json, dump_to_json, prepare_dir, md5
from utils.philly_utils import HDFSDirectTransferer
from losses import *
from optimizers import *
@@ -33,33 +33,76 @@ class Cache:
        self.dictionary_invalid = True
        self.embedding_invalid = True

        if not conf.pretrained_model_path:
            # cache_conf
            cache_conf = None
            cache_conf_path = os.path.join(conf.cache_dir, 'conf_cache.json')
            if os.path.isfile(cache_conf_path):
                params_cache = copy.deepcopy(params)
                try:
                    cache_conf = ModelConf('cache', cache_conf_path, version, params_cache)
                except Exception as e:
                    cache_conf = None
            if cache_conf is None or not self._verify_conf(cache_conf, conf):
                return False

            # problem
            if not os.path.isfile(conf.problem_path):
                return False

            # embedding
            if conf.emb_pkl_path:
                if not os.path.isfile(conf.emb_pkl_path):
                    return False
                self.embedding_invalid = False

        self.dictionary_invalid = False
        logging.info('[Cache] dictionary found')
        return True
def _check_encoding(self, conf):
self.encoding_invalid = True
if not conf.pretrained_model_path and self.dictionary_invalid:
return False
# Calculate the MD5 of problem
problem_path = conf.problem_path if not conf.pretrained_model_path else conf.saved_problem_path
try:
conf.problem_md5 = md5([problem_path])
except Exception as e:
conf.problem_md5 = None
logging.info('Cannot calculate md5 of problem.pkl from %s' % (problem_path))
return False
# check the validity of the encoding cache
## encoding cache dir
conf.encoding_cache_dir = os.path.join(conf.cache_dir, conf.train_data_md5 + conf.problem_md5)
logging.debug('[Cache] conf.encoding_cache_dir %s' % (conf.encoding_cache_dir))
if not os.path.exists(conf.encoding_cache_dir):
return False
## encoding cache index
conf.encoding_cache_index_file_path = os.path.join(conf.encoding_cache_dir, st.cencoding_index_file_name)
conf.encoding_cache_index_file_md5_path = os.path.join(conf.encoding_cache_dir, st.cencoding_index_md5_file_name)
if not os.path.exists(conf.encoding_cache_index_file_path) or not os.path.exists(conf.encoding_cache_index_file_md5_path):
return False
if md5([conf.encoding_cache_index_file_path]) != load_from_json(conf.encoding_cache_index_file_md5_path):
return False
cache_index = load_from_json(conf.encoding_cache_index_file_path)
## encoding cache content
for index in cache_index[st.cencoding_key_index]:
file_name, file_md5 = index[0], index[1]
if file_md5 != md5([os.path.join(conf.encoding_cache_dir, file_name)]):
return False
if (st.cencoding_key_legal_cnt in cache_index) and (st.cencoding_key_illegal_cnt in cache_index):
conf.encoding_cache_legal_line_cnt = cache_index[st.cencoding_key_legal_cnt]
conf.encoding_cache_illegal_line_cnt = cache_index[st.cencoding_key_illegal_cnt]
self.encoding_invalid = False
logging.info('[Cache] encoding found')
logging.info('%s: %d legal samples, %d illegal samples' % (conf.train_data_path, conf.encoding_cache_legal_line_cnt, conf.encoding_cache_illegal_line_cnt))
return True
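The check is a two-level md5 chain: index_md5.json guards index.json, and index.json guards every chunk file. A stand-alone sketch of the same logic, assuming the file names from Setting and the helpers from utils.common_utils:

    import os
    from utils.common_utils import md5, load_from_json

    def encoding_cache_is_valid(cache_dir):
        # Illustrative restatement of _check_encoding's cache validation.
        index_path = os.path.join(cache_dir, 'index.json')
        index_md5_path = os.path.join(cache_dir, 'index_md5.json')
        if not (os.path.exists(index_path) and os.path.exists(index_md5_path)):
            return False
        if md5([index_path]) != load_from_json(index_md5_path):
            return False
        return all(md5([os.path.join(cache_dir, name)]) == file_md5
                   for name, file_md5 in load_from_json(index_path)['index'])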
def check(self, conf, params):
@@ -69,7 +112,7 @@ class Cache:
return
# encoding
if not self._check_encoding(conf):
self._renew_cache(params, conf.encoding_cache_dir)
def load(self, conf, problem, emb_matrix):
# load dictionary when (not finetune) and (cache valid)
@@ -80,13 +123,17 @@ class Cache:
logging.info('[Cache] loading dictionary successfully')
if not self.encoding_invalid:
self._prepare_encoding_cache(conf, problem, build=False)
logging.info('[Cache] preparing encoding successfully')
return problem, emb_matrix
def save(self, conf, params, problem, emb_matrix):
# make cache dir
if not os.path.exists(conf.cache_dir):
os.makedirs(conf.cache_dir)
shutil.copy(params.conf_path, os.path.join(conf.cache_dir, 'conf_cache.json'))
# dictionary
if self.dictionary_invalid:
if conf.mode == 'philly' and conf.emb_pkl_path.startswith('/hdfs/'):
with HDFSDirectTransferer(conf.problem_path, with_hdfs_command=True) as transferer:
@@ -100,10 +147,11 @@ class Cache:
transferer.pkl_dump(emb_matrix)
else:
dump_to_pkl(emb_matrix, conf.emb_pkl_path)
logging.info("Embedding matrix saved to %s" % conf.emb_pkl_path)
logging.info("[Cache] Embedding matrix saved to %s" % conf.emb_pkl_path)
# encoding
if self.encoding_invalid:
self._prepare_encoding_cache(conf, problem, build=params.make_cache_only)
def back_up(self, conf, problem):
cache_bakup_path = os.path.join(conf.save_base_dir, 'necessary_cache/')
@@ -149,6 +197,34 @@ class Cache:
flag = False
return flag
def _prepare_encoding_cache(self, conf, problem, build=False):
# encoding cache dir
problem_path = conf.problem_path if not conf.pretrained_model_path else conf.saved_problem_path
conf.problem_md5 = md5([problem_path])
conf.encoding_cache_dir = os.path.join(conf.cache_dir, conf.train_data_md5 + conf.problem_md5)
if not os.path.exists(conf.encoding_cache_dir):
os.makedirs(conf.encoding_cache_dir)
# encoding cache files
conf.encoding_cache_index_file_path = os.path.join(conf.encoding_cache_dir, st.cencoding_index_file_name)
conf.encoding_cache_index_file_md5_path = os.path.join(conf.encoding_cache_dir, st.cencoding_index_md5_file_name)
conf.load_encoding_cache_generator = self._load_encoding_cache_generator
if build:
prepare_dir(conf.encoding_cache_dir, True, allow_overwrite=True, clear_dir_if_exist=True)
problem.build_encode_cache(conf)
self.encoding_invalid = False
if not self.encoding_invalid:
cache_index = load_from_json(conf.encoding_cache_index_file_path)
conf.encoding_file_index = cache_index[st.cencoding_key_index]
@staticmethod
def _load_encoding_cache_generator(cache_dir, file_index):
for index in file_index:
file_path = os.path.join(cache_dir, index[0])
yield load_from_pkl(file_path)
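_prepare_encoding_cache stores this generator on the conf object so the training phase can stream chunks instead of holding the whole encoded corpus in memory. A sketch of the assumed consumption pattern (train_on_chunk is a hypothetical stand-in for the real batch loop in train.py):

    def run_lazy_training(conf, train_on_chunk):
        # Assumed wiring: conf.load_encoding_cache_generator and
        # conf.encoding_file_index are set by _prepare_encoding_cache above;
        # train_on_chunk is a caller-supplied function that batches and
        # trains on one decoded (data, lengths, target) chunk.
        chunks = conf.load_encoding_cache_generator(conf.encoding_cache_dir,
                                                    conf.encoding_file_index)
        for data, lengths, target in chunks:
            train_on_chunk(data, lengths, target)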
def main(params):
# init
conf = ModelConf("train", params.conf_path, version, params, mode=params.mode)
@@ -181,11 +257,7 @@ def main(params):
word_emb_dim=conf.pretrained_emb_dim, format=conf.pretrained_emb_type,
file_type=conf.pretrained_emb_binary_or_text, involve_all_words=conf.involve_all_words_in_pretrained_emb,
show_progress=True if params.mode == 'normal' else False, cpu_num_workers = conf.cpu_num_workers,
max_vocabulary=conf.max_vocabulary, word_frequency=conf.min_word_frequency, max_building_lines=conf.max_building_lines)
# environment preparing
## cache save

utils/common_utils.py

@@ -3,6 +3,7 @@
import logging
import pickle as pkl
import json
import torch
import torch.nn as nn
import os
@@ -49,6 +50,17 @@ def dump_to_pkl(obj, pkl_path):
pkl.dump(obj, fout, protocol=pkl.HIGHEST_PROTOCOL)
logging.debug("Obj dumped to %s!" % pkl_path)
def load_from_json(json_path):
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    logging.debug("%s loaded!" % json_path)
    return data

def dump_to_json(obj, json_path):
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(obj, f)
    logging.debug("Obj dumped to %s!" % json_path)
def get_trainable_param_num(model):
""" get the number of trainable parameters
@@ -60,9 +72,15 @@ def get_trainable_param_num(model):
"""
if isinstance(model, nn.DataParallel):
    if isinstance(model.module.layers['embedding'].embeddings, dict):
        model_param = list(model.parameters()) + list(model.module.layers['embedding'].get_parameters())
    else:
        model_param = list(model.parameters())
else:
    if isinstance(model.layers['embedding'].embeddings, dict):
        model_param = list(model.parameters()) + list(model.layers['embedding'].get_parameters())
    else:
        model_param = list(model.parameters())
return sum(p.numel() for p in model_param if p.requires_grad)
@@ -228,7 +246,7 @@ def md5(file_paths, chunk_size=1024*1024*1024):
""" Calculate a md5 of lists of files.
Args:
file_paths: an iterable object contains files. Files will be concatenated orderly if there are more than one file
file_paths: an iterable object contains file paths. Files will be concatenated orderly if there are more than one file
chunk_size: unit is byte, default value is 1GB
Returns:
md5
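The function body is not shown in this hunk; one plausible implementation of the documented contract, hashing the concatenation chunk by chunk so large files never have to fit in memory, would be:

    import hashlib

    def md5(file_paths, chunk_size=1024*1024*1024):
        # Sketch only: feed every file, in order, through one md5 object
        # in chunk_size pieces and return the hex digest of the whole stream.
        md5_obj = hashlib.md5()
        for file_path in file_paths:
            with open(file_path, 'rb') as fin:
                for chunk in iter(lambda: fin.read(chunk_size), b''):
                    md5_obj.update(chunk)
        return md5_obj.hexdigest()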

utils/corpus_utils.py

@@ -16,6 +16,7 @@ import codecs
import copy
from settings import ProblemTypes
import torch
import time
if sys.version_info < (3,):
@@ -236,10 +237,10 @@ def get_batches(problem, data, length, target, batch_size, input_types, pad_ids=
logging.info("Start making batches")
if permutate is True:
# CAUTION! data and length will be modified in place (deepcopy removed to save memory)
# data = copy.deepcopy(data)
# length = copy.deepcopy(length)
# if target is not None:
#     target = copy.deepcopy(target)
# shuffle the data
permutation = np.random.permutation(len(list(target.values())[0]))
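With the deepcopy gone, shuffling works by drawing one shared permutation and reindexing every parallel structure with it, so no second copy of the encoded corpus is ever materialized. A toy sketch of that idea (the real data/length/target values are nested dicts of lists, simplified here to numpy arrays):

    import numpy as np

    data = {'query_text': np.array([[1, 2], [3, 4], [5, 6]])}
    target = {'label': np.array([0, 1, 0])}

    # One permutation, applied to every aligned structure.
    permutation = np.random.permutation(len(list(target.values())[0]))
    data = {key: value[permutation] for key, value in data.items()}
    target = {key: value[permutation] for key, value in target.items()}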