Source code for nlp_architect.models.transformers.base_model

# ******************************************************************************
# Copyright 2017-2019 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import io
import logging
import os
from typing import List, Union

import torch
from torch.utils.data import DataLoader
from tqdm import tqdm, trange
from transformers import (
    AdamW,
    BertConfig,
    BertTokenizer,
    RobertaConfig,
    RobertaTokenizer,
    XLMConfig,
    XLMTokenizer,
    XLNetConfig,
    XLNetTokenizer,
    get_linear_schedule_with_warmup,
)

from nlp_architect.models import TrainableModel
from nlp_architect.models.transformers.quantized_bert import QuantizedBertConfig

logger = logging.getLogger(__name__)


ALL_MODELS = sum(
    (
        tuple(conf.pretrained_config_archive_map.keys())
        for conf in (BertConfig, XLNetConfig, XLMConfig)
    ),
    (),
)


[docs]def get_models(models: List[str]): if models is not None: return [m for m in ALL_MODELS if m.split("-")[0] in models] return ALL_MODELS
[docs]class TransformerBase(TrainableModel): """ Transformers base model (for working with pytorch-transformers models) """ MODEL_CONFIGURATIONS = { "bert": (BertConfig, BertTokenizer), "quant_bert": (QuantizedBertConfig, BertTokenizer), "xlnet": (XLNetConfig, XLNetTokenizer), "xlm": (XLMConfig, XLMTokenizer), "roberta": (RobertaConfig, RobertaTokenizer), } def __init__( self, model_type: str, model_name_or_path: str, labels: List[str] = None, num_labels: int = None, config_name=None, tokenizer_name=None, do_lower_case=False, output_path=None, device="cpu", n_gpus=0, ): """ Transformers base model (for working with pytorch-transformers models) Args: model_type (str): transformer model type model_name_or_path (str): model name or path to model labels (List[str], optional): list of labels. Defaults to None. num_labels (int, optional): number of labels. Defaults to None. config_name ([type], optional): configuration name. Defaults to None. tokenizer_name ([type], optional): tokenizer name. Defaults to None. do_lower_case (bool, optional): lower case input words. Defaults to False. output_path ([type], optional): model output path. Defaults to None. device (str, optional): backend device. Defaults to 'cpu'. n_gpus (int, optional): num of gpus. Defaults to 0. Raises: FileNotFoundError: [description] """ assert model_type in self.MODEL_CONFIGURATIONS.keys(), "unsupported model_type" self.model_type = model_type self.model_name_or_path = model_name_or_path self.labels = labels self.num_labels = num_labels self.do_lower_case = do_lower_case if output_path is not None and not os.path.exists(output_path): raise FileNotFoundError("output_path is not found") self.output_path = output_path self.model_class = None config_class, tokenizer_class = self.MODEL_CONFIGURATIONS[model_type] self.config_class = config_class self.tokenizer_class = tokenizer_class self.tokenizer_name = tokenizer_name self.tokenizer = self._load_tokenizer(self.tokenizer_name) self.config_name = config_name self.config = self._load_config(config_name) self.model = None self.device = device self.n_gpus = n_gpus self._optimizer = None self._scheduler = None self.training_args = None
[docs] def to(self, device="cpu", n_gpus=0): if self.model is not None: self.model.to(device) if n_gpus > 1: self.model = torch.nn.DataParallel(self.model) self.device = device self.n_gpus = n_gpus
@property def optimizer(self): return self._optimizer @optimizer.setter def optimizer(self, opt): self._optimizer = opt @property def scheduler(self): return self._scheduler @scheduler.setter def scheduler(self, sch): self._scheduler = sch
[docs] def setup_default_optimizer( self, weight_decay: float = 0.0, learning_rate: float = 5e-5, adam_epsilon: float = 1e-8, warmup_steps: int = 0, total_steps: int = 0, ): # Prepare optimizer and schedule (linear warmup and decay) no_decay = ["bias", "LayerNorm.weight"] optimizer_grouped_parameters = [ { "params": [ p for n, p in self.model.named_parameters() if not any(nd in n for nd in no_decay) ], "weight_decay": weight_decay, }, { "params": [ p for n, p in self.model.named_parameters() if any(nd in n for nd in no_decay) ], "weight_decay": 0.0, }, ] self.optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate, eps=adam_epsilon) self.scheduler = get_linear_schedule_with_warmup( self.optimizer, num_warmup_steps=warmup_steps, num_training_steps=total_steps )
def _load_config(self, config_name=None): config = self.config_class.from_pretrained( config_name if config_name else self.model_name_or_path, num_labels=self.num_labels ) return config def _load_tokenizer(self, tokenizer_name=None): tokenizer = self.tokenizer_class.from_pretrained( tokenizer_name if tokenizer_name else self.model_name_or_path, do_lower_case=self.do_lower_case, ) return tokenizer
[docs] def save_model(self, output_dir: str, save_checkpoint: bool = False, args=None): """ Save model/tokenizer/arguments to given output directory Args: output_dir (str): path to output directory save_checkpoint (bool, optional): save as checkpoint. Defaults to False. args ([type], optional): arguments object to save. Defaults to None. """ # Create output directory if needed if not os.path.exists(output_dir): os.makedirs(output_dir) logger.info("Saving model checkpoint to %s", output_dir) model_to_save = self.model.module if hasattr(self.model, "module") else self.model model_to_save.save_pretrained(output_dir) if not save_checkpoint: if self.tokenizer is not None: self.tokenizer.save_pretrained(output_dir) with io.open(output_dir + os.sep + "labels.txt", "w", encoding="utf-8") as fw: for label in self.labels: fw.write("{}\n".format(label)) if args is not None: torch.save(args, os.path.join(output_dir, "training_args.bin"))
[docs] @classmethod def load_model(cls, model_path: str, model_type: str, *args, **kwargs): """ Create a TranformerBase deom from given path Args: model_path (str): path to model model_type (str): model type Returns: TransformerBase: model """ # Load a trained model and vocabulary from given path if not os.path.exists(model_path): raise FileNotFoundError with io.open(model_path + os.sep + "labels.txt") as fp: labels = [line.strip() for line in fp.readlines()] return cls( model_type=model_type, model_name_or_path=model_path, labels=labels, *args, **kwargs )
[docs] @staticmethod def get_train_steps_epochs( max_steps: int, num_train_epochs: int, gradient_accumulation_steps: int, num_samples: int ): """ get train steps and epochs Args: max_steps (int): max steps num_train_epochs (int): num epochs gradient_accumulation_steps (int): gradient accumulation steps num_samples (int): number of samples Returns: Tuple: total steps, number of epochs """ if max_steps > 0: t_total = max_steps num_train_epochs = max_steps // (num_samples // gradient_accumulation_steps) + 1 else: t_total = num_samples // gradient_accumulation_steps * num_train_epochs return t_total, num_train_epochs
[docs] def get_logits(self, batch): self.model.eval() inputs = self._batch_mapper(batch) outputs = self.model(**inputs) return outputs[-1]
def _train( self, data_set: DataLoader, dev_data_set: Union[DataLoader, List[DataLoader]] = None, test_data_set: Union[DataLoader, List[DataLoader]] = None, gradient_accumulation_steps: int = 1, per_gpu_train_batch_size: int = 8, max_steps: int = -1, num_train_epochs: int = 3, max_grad_norm: float = 1.0, logging_steps: int = 50, save_steps: int = 100, best_result_file: str = None, ): """Run model training batch_mapper: a function that maps a batch into parameters that the model expects in the forward method (for use with custom heads and models). If None it will default to the basic models input structure. logging_callback_fn: a function that is called in each evaluation step with the model as a parameter. """ t_total, num_train_epochs = self.get_train_steps_epochs( max_steps, num_train_epochs, gradient_accumulation_steps, len(data_set) ) if self.optimizer is None and self.scheduler is None: logger.info("Loading default optimizer and scheduler") self.setup_default_optimizer(total_steps=t_total) train_batch_size = per_gpu_train_batch_size * max(1, self.n_gpus) logger.info("***** Running training *****") logger.info(" Num examples = %d", len(data_set.dataset)) logger.info(" Num Epochs = %d", num_train_epochs) logger.info(" Instantaneous batch size per GPU/CPU = %d", per_gpu_train_batch_size) logger.info( " Total train batch size (w. parallel, distributed & accumulation) = %d", train_batch_size * gradient_accumulation_steps, ) logger.info(" Gradient Accumulation steps = %d", gradient_accumulation_steps) logger.info(" Total optimization steps = %d", t_total) global_step = 0 best_dev = 0 dev_test = 0 best_model_path = os.path.join(self.output_path, "best_dev") tr_loss, logging_loss = 0.0, 0.0 self.model.zero_grad() train_iterator = trange(num_train_epochs, desc="Epoch") for epoch, _ in enumerate(train_iterator): print("****** Epoch: " + str(epoch)) epoch_iterator = tqdm(data_set, desc="Train iteration") for step, batch in enumerate(epoch_iterator): self.model.train() batch = tuple(t.to(self.device) for t in batch) inputs = self._batch_mapper(batch) outputs = self.model(**inputs) loss = outputs[0] # get loss if self.n_gpus > 1: loss = loss.mean() # mean() to average on multi-gpu parallel training if gradient_accumulation_steps > 1: loss = loss / gradient_accumulation_steps loss.backward() torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_grad_norm) tr_loss += loss.item() if (step + 1) % gradient_accumulation_steps == 0: self.optimizer.step() self.scheduler.step() self.model.zero_grad() global_step += 1 if logging_steps > 0 and global_step % logging_steps == 0: # Log metrics and run evaluation on dev/test best_dev, dev_test = self.update_best_model( dev_data_set, test_data_set, best_dev, dev_test, best_result_file, save_path=best_model_path, ) logger.info("lr = {}".format(self.scheduler.get_lr()[0])) logger.info("loss = {}".format((tr_loss - logging_loss) / logging_steps)) logging_loss = tr_loss if save_steps > 0 and global_step % save_steps == 0: # Save model checkpoint self.save_model_checkpoint( output_path=self.output_path, name="checkpoint-{}".format(global_step) ) if 0 < max_steps < global_step: epoch_iterator.close() break if 0 < max_steps < global_step: train_iterator.close() break logger.info(" global_step = %s, average loss = %s", global_step, tr_loss) logger.info("lr = {}".format(self.scheduler.get_lr()[0])) logger.info("loss = {}".format((tr_loss - logging_loss) / logging_steps)) # final evaluation: self.update_best_model( dev_data_set, test_data_set, best_dev, dev_test, best_result_file, save_path=best_model_path, )
[docs] def update_best_model( self, dev_data_set, test_data_set, best_dev, best_dev_test, best_result_file, save_path=None, ): new_best_dev = best_dev new_test_dev = best_dev_test set_test = False for i, ds in enumerate([dev_data_set, test_data_set]): if ds is None: # got no data loader continue if isinstance(ds, DataLoader): ds = [ds] for d in ds: logits, label_ids = self._evaluate(d) f1 = self.evaluate_predictions(logits, label_ids) if f1: if i == 0 and f1 > best_dev: # dev set new_best_dev = f1 set_test = True if save_path is not None: self.save_model(save_path, args=self.training_args) elif set_test: new_test_dev = f1 set_test = False if best_result_file is not None: with open(best_result_file, "a+") as f: f.write( "best dev= " + str(new_best_dev) + ", test= " + str(new_test_dev) ) logger.info("\n\nBest dev=%s. test=%s\n", str(new_best_dev), str(new_test_dev)) return new_best_dev, new_test_dev
def _evaluate(self, data_set: DataLoader): logger.info("***** Running inference *****") logger.info(" Batch size: {}".format(data_set.batch_size)) eval_loss = 0.0 nb_eval_steps = 0 preds = None out_label_ids = None for batch in tqdm(data_set, desc="Inference iteration"): self.model.eval() batch = tuple(t.to(self.device) for t in batch) with torch.no_grad(): inputs = self._batch_mapper(batch) outputs = self.model(**inputs) if "labels" in inputs: tmp_eval_loss, logits = outputs[:2] eval_loss += tmp_eval_loss.mean().item() else: logits = outputs[0] nb_eval_steps += 1 model_output = logits.detach().cpu() model_out_label_ids = inputs["labels"].detach().cpu() if "labels" in inputs else None if preds is None: preds = model_output out_label_ids = model_out_label_ids else: preds = torch.cat((preds, model_output), dim=0) out_label_ids = ( torch.cat((out_label_ids, model_out_label_ids), dim=0) if out_label_ids is not None else None ) if out_label_ids is None: return preds return preds, out_label_ids def _batch_mapper(self, batch): mapping = { "input_ids": batch[0], "attention_mask": batch[1], # XLM don't use segment_ids "token_type_ids": batch[2] if self.model_type in ["bert", "quant_bert", "xlnet"] else None, } if len(batch) == 4: mapping.update({"labels": batch[3]}) return mapping
[docs] def evaluate_predictions(self, logits, label_ids): raise NotImplementedError( "evaluate_predictions method must be implemented in order to" "be used for dev/test set evaluation" )
[docs] def save_model_checkpoint(self, output_path: str, name: str): """ save model checkpoint Args: output_path (str): output path name (str): name of checkpoint """ output_dir_path = os.path.join(output_path, name) self.save_model(output_dir_path, save_checkpoint=True)
[docs]class InputFeatures(object): """A single set of features of data.""" def __init__(self, input_ids, input_mask, segment_ids, label_id=None, valid_ids=None): self.input_ids = input_ids self.input_mask = input_mask self.segment_ids = segment_ids self.label_id = label_id self.valid_ids = valid_ids