Source code for nlp_architect.data.utils

# ******************************************************************************
# Copyright 2017-2019 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
from __future__ import absolute_import, division, print_function

import csv
import os
import random
import sys
from abc import ABC
from io import open
from typing import List, Tuple


class InputExample(ABC):
    """Base class for a single training/dev/test example"""

    def __init__(self, guid: str, text, label=None):
        self.guid = guid
        self.text = text
        self.text_a = text  # for compatibility with the transformers library
        self.label = label

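As a minimal usage sketch (not part of the module): building an InputExample for a hypothetical sentiment task. The guid convention ("set-index") and the label value are illustrative assumptions, not fixed by this class.

    from nlp_architect.data.utils import InputExample

    # hypothetical sentiment example; guid format and label are invented
    example = InputExample(guid="train-1", text="the service was excellent", label="positive")
    print(example.guid, example.text_a, example.label)  # text_a aliases text
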
class DataProcessor(object):
    """Base class for data converters for sequence/token classification data sets."""

    def get_train_examples(self):
        """Gets a collection of `InputExample`s for the train set."""
        raise NotImplementedError()

    def get_dev_examples(self):
        """Gets a collection of `InputExample`s for the dev set."""
        raise NotImplementedError()

    def get_test_examples(self):
        """Gets a collection of `InputExample`s for the test set."""
        raise NotImplementedError()

    def get_labels(self):
        """Gets the list of labels for this data set."""
        raise NotImplementedError()

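Concrete processors override these methods; note that `Task` below invokes them with a `data_dir` argument, so subclasses typically accept one. A minimal sketch of a hypothetical TSV-backed processor (the file name, column layout, and label set are assumptions for illustration):

    import os
    from nlp_architect.data.utils import DataProcessor, InputExample, read_tsv

    class SentimentProcessor(DataProcessor):
        """Hypothetical processor for a TSV file of 'text<TAB>label' rows."""

        def get_train_examples(self, data_dir):
            examples = []
            # assumes a train.tsv file with the text in column 0 and label in column 1
            for i, line in enumerate(read_tsv(os.path.join(data_dir, "train.tsv"))):
                text, label = line[0], line[1]
                examples.append(InputExample(guid="train-{}".format(i), text=text, label=label))
            return examples

        def get_labels(self):
            return ["negative", "positive"]
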
class Task:
    """A task definition class

    Args:
        name (str): the name of the task
        processor (DataProcessor): a DataProcessor instance that loads the dataset
        data_dir (str): path to the data source
        task_type (str): the task type (classification/regression/tagging)
    """

    def __init__(self, name: str, processor: DataProcessor, data_dir: str, task_type: str):
        self.name = name
        self.processor = processor
        self.data_dir = data_dir
        self.task_type = task_type

    def get_train_examples(self):
        return self.processor.get_train_examples(self.data_dir)

    def get_dev_examples(self):
        return self.processor.get_dev_examples(self.data_dir)

    def get_test_examples(self):
        return self.processor.get_test_examples(self.data_dir)

    def get_labels(self):
        return self.processor.get_labels()

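A `Task` simply forwards to its processor with the stored `data_dir`. A usage sketch, reusing the hypothetical `SentimentProcessor` above (the directory path is an assumption):

    from nlp_architect.data.utils import Task

    task = Task(
        name="sentiment",
        processor=SentimentProcessor(),
        data_dir="/path/to/data",  # hypothetical directory containing train.tsv
        task_type="classification",
    )
    train_examples = task.get_train_examples()
    labels = task.get_labels()
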
def read_tsv(input_file, quotechar=None):
    """Reads a tab separated value file."""
    with open(input_file, "r", encoding="utf-8-sig") as f:
        reader = csv.reader(f, delimiter="\t", quotechar=quotechar)
        lines = []
        for line in reader:
            if sys.version_info[0] == 2:
                # Python 2's csv module yields byte strings; decode them to unicode.
                # `unicode` is undefined on Python 3, hence the noqa marker.
                line = list(unicode(cell, "utf-8") for cell in line)  # noqa: F821
            lines.append(line)
        return lines

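A short usage sketch (the file path is hypothetical); each returned item is a list of the row's column values:

    from nlp_architect.data.utils import read_tsv

    rows = read_tsv("/path/to/train.tsv")  # hypothetical file
    for row in rows[:3]:
        print(row)  # e.g. ['some text', 'positive']
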
def read_column_tagged_file(filename: str, tag_col: int = -1, ignore_token: str = None):
    """Reads a column-tagged (CoNLL style) file (whitespace separated, one token per line).
    tag_col is the column number to use as the token's tag (defaults to the last column).
    Sentences are separated by blank lines.

    Args:
        filename (str): input file path
        tag_col (int): the column containing the labels
        ignore_token (str): a str token to exclude

    Returns:
        a list of (tokens, tags) tuples, one per sentence, where tokens and tags
        are parallel lists of strings
    """
    data = []
    sentence = []
    labels = []
    with open(filename) as fp:
        for line in fp:
            line = line.strip()
            if len(line) == 0:
                # a blank line marks a sentence boundary
                if len(sentence) > 0:
                    data.append((sentence, labels))
                sentence = []
                labels = []
                continue
            splits = line.split()
            token = splits[0]
            if token != ignore_token:
                sentence.append(token)
                labels.append(splits[tag_col])
        if len(sentence) > 0:
            data.append((sentence, labels))
    return data

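To illustrate the input format and return shape, a self-contained sketch that writes a tiny two-sentence CoNLL-style file and reads it back (the file contents and path are invented for the example):

    import os
    import tempfile
    from nlp_architect.data.utils import read_column_tagged_file

    conll = "John\tB-PER\nlives\tO\nhere\tO\n\nMary\tB-PER\nworks\tO\n"
    path = os.path.join(tempfile.mkdtemp(), "toy.conll")  # hypothetical file
    with open(path, "w", encoding="utf-8") as f:
        f.write(conll)

    data = read_column_tagged_file(path)
    print(data[0])  # (['John', 'lives', 'here'], ['B-PER', 'O', 'O'])
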
def write_column_tagged_file(filename: str, data: List[Tuple]):
    """Writes sentences in column-tagged format (the inverse of read_column_tagged_file):
    one token per line, columns tab separated, sentences separated by blank lines."""
    file_dir = os.path.dirname(filename)
    if not os.path.exists(file_dir):
        raise FileNotFoundError(file_dir)
    with open(filename, "w", encoding="utf-8") as fw:
        for sen in data:
            cols = len(sen)  # number of parallel columns (e.g. tokens, tags)
            items = len(sen[0])  # number of tokens in the sentence
            for i in range(items):
                line = "\t".join([sen[c][i] for c in range(cols)]) + "\n"
                fw.write(line)
            fw.write("\n")

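Since this is the inverse of read_column_tagged_file, a round trip preserves the data. A minimal sketch with invented sentences (tempfile supplies an existing output directory):

    import os
    import tempfile
    from nlp_architect.data.utils import read_column_tagged_file, write_column_tagged_file

    sentences = [(["John", "lives", "here"], ["B-PER", "O", "O"])]  # invented data
    out_path = os.path.join(tempfile.mkdtemp(), "out.conll")
    write_column_tagged_file(out_path, sentences)
    assert read_column_tagged_file(out_path) == sentences
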
def sample_label_unlabeled(samples: List[InputExample], no_labeled: int, no_unlabeled: int):
    """Randomly sample 2 disjoint sets of samples from a given collection of
    InputExamples (used for semi-supervised models)
    """
    num_of_examples = len(samples)
    assert no_labeled > 0 and no_unlabeled > 0, "Must provide no_labeled/no_unlabeled > 0"
    assert (
        num_of_examples >= no_labeled + no_unlabeled
    ), "num of total samples smaller than requested sub sets"
    all_indices = list(range(num_of_examples))
    labeled_indices = random.sample(all_indices, no_labeled)
    # draw the unlabeled set only from indices not already chosen as labeled
    remaining_indices = list(set(all_indices).difference(set(labeled_indices)))
    unlabeled_indices = random.sample(remaining_indices, no_unlabeled)
    label_samples = [samples[i] for i in labeled_indices]
    unlabel_samples = [samples[i] for i in unlabeled_indices]
    return label_samples, unlabel_samples

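A sketch showing how the two disjoint subsets might be drawn for a semi-supervised setup (the pool contents and subset sizes are invented):

    import random
    from nlp_architect.data.utils import InputExample, sample_label_unlabeled

    pool = [InputExample(guid=str(i), text="sentence {}".format(i)) for i in range(100)]
    random.seed(0)  # for reproducible sampling
    labeled, unlabeled = sample_label_unlabeled(pool, no_labeled=10, no_unlabeled=50)
    assert len(labeled) == 10 and len(unlabeled) == 50
    # the two sets are index-disjoint by construction
    assert not {e.guid for e in labeled} & {e.guid for e in unlabeled}
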
def split_column_dataset(
    first_count: int,
    second_count: int,
    out_folder,
    dataset,
    first_filename,
    second_filename,
    tag_col=-1,
):
    """Splits a single column-tagged dataset into two files according to the
    number of examples requested for each file.

    Args:
        first_count (int): the number of examples to include in the first split file
        second_count (int): the number of examples to include in the second split file
        out_folder (str): the folder in which the result files will be stored
        dataset (str): the path to the original data file
        first_filename (str): the name of the first split file
        second_filename (str): the name of the second split file
        tag_col (int): the index of the tag column
    """
    lines = read_column_tagged_file(dataset, tag_col=tag_col)
    num_of_examples = len(lines)
    assert first_count + second_count <= num_of_examples and first_count > 0
    # sample the requested number of sentences, then split them into the two parts
    selected_lines = random.sample(lines, first_count + second_count)
    first_data = selected_lines[:first_count]
    second_data = selected_lines[first_count:]
    write_column_tagged_file(out_folder + os.sep + first_filename, first_data)
    if second_count != 0:
        write_column_tagged_file(out_folder + os.sep + second_filename, second_data)

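For example, carving a 100-sentence training split and a 20-sentence dev split out of one CoNLL file might look like this (the paths, file names, and counts are assumptions):

    from nlp_architect.data.utils import split_column_dataset

    split_column_dataset(
        first_count=100,
        second_count=20,
        out_folder="/path/to/out",      # hypothetical folder, must already exist
        dataset="/path/to/full.conll",  # hypothetical source file
        first_filename="train.txt",
        second_filename="dev.txt",
    )
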
def get_cached_filepath(data_dir, model_name, seq_length, task_name, set_type="train"):
    """Get the path of a cached features file.

    Arguments:
        data_dir {str} -- data directory string
        model_name {str} -- model name
        seq_length {int} -- max sequence length
        task_name {str} -- name of task

    Keyword Arguments:
        set_type {str} -- set type (choose from train/dev/test) (default: {"train"})

    Returns:
        str -- cached file path
    """
    cached_features_file = os.path.join(
        data_dir,
        "cached_{}_{}_{}_{}".format(
            set_type,
            # keep only the last non-empty path segment of the model name
            list(filter(None, model_name.split("/"))).pop(),
            str(seq_length),
            str(task_name),
        ),
    )
    return cached_features_file

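The helper only builds a path string; no file is created. For instance (the argument values are illustrative):

    from nlp_architect.data.utils import get_cached_filepath

    path = get_cached_filepath("data", "bert-base-uncased", 128, "mrpc", set_type="dev")
    print(path)  # data/cached_dev_bert-base-uncased_128_mrpc
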