Source code for nlp_architect.models.transformers.sequence_classification

# ******************************************************************************
# Copyright 2017-2019 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************

import logging
import os
from typing import List, Union

import numpy as np
import torch
from torch.utils.data import DataLoader, SequentialSampler, TensorDataset
from transformers import (
    BertForSequenceClassification,
    RobertaForSequenceClassification,
    XLMForSequenceClassification,
    XLNetForSequenceClassification,
)

from nlp_architect.data.sequence_classification import SequenceClsInputExample
from nlp_architect.models.transformers.base_model import InputFeatures, TransformerBase
from nlp_architect.models.transformers.quantized_bert import QuantizedBertForSequenceClassification
from nlp_architect.utils.metrics import accuracy

logger = logging.getLogger(__name__)


class TransformerSequenceClassifier(TransformerBase):
    """
    Transformer sequence classifier

    Args:
        model_type (str): transformer base model type
        labels (List[str], optional): list of labels. Defaults to None.
        task_type (str, optional): task type (classification/regression). Defaults to
            classification.
        metric_fn ([type], optional): metric to use for evaluation. Defaults to accuracy.
        load_quantized (bool, optional): load a pre-quantized (8-bit) model. Defaults to
            False.
    """

    MODEL_CLASS = {
        "bert": BertForSequenceClassification,
        "quant_bert": QuantizedBertForSequenceClassification,
        "xlnet": XLNetForSequenceClassification,
        "xlm": XLMForSequenceClassification,
        "roberta": RobertaForSequenceClassification,
    }

    def __init__(
        self,
        model_type: str,
        labels: List[str] = None,
        task_type="classification",
        metric_fn=accuracy,
        load_quantized=False,
        *args,
        **kwargs,
    ):
        assert model_type in self.MODEL_CLASS.keys(), "unsupported model type"
        self.labels = labels
        self.num_labels = len(labels)
        super(TransformerSequenceClassifier, self).__init__(
            model_type, labels=labels, num_labels=self.num_labels, *args, **kwargs
        )
        self.model_class = self.MODEL_CLASS[model_type]
        if model_type == "quant_bert" and load_quantized:
            self.model = self.model_class.from_pretrained(
                self.model_name_or_path,
                from_tf=bool(".ckpt" in self.model_name_or_path),
                config=self.config,
                from_8bit=load_quantized,
            )
        else:
            self.model = self.model_class.from_pretrained(
                self.model_name_or_path,
                from_tf=bool(".ckpt" in self.model_name_or_path),
                config=self.config,
            )
        self.task_type = task_type
        self.metric_fn = metric_fn
        self.to(self.device, self.n_gpus)
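
    # Example usage -- a minimal sketch, not part of the original source. The checkpoint
    # name and label set are illustrative; `model_name_or_path` (read above as
    # `self.model_name_or_path`) is assumed to be accepted by TransformerBase via **kwargs:
    #
    #     classifier = TransformerSequenceClassifier(
    #         model_type="bert",
    #         model_name_or_path="bert-base-uncased",
    #         labels=["negative", "positive"],
    #         task_type="classification",
    #     )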

    def train(
        self,
        train_data_set: DataLoader,
        dev_data_set: Union[DataLoader, List[DataLoader]] = None,
        test_data_set: Union[DataLoader, List[DataLoader]] = None,
        gradient_accumulation_steps: int = 1,
        per_gpu_train_batch_size: int = 8,
        max_steps: int = -1,
        num_train_epochs: int = 3,
        max_grad_norm: float = 1.0,
        logging_steps: int = 50,
        save_steps: int = 100,
    ):
        """
        Train a model

        Args:
            train_data_set (DataLoader): training data set
            dev_data_set (Union[DataLoader, List[DataLoader]], optional): development set.
                Defaults to None.
            test_data_set (Union[DataLoader, List[DataLoader]], optional): test set.
                Defaults to None.
            gradient_accumulation_steps (int, optional): number of gradient accumulation
                steps. Defaults to 1.
            per_gpu_train_batch_size (int, optional): per GPU train batch size. Defaults
                to 8.
            max_steps (int, optional): max steps. Defaults to -1.
            num_train_epochs (int, optional): number of train epochs. Defaults to 3.
            max_grad_norm (float, optional): max gradient norm (for clipping). Defaults
                to 1.0.
            logging_steps (int, optional): number of steps between logging. Defaults to 50.
            save_steps (int, optional): number of steps between model saves. Defaults
                to 100.
        """
        self._train(
            train_data_set,
            dev_data_set,
            test_data_set,
            gradient_accumulation_steps,
            per_gpu_train_batch_size,
            max_steps,
            num_train_epochs,
            max_grad_norm,
            logging_steps=logging_steps,
            save_steps=save_steps,
        )
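
    # Example -- a sketch, assuming `train_dataset` is a TensorDataset produced by
    # `convert_to_tensors` below; the sampler and batch size are illustrative choices:
    #
    #     from torch.utils.data import RandomSampler
    #     train_dl = DataLoader(
    #         train_dataset, sampler=RandomSampler(train_dataset), batch_size=8
    #     )
    #     classifier.train(train_dl, num_train_epochs=3, logging_steps=50, save_steps=100)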

    def evaluate_predictions(self, logits, label_ids):
        """
        Run evaluation of given logits and truth labels

        Args:
            logits: model logits
            label_ids: truth label ids
        """
        preds = self._postprocess_logits(logits)
        label_ids = label_ids.numpy()
        result = self.metric_fn(preds, label_ids)
        try:
            output_eval_file = os.path.join(self.output_path, "eval_results.txt")
        except TypeError:
            output_eval_file = os.devnull
        with open(output_eval_file, "w") as writer:
            logger.info("***** Evaluation results *****")
            for key in sorted(result.keys()):
                logger.info(" %s = %s", key, str(result[key]))
                writer.write("%s = %s\n" % (key, str(result[key])))
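
    # Note: the reporting loop above assumes `metric_fn` returns a dict mapping metric
    # names to values (e.g. {"acc": 0.87}); a bare scalar would fail at `result.keys()`.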

    def convert_to_tensors(
        self,
        examples: List[SequenceClsInputExample],
        max_seq_length: int = 128,
        include_labels: bool = True,
    ) -> TensorDataset:
        """
        Convert examples to tensor dataset

        Args:
            examples (List[SequenceClsInputExample]): examples
            max_seq_length (int, optional): max sequence length. Defaults to 128.
            include_labels (bool, optional): include labels. Defaults to True.

        Returns:
            TensorDataset:
        """
        features = self._convert_examples_to_features(
            examples,
            max_seq_length,
            self.tokenizer,
            self.task_type,
            include_labels,
            pad_on_left=bool(self.model_type in ["xlnet"]),
            pad_token=self.tokenizer.convert_tokens_to_ids([self.tokenizer.pad_token])[0],
            pad_token_segment_id=4 if self.model_type in ["xlnet"] else 0,
        )
        # Convert to Tensors and build dataset
        all_input_ids = torch.tensor([f.input_ids for f in features], dtype=torch.long)
        all_input_mask = torch.tensor([f.input_mask for f in features], dtype=torch.long)
        all_segment_ids = torch.tensor([f.segment_ids for f in features], dtype=torch.long)
        if include_labels:
            if self.task_type == "classification":
                all_label_ids = torch.tensor([f.label_id for f in features], dtype=torch.long)
            elif self.task_type == "regression":
                all_label_ids = torch.tensor([f.label_id for f in features], dtype=torch.float)
            return TensorDataset(all_input_ids, all_input_mask, all_segment_ids, all_label_ids)
        return TensorDataset(all_input_ids, all_input_mask, all_segment_ids)
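
    # Example -- a sketch; the argument layout of SequenceClsInputExample (guid, text,
    # then an optional second text and label) is an assumption based on the attribute
    # names used in _convert_examples_to_features:
    #
    #     examples = [SequenceClsInputExample("ex-1", "the movie was great", label="positive")]
    #     dataset = classifier.convert_to_tensors(examples, max_seq_length=128)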

    def inference(
        self,
        examples: List[SequenceClsInputExample],
        max_seq_length: int,
        batch_size: int = 64,
        evaluate=False,
    ):
        """
        Run inference on given examples

        Args:
            examples (List[SequenceClsInputExample]): examples
            max_seq_length (int): max sequence length
            batch_size (int, optional): batch size. Defaults to 64.
            evaluate (bool, optional): if True, the examples must include labels and the
                predictions are also evaluated. Defaults to False.

        Returns:
            model predictions
        """
        data_set = self.convert_to_tensors(
            examples, max_seq_length=max_seq_length, include_labels=evaluate
        )
        inf_sampler = SequentialSampler(data_set)
        inf_dataloader = DataLoader(data_set, sampler=inf_sampler, batch_size=batch_size)
        logits = self._evaluate(inf_dataloader)
        if not evaluate:
            preds = self._postprocess_logits(logits)
        else:
            logits, label_ids = logits
            preds = self._postprocess_logits(logits)
            self.evaluate_predictions(logits, label_ids)
        return preds
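
    # Example -- a sketch reusing the hypothetical `examples` list from above:
    #
    #     preds = classifier.inference(examples, max_seq_length=128, batch_size=64)
    #
    # With evaluate=True, evaluate_predictions is also called and its results are logged
    # and written to eval_results.txt (when an output path is set).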

    def _postprocess_logits(self, logits):
        preds = logits.numpy()
        if self.task_type == "classification":
            preds = np.argmax(preds, axis=1)
        elif self.task_type == "regression":
            preds = np.squeeze(preds)
        return preds

    def _convert_examples_to_features(
        self,
        examples,
        max_seq_length,
        tokenizer,
        task_type,
        include_labels=True,
        pad_on_left=False,
        pad_token=0,
        pad_token_segment_id=0,
        mask_padding_with_zero=True,
    ):
        """Convert a list of `SequenceClsInputExample`s into a list of `InputFeatures`.

        Each example is tokenized with `tokenizer.encode_plus` (which adds the model's
        special tokens, e.g. [CLS] + A + [SEP] + B + [SEP] for BERT-style models) and is
        padded/truncated to `max_seq_length`. `pad_on_left`, `pad_token` and
        `pad_token_segment_id` control the padding side and values.
        """
        if include_labels:
            label_map = {label: i for i, label in enumerate(self.labels)}

        features = []
        for (ex_index, example) in enumerate(examples):
            if ex_index % 10000 == 0:
                logger.info("Writing example %d of %d", ex_index, len(examples))

            inputs = tokenizer.encode_plus(
                example.text,
                example.text_b,
                add_special_tokens=True,
                max_length=max_seq_length,
            )
            input_ids, token_type_ids = inputs["input_ids"], inputs["token_type_ids"]

            # The mask has 1 for real tokens and 0 for padding tokens (when
            # mask_padding_with_zero is True); only real tokens are attended to.
            attention_mask = [1 if mask_padding_with_zero else 0] * len(input_ids)

            # Zero-pad up to max_seq_length, either on the left or on the right.
            padding_length = max_seq_length - len(input_ids)
            if pad_on_left:
                input_ids = ([pad_token] * padding_length) + input_ids
                attention_mask = (
                    [0 if mask_padding_with_zero else 1] * padding_length
                ) + attention_mask
                token_type_ids = ([pad_token_segment_id] * padding_length) + token_type_ids
            else:
                input_ids = input_ids + ([pad_token] * padding_length)
                attention_mask = attention_mask + (
                    [0 if mask_padding_with_zero else 1] * padding_length
                )
                token_type_ids = token_type_ids + ([pad_token_segment_id] * padding_length)

            assert len(input_ids) == max_seq_length
            assert len(attention_mask) == max_seq_length
            assert len(token_type_ids) == max_seq_length

            if include_labels:
                if task_type == "classification":
                    label_id = label_map[example.label]
                elif task_type == "regression":
                    label_id = float(example.label)
                else:
                    raise KeyError(task_type)
            else:
                label_id = None

            features.append(
                InputFeatures(
                    input_ids=input_ids,
                    input_mask=attention_mask,
                    segment_ids=token_type_ids,
                    label_id=label_id,
                )
            )
        return features
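
# Padding sketch for _convert_examples_to_features (illustrative numbers, not from the
# original source): with max_seq_length=8 and an encoded pair of 5 tokens,
# padding_length=3, so a BERT-style model gets input_ids right-padded with three pad
# tokens and an attention_mask ending in [..., 0, 0, 0]; for "xlnet",
# convert_to_tensors sets pad_on_left=True and pad_token_segment_id=4, so the same
# three padding positions are prepended instead.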