Source code for nlp_architect.models.absa.train.rerank_terms

# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import csv
import pickle
import numpy as np
import tensorflow
from os import PathLike
from pathlib import Path

from nlp_architect.models.absa.utils import _read_generic_lex_for_similarity
from nlp_architect.models.absa import TRAIN_OUT, TRAIN_LEXICONS, GENERIC_OP_LEX, LEXICONS_OUT

from scipy.spatial.distance import cosine
from sklearn.model_selection import StratifiedKFold

# pylint: disable=import-error
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential, load_model

from nlp_architect.utils.embedding import load_word_embeddings


[docs]class RerankTerms(object): model_dir = TRAIN_OUT / "reranking_model" train_rerank_data_path = TRAIN_LEXICONS / "RerankTrainingData.csv" PREDICTION_THRESHOLD = 0.7 def __init__( self, vector_cache=True, rerank_model: PathLike = None, emb_model_path: PathLike = None ): # model and training params self.embeddings_len = 300 self.activation_1 = "relu" self.activation_2 = "relu" self.activation_3 = "sigmoid" self.loss = "binary_crossentropy" self.optimizer = "rmsprop" self.epochs_and_batch_size = [(10, 2)] self.seeds = [3] self.threshold = 0.5 self.sim_lexicon = TRAIN_LEXICONS / "RerankSentSimLex.csv" self.generic_lexicon = GENERIC_OP_LEX self.vector_cache = vector_cache self.word_vectors_dict = {} self.vectors_sim_dict = {} self.rerank_model_path = rerank_model self.emb_model_path = emb_model_path LEXICONS_OUT.mkdir(parents=True, exist_ok=True) tensorflow.logging.set_verbosity(tensorflow.logging.ERROR)
[docs] def calc_cosine_similarity(self, word_1, word_2, embedding_dict): """ calculate cosine similarity scores between 2 terms Args: word_1 (str): 1st input word word_2 (str): 2nd input word embedding_dict (dict): embedding dictionary Returns: vectors_sim_dict[key] (float): similarity scores between the 2 input words """ key = str(sorted([word_1, word_2])) if not self.vector_cache or key not in self.vectors_sim_dict: vector_1 = embedding_dict.get(word_1) vector_2 = embedding_dict.get(word_2) # check if both words have vectors if np.count_nonzero(vector_1) > 0 and np.count_nonzero(vector_2) > 0: sim_score = cosine(vector_1, vector_2) else: sim_score = None self.vectors_sim_dict[key] = sim_score return self.vectors_sim_dict[key]
[docs] def calc_similarity_scores_for_all_terms(self, terms, generic_terms, embedding_dict): """ calculate similarity scores between each term and each off the generic terms Args: terms: candidate terms generic_terms: generic opinion terms embedding_dict: embedding dictionary Returns: neg_all: similarity scores between each cand term and neg generic term pos_all: similarity scores between each cand term and pos generic term """ print("\nComputing similarity scores...\n") neg_all = [] pos_all = [] for term in terms: polarity_sim_dic = {"NEG": [], "POS": []} for generic_term, polarity in generic_terms.items(): sim_score = self.calc_cosine_similarity(term, generic_term, embedding_dict) if sim_score is not None: polarity_sim_dic[polarity].append(sim_score) else: polarity_sim_dic[polarity].append(float(0)) neg_all.append(polarity_sim_dic["NEG"]) pos_all.append(polarity_sim_dic["POS"]) return neg_all, pos_all
[docs] @staticmethod def load_terms_and_polarities(filename): """ load terms and polarities from file Args: filename: feature table file full path Returns: terms: candidate terms polarities: opinion polarity per term """ print("Loading training data from {} ...".format(filename)) table = np.genfromtxt(filename, delimiter=",", skip_header=1, dtype=str) if table.size == 0: raise ValueError("Error: Term file is empty, no terms to re-rank.") try: terms = table[:, 1] except Exception as e: print("\n\nError converting str to float in training table: {}".format(e)) polarities = table[:, 3].astype(str) if len(terms) != len(polarities): raise ValueError( "Count of opinion terms is different than the count of loaded polarities." ) polarities = {terms[i]: polarities[i] for i in range(len(terms))} print(str(terms.shape[0]) + " features loaded from CSV file") return terms, polarities
[docs] @staticmethod def load_terms_and_y_labels(filename): """Load terms and Y labels from feature file. Args: filename: feature table file full path Returns: x: feature vector y: labels vector terms: candidate terms polarities: opinion polarity per term """ print("Loading basic features from {} ...".format(filename)) table = np.genfromtxt(filename, delimiter=",", skip_header=1, dtype=str) if table.size == 0: raise ValueError("Error: Terms file is empty, no terms to re-rank.") try: terms = table[:, 1] except Exception as e: print("\n\nError converting str to float in training table: {}".format(e)) y = table[:, 0].astype(int) polarities = None print(str(terms.shape[0]) + " features loaded from CSV file") return y, terms, polarities
[docs] @staticmethod def concat_sim_scores_and_features(x, neg_sim, pos_sim): """ concatenate similarity scores to features Args: x: feature vector neg_sim: similarity scores between cand terms and neg opinion terms pos_sim: similarity scores between cand terms and pos opinion terms Returns: x: concatenated features and similarity scores """ neg = np.array(neg_sim) pos = np.array(pos_sim) neg_avg = np.mean(neg, axis=1, keepdims=True) neg_std = np.std(neg, axis=1, keepdims=True) neg_min = np.min(neg, axis=1, keepdims=True) neg_max = np.max(neg, axis=1, keepdims=True) pos_avg = np.mean(pos, axis=1, keepdims=True) pos_std = np.std(pos, axis=1, keepdims=True) pos_min = np.min(pos, axis=1, keepdims=True) pos_max = np.max(pos, axis=1, keepdims=True) print("\nAdding polarity similarity features...") res_x = np.concatenate( (neg_avg, neg_std, neg_min, neg_max, pos_avg, pos_std, pos_min, pos_max, x), 1 ) return res_x
[docs] def generate_embbeding_features(self, terms, embedding_dict): """ concatenate word embedding to features Args: terms: candidate terms embedding_dict: embedding dictionary word_to_emb_idx: index to embedding dictionary Returns: x: concatenated features and word embs """ print("\nAdding word vector features...\n") vec_matrix = np.zeros((len(terms), self.embeddings_len)) j = 0 for term in terms: word_vector = embedding_dict.get(term) vec_matrix[j, :] = word_vector j += 1 x = vec_matrix[:j] return x
[docs] def load_terms_and_y_labels_and_generate_features(self, filename): """ load candidate terms with their basic features, Y labels and polarities from feature file Args: filename: feature table file path Returns: x: feature vector y: labels vector terms: candidate terms polarities: opinion polarity per term """ print("\nLoading feature table...\n") y, terms, polarities = self.load_terms_and_y_labels(filename) x, terms, polarities = self.generate_features(terms, polarities) y_vector = None if y is not None: y_vector = np.reshape(y, (y.shape[0], 1)) return x, y, y_vector, terms, polarities
[docs] def load_terms_and_generate_features(self, filename): """ load candidate terms with their basic features, Y labels and polarities from feature file Args: filename: feature table file path Returns: x: feature vector terms: candidate terms polarities: opinion polarity per term """ print("\nLoading feature table...\n") terms, polarities = self.load_terms_and_polarities(filename) x, terms, polarities = self.generate_features(terms, polarities) return x, terms, polarities
@staticmethod def _determine_unk_polarities(terms, polarities, neg, pos): for i, term in enumerate(terms): if np.average(pos[i]) <= np.average(neg[i]): polarities[term] = "POS" else: polarities[term] = "NEG" return polarities
[docs] def generate_features(self, terms, polarities): generic_terms = _read_generic_lex_for_similarity(self.generic_lexicon) # generate unified list of candidate terms and generic terms terms_list = [term for term in terms] for term in generic_terms.keys(): terms_list.append(term.strip("'\"")) print("\nLoading embedding model...\n") embedding_dict, _ = load_word_embeddings(self.emb_model_path, terms_list) x = self.generate_embbeding_features(terms, embedding_dict) neg, pos = self.calc_similarity_scores_for_all_terms(terms, generic_terms, embedding_dict) x = self.concat_sim_scores_and_features(x, neg, pos) polarities = self._determine_unk_polarities(terms, polarities, neg, pos) print("\nDimensions of X: " + str(x.shape)) return x, terms, polarities
[docs] def evaluate(self, model, x_test, y_test, terms): report = {} predictions = model.predict(x_test, verbose=0) tp = 0 fp = 0 tn = 0 fn = 0 for i, prediction in enumerate(predictions): y_true = y_test[i][0] if prediction[0] > self.threshold: y_pred = 1 else: y_pred = 0 report[terms[i]] = (prediction[0], y_pred, y_true) if y_pred == 1: if y_true == 1: tp = tp + 1 else: fp = fp + 1 elif y_true == 0: tn = tn + 1 else: fn = fn + 1 prec = 100 * tp / (tp + fp) rec = 100 * tp / (tp + fn) f1 = 2 * (prec * rec) / (prec + rec) return (prec, rec, f1), report
[docs] def generate_model(self, input_vector_dimension): """Generate MLP model. Args: input_vector_dimension (int): word emb vec length Returns: """ mlp_model = Sequential() mlp_model.add(Dense(128, activation=self.activation_1, input_dim=input_vector_dimension)) mlp_model.add(Dropout(0.5)) mlp_model.add(Dense(64, activation=self.activation_2)) mlp_model.add(Dropout(0.5)) mlp_model.add(Dense(1, activation=self.activation_3)) mlp_model.compile(metrics=["accuracy"], loss=self.loss, optimizer=self.optimizer) return mlp_model
[docs] def predict(self, input_table_file, generic_opinion_terms): """Predict classification class according to model. Args: input_table_file: feature(X) and labels(Y) table file generic_opinion_terms: generic opinion terms file name Returns: final_concat_opinion_lex: reranked_lex conctenated with generic lex """ x, terms, polarities = self.load_terms_and_generate_features(input_table_file) model = load_model(self.rerank_model_path) reranked_lexicon = model.predict(x, verbose=0) reranked_lex = {} for i, prediction in enumerate(reranked_lexicon): if not np.isnan(prediction[0]) and prediction[0] > self.PREDICTION_THRESHOLD: reranked_lex[terms[i]] = (prediction[0], polarities[terms[i]]) final_concat_opinion_lex = self._generate_concat_reranked_lex( reranked_lex, generic_opinion_terms ) return final_concat_opinion_lex
[docs] def rerank_train(self): """Class for training a reranking model.""" x, y, _, _, _ = self.load_terms_and_y_labels_and_generate_features( self.train_rerank_data_path ) try: print("\nModel training...") model = self.generate_model(x.shape[1]) e = self.epochs_and_batch_size[0][0] b = self.epochs_and_batch_size[0][1] model.fit(x, y, epochs=e, batch_size=b, verbose=0) RerankTerms.model_dir.mkdir(parents=True, exist_ok=True) model.save(str(RerankTerms.model_dir) + "/rerank_model.h5") print("\nSaved model to: " + str(RerankTerms.model_dir) + "/rerank_model.h5") except ZeroDivisionError: print("Division by zero, skipping test")
[docs] def cross_validation_training(self, verbose=False): """Perform k fold cross validation and evaluate the results.""" final_report = {} x, y, y_vector, terms, _ = self.load_terms_and_y_labels_and_generate_features( self.train_rerank_data_path ) for seed in self.seeds: np.random.seed(seed) for epochs, batch_size in self.epochs_and_batch_size: self.print_params(batch_size, epochs, seed) k_fold = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed) f1_scores = [] precision_scores = [] recall_scores = [] try: for i, (train, test) in enumerate(k_fold.split(x, y)): model = self.generate_model(x.shape[1]) model.fit( x[train], y_vector[train], epochs=epochs, batch_size=batch_size, verbose=0, ) measures, report = self.evaluate( model, x[test], y_vector[test], terms[test] ) final_report.update(report) precision, recall, f1 = measures f1_scores.append(f1) precision_scores.append(precision) recall_scores.append(recall) if verbose: print("Fold " + str(i + 1) + ":") self.print_evaluation_results(precision, recall, f1) print("\nSummary:") self.print_evaluation_results(precision_scores, recall_scores, f1_scores) except ZeroDivisionError: print("Division by zero, skipping test") self.write_evaluation_report(final_report)
[docs] def print_params(self, batch_size, epochs, seed): """Print training params. Args: batch_size(int): batch size epochs(int): num of epochs seed(int): seed """ print( "\nModel Parameters: act_1= " + self.activation_1 + ", act_2= " + self.activation_2 + ", act_3= " + self.activation_3 + ", loss= " + self.loss + ", optimizer= " + self.optimizer + "\nseed= " + str(seed) + ", epochs= " + str(epochs) + ", batch_size= " + str(batch_size) + ", threshold= " + str(self.threshold) + ", use_complete_w2v= " + ", sim_lexicon= " + str(self.sim_lexicon) + "\n" )
[docs] def print_evaluation_results(self, precision, recall, f1): """Print evaluation results. Args: precision(list of float): precision recall(list of float): recall f1(list of float): f measure """ print() self.print_measure("Precision", precision) self.print_measure("Recall", recall) self.print_measure("F-measure", f1) print( "-------------------------------------------------------------------------" "------------------------------" )
[docs] @staticmethod def print_measure(measure, value): """Print single measure. Args: measure(str): measure type value(list of float): value """ print(measure + ": {:.2f}%".format(np.mean(value)), end="") if not np.isscalar(value): print(" (+/- {:.2f}%)".format(np.std(value)), end="") print()
@staticmethod def _generate_concat_reranked_lex(acquired_opinion_lex, generic_opinion_lex_file): print("Loading generic sentiment terms from {}...".format(generic_opinion_lex_file)) generics_table = np.genfromtxt( generic_opinion_lex_file, delimiter=",", skip_header=1, dtype=str ) print(str(generics_table.shape[0]) + " generic sentiment terms loaded") concat_opinion_dict = {} for key, value in acquired_opinion_lex.items(): concat_opinion_dict[key] = (value[0], value[1], "Y") for row in generics_table: concat_opinion_dict[row[0]] = (row[2], row[1], "N") return concat_opinion_dict @staticmethod def _write_prediction_results(concat_opinion_dict, out_override): out_dir = Path(out_override) if out_override else LEXICONS_OUT out_path = out_dir / "generated_opinion_lex_reranked.csv" with open(out_path, "w") as csv_file: writer = csv.writer(csv_file) writer.writerow(["Term", "Score", "Polarity", "isAcquired"]) for key, value in concat_opinion_dict.items(): writer.writerow([key, value[0], value[1], value[2]]) print("Reranked opinion lexicon written to {}".format(out_path))
[docs] @staticmethod def write_evaluation_report(report_dic): RerankTerms.model_dir.mkdir(parents=True, exist_ok=True) out_path = RerankTerms.model_dir / "rerank_classifier_results.csv" with open(out_path, "w", encoding="utf-8") as csv_file: writer = csv.writer(csv_file) writer.writerow(["term", "score", "y_pred", "y_true"]) for key, value in report_dic.items(): writer.writerow([key, value[0], value[1], value[2]]) print("Report written to {}" + str(out_path))
[docs] @staticmethod def load_word_vectors_dict(): try: with open(RerankTerms.model_dir / "word_vectors_dict.pickle", "rb") as f: ret = pickle.load(f) except OSError: ret = {} return ret