Source code for nlp_architect.models.absa.train.acquire_terms

# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import copy
import re
import sys
from os import PathLike
from typing import Union

from tqdm import tqdm

from nlp_architect.models.absa import TRAIN_LEXICONS, LEXICONS_OUT
from nlp_architect.models.absa import GENERIC_OP_LEX
from nlp_architect.models.absa.inference.data_types import Polarity
from nlp_architect.models.absa.train.data_types import (
    AspectTerm,
    DepRelation,
    DepRelationTerm,
    LoadOpinionStopLists,
    LoadAspectStopLists,
    OpinionTerm,
    QualifiedTerm,
)
from nlp_architect.models.absa.train.rules import rule_1, rule_2, rule_3, rule_4, rule_5, rule_6
from nlp_architect.models.absa.utils import (
    _load_parsed_docs_from_dir,
    _write_final_opinion_lex,
    _load_lex_as_list_from_csv,
    read_generic_lex_from_file,
)


class AcquireTerms(object):
    """
    Lexicon acquisition. Produces an opinion lexicon and an aspect lexicon
    based on the input dataset.

    Attributes:
        opinion_candidate_list_curr_iter (dict): candidate opinion terms in the current iteration
        opinion_candidate_list_prev_iter (dict): opinion candidates list of the previous iteration
        opinion_candidate_list (dict): opinion terms learned across all iterations
        opinion_candidates_list_final (list): final opinion candidates list
        opinion_candidate_list_raw (dict): all instances of candidate opinion terms
            across all iterations
        aspect_candidate_list_curr_iter (dict): candidate aspect terms in the current iteration
        aspects_candidate_list_prev_iter (list): aspect candidates list of the previous iteration
        aspect_candidate_list (list): aspect terms learned across all iterations
        aspect_candidates_list_final (list): final aspect candidates list
        aspect_candidate_list_raw (dict): all instances of candidate aspect terms
            across all iterations
    """

    generic_opinion_lex_path = GENERIC_OP_LEX
    acquired_opinion_terms_path = LEXICONS_OUT / "generated_opinion_lex.csv"
    acquired_aspect_terms_path = LEXICONS_OUT / "generated_aspect_lex.csv"

    GENERIC_OPINION_LEX = _load_lex_as_list_from_csv(GENERIC_OP_LEX)
    GENERAL_ADJECTIVES_LEX = _load_lex_as_list_from_csv(TRAIN_LEXICONS / "GeneralAdjectivesLex.csv")
    GENERIC_QUANTIFIERS_LEX = _load_lex_as_list_from_csv(
        TRAIN_LEXICONS / "GenericQuantifiersLex.csv"
    )
    GEOGRAPHICAL_ADJECTIVES_LEX = _load_lex_as_list_from_csv(
        TRAIN_LEXICONS / "GeographicalAdjectivesLex.csv"
    )
    INTENSIFIERS_LEX = _load_lex_as_list_from_csv(TRAIN_LEXICONS / "IntensifiersLex.csv")
    TIME_ADJECTIVE_LEX = _load_lex_as_list_from_csv(TRAIN_LEXICONS / "TimeAdjectiveLex.csv")
    ORDINAL_NUMBERS_LEX = _load_lex_as_list_from_csv(TRAIN_LEXICONS / "OrdinalNumbersLex.csv")
    PREPOSITIONS_LEX = _load_lex_as_list_from_csv(TRAIN_LEXICONS / "PrepositionsLex.csv")
    PRONOUNS_LEX = _load_lex_as_list_from_csv(TRAIN_LEXICONS / "PronounsLex.csv")
    COLORS_LEX = _load_lex_as_list_from_csv(TRAIN_LEXICONS / "ColorsLex.csv")
    DETERMINERS_LEX = _load_lex_as_list_from_csv(TRAIN_LEXICONS / "DeterminersLex.csv")
    NEGATION_LEX = _load_lex_as_list_from_csv(TRAIN_LEXICONS / "NegationLex.csv")
    AUXILIARIES_LEX = _load_lex_as_list_from_csv(TRAIN_LEXICONS / "AuxiliariesLex.csv")

    OPINION_STOP_LIST = LoadOpinionStopLists(
        DETERMINERS_LEX,
        GENERAL_ADJECTIVES_LEX,
        GENERIC_QUANTIFIERS_LEX,
        GEOGRAPHICAL_ADJECTIVES_LEX,
        INTENSIFIERS_LEX,
        TIME_ADJECTIVE_LEX,
        ORDINAL_NUMBERS_LEX,
        PREPOSITIONS_LEX,
        COLORS_LEX,
        NEGATION_LEX,
    )

    ASPECT_STOP_LIST = LoadAspectStopLists(
        GENERIC_OPINION_LEX,
        DETERMINERS_LEX,
        GENERAL_ADJECTIVES_LEX,
        GENERIC_QUANTIFIERS_LEX,
        GEOGRAPHICAL_ADJECTIVES_LEX,
        INTENSIFIERS_LEX,
        TIME_ADJECTIVE_LEX,
        ORDINAL_NUMBERS_LEX,
        PREPOSITIONS_LEX,
        PRONOUNS_LEX,
        COLORS_LEX,
        NEGATION_LEX,
        AUXILIARIES_LEX,
    )

    FILTER_PATTERNS = [re.compile(r".*\d+.*")]
    FLOAT_FORMAT = "{0:.3g}"

    # number of dataset sentences per +1 increment of the candidate-frequency thresholds
    NUM_OF_SENTENCES_PER_OPINION_AND_ASPECT_TERM_INC = 35000

    def __init__(self, asp_thresh=3, op_thresh=2, max_iter=1):
        self.opinion_candidate_list_prev_iter = read_generic_lex_from_file(
            AcquireTerms.generic_opinion_lex_path
        )
        self.generic_sent_dict = copy.deepcopy(self.opinion_candidate_list_prev_iter)
        self.opinion_candidate_list = {}
        self.opinion_candidate_list_raw = {}
        self.opinion_candidate_list_curr_iter = {}
        self.opinion_candidates_list_final = []

        self.aspect_candidate_list_raw = {}
        self.aspect_candidate_list = list()
        self.aspect_candidate_list_curr_iter = {}
        self.aspect_candidates_list_final = []
        self.init_aspect_dict = list()
        self.aspects_candidate_list_prev_iter = list()

        self.min_freq_aspect_candidate = asp_thresh
        self.min_freq_opinion_candidate = op_thresh
        self.max_num_of_iterations = max_iter
    def extract_terms_from_doc(self, parsed_doc):
        """Extract candidate terms for sentences in parsed document.

        Args:
            parsed_doc (ParsedDocument): Input parsed document.
        """
        for text, parsed_sent in parsed_doc.sent_iter():
            relations = _get_rel_list(parsed_sent)
            for rel_entry in relations:
                if rel_entry.rel != "root":
                    gov_seen = self.opinion_candidate_list_prev_iter.get(rel_entry.gov.text)
                    dep_seen = self.opinion_candidate_list_prev_iter.get(rel_entry.dep.text)
                    opinions = []
                    aspects = []

                    # =========================== acquisition rules ===========================
                    if bool(gov_seen) ^ bool(dep_seen):
                        opinions.append(rule_1(rel_entry, gov_seen, dep_seen, text))
                    if not gov_seen and dep_seen:
                        opinions.append(rule_2(rel_entry, relations, dep_seen, text))
                        aspects.append(rule_3(rel_entry, relations, text))
                        aspects.append(rule_4(rel_entry, relations, text))
                    if (
                        self.aspects_candidate_list_prev_iter
                        and AspectTerm.from_token(rel_entry.gov)
                        in self.aspects_candidate_list_prev_iter
                        and AspectTerm.from_token(rel_entry.dep)
                        not in self.aspects_candidate_list_prev_iter
                    ):
                        opinions.append(rule_5(rel_entry, text))
                        aspects.append(rule_6(rel_entry, relations, text))

                    self._add_opinion_term(opinions)
                    self._add_aspect_term(aspects)
    def extract_opinion_and_aspect_terms(self, parsed_document_iter, num_of_docs):
        """Extract candidate terms from a parsed-document iterator.

        Args:
            parsed_document_iter (Iterator): Parsed document iterator.
            num_of_docs (int): Number of documents in the iterator.
        """
        for parsed_document in tqdm(parsed_document_iter, total=num_of_docs, file=sys.stdout):
            self.extract_terms_from_doc(parsed_document)
    def _is_valid_term(self, cand_term):
        """Validate a candidate term.

        Args:
            cand_term (CandidateTerm): candidate term.
        """
        term = str(cand_term)
        for pattern in self.FILTER_PATTERNS:
            if pattern.match(term):
                return False
        if self.OPINION_STOP_LIST.is_in_stop_list(term):
            return False
        # reject mixed-case terms (neither all-lowercase nor all-uppercase)
        if term.lower() != term and term.upper() != term:
            return False
        return True

    def _add_aspect_term(self, terms):
        """Add new aspect terms to the table.

        Args:
            terms (list of CandidateTerm): candidate terms list.
        """
        for term in terms:
            if term:
                term_entry = AspectTerm(term.term, term.pos, term.lemma)
                if (
                    term_entry not in self.init_aspect_dict
                    and term_entry not in self.aspect_candidate_list
                    and not self.ASPECT_STOP_LIST.is_in_stop_list(term.term[0])
                    and len(term.term[0]) > 1
                ):
                    _insert_new_term_to_table(term, self.aspect_candidate_list_curr_iter)

    def _add_opinion_term(self, terms):
        """Add new opinion terms to the table.

        Args:
            terms (list of CandidateTerm): candidate terms list.
        """
        for term in terms:
            if term and self._is_valid_term(term):
                first_word = str(term.term[0])
                if (
                    first_word not in self.generic_sent_dict
                    and first_word not in self.opinion_candidate_list
                    and len(first_word) > 1
                    and any(c.isalnum() for c in first_word)
                ):
                    _insert_new_term_to_table(term, self.opinion_candidate_list_curr_iter)

    def _insert_new_terms_to_tables(self):
        """Insert new terms into the tables and clear the candidate lists
        of the previous iteration.
        """
        self.opinion_candidate_list_prev_iter = {}
        self.opinion_candidate_list_raw = _merge_tables(
            self.opinion_candidate_list_raw, self.opinion_candidate_list_curr_iter
        )
        for cand_term_list in self.opinion_candidate_list_curr_iter.values():
            if len(cand_term_list) >= self.min_freq_opinion_candidate:
                new_opinion_term = _set_opinion_term_polarity(cand_term_list)
                self.opinion_candidate_list_prev_iter[str(new_opinion_term)] = new_opinion_term
        self.opinion_candidate_list_curr_iter = {}
        self.opinion_candidate_list = {
            **self.opinion_candidate_list,
            **self.opinion_candidate_list_prev_iter,
        }

        self.aspects_candidate_list_prev_iter = list()
        self.aspect_candidate_list_raw = _merge_tables(
            self.aspect_candidate_list_raw, self.aspect_candidate_list_curr_iter
        )
        for extracted_aspect_list in self.aspect_candidate_list_curr_iter.values():
            if len(extracted_aspect_list) >= self.min_freq_aspect_candidate:
                first = extracted_aspect_list[0]
                new_aspect_entry = AspectTerm(first.term, first.pos, first.lemma)
                if new_aspect_entry not in self.aspects_candidate_list_prev_iter:
                    self.aspects_candidate_list_prev_iter.append(new_aspect_entry)
        self.aspect_candidate_list_curr_iter = {}
        self.aspect_candidate_list = (
            self.aspect_candidate_list + self.aspects_candidate_list_prev_iter
        )

    def _write_candidate_opinion_lex(self):
        """Write the generated opinion lexicon to a csv file."""
        LEXICONS_OUT.mkdir(parents=True, exist_ok=True)
        _write_final_opinion_lex(
            self.opinion_candidates_list_final, self.acquired_opinion_terms_path
        )
    def acquire_lexicons(self, parsed_dir: Union[str, PathLike]):
        """Acquire new opinion and aspect lexicons.

        Args:
            parsed_dir (PathLike): Path to the parsed documents folder.
        """
        parsed_docs = _load_parsed_docs_from_dir(parsed_dir)
        dataset_sentence_len = 0
        for parsed_doc in parsed_docs.values():
            dataset_sentence_len += len(parsed_doc.sentences)

        # raise the frequency thresholds by 1 for every
        # NUM_OF_SENTENCES_PER_OPINION_AND_ASPECT_TERM_INC sentences in the dataset
        add_to_thresholds = int(
            dataset_sentence_len / self.NUM_OF_SENTENCES_PER_OPINION_AND_ASPECT_TERM_INC
        )
        self.min_freq_opinion_candidate += add_to_thresholds
        self.min_freq_aspect_candidate += add_to_thresholds

        for iteration_num in range(self.max_num_of_iterations):
            if (
                len(self.opinion_candidate_list_prev_iter) == 0
                and len(self.aspects_candidate_list_prev_iter) == 0
            ):
                break
            print("\n#Iteration: {}".format(iteration_num + 1))
            self.extract_opinion_and_aspect_terms(iter(parsed_docs.values()), len(parsed_docs))
            self._insert_new_terms_to_tables()

        self.opinion_candidates_list_final = generate_final_opinion_candidates_list(
            self.opinion_candidate_list_raw,
            self.opinion_candidates_list_final,
            self.min_freq_opinion_candidate,
        )
        self.aspect_candidates_list_final = _generate_final_aspect_candidates_list(
            self.aspect_candidate_list_raw,
            self.aspect_candidates_list_final,
            self.min_freq_aspect_candidate,
        )
        self._write_candidate_opinion_lex()
        aspect_dict = _add_lemmas_aspect_lex(self.aspect_candidates_list_final)
        return aspect_dict
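# Worked example of the threshold adjustment in AcquireTerms.acquire_lexicons
# (hypothetical corpus size): with 70,000 parsed sentences,
#     add_to_thresholds = int(70000 / 35000) = 2
# so the default minimum candidate frequencies rise from 2 to 4 (opinion) and
# from 3 to 5 (aspect), i.e. the thresholds scale with dataset size.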
def _add_lemmas_aspect_lex(aspect_candidates_list_final):
    aspect_dict = {}
    for cand_term in aspect_candidates_list_final:
        lemma = ""
        if cand_term.term[0] != cand_term.lemma[0]:
            lemma = cand_term.lemma[0]
        aspect_dict[cand_term.term[0]] = lemma

    # unify aspects with aspect lemmas
    lemma_to_erase = []
    for _, lemma in aspect_dict.items():
        if lemma != "" and lemma in aspect_dict:
            lemma_to_erase.append(lemma)

    # delete all duplicates (aspects that are lemmas of other aspects)
    for lemma in lemma_to_erase:
        if lemma in aspect_dict:
            del aspect_dict[lemma]
    return aspect_dict


def _get_rel_list(parsed_sentence):
    res = []
    gen_toks = []
    for tok in parsed_sentence:
        gen_toks.append(
            DepRelationTerm(
                tok.get("text"), tok.get("lemma"), tok.get("pos"), tok.get("ner"), tok.get("start")
            )
        )
    for gen_tok, tok in zip(gen_toks, parsed_sentence):
        gov_idx = tok["gov"]
        if gov_idx != -1:
            res.append(DepRelation(gen_toks[gov_idx], gen_tok, tok["rel"]))
    return res


def _merge_tables(d1, d2):
    """Merge two tables of candidate lists. d1 is updated in place and returned.

    Args:
        d1 (dict): first dict to merge.
        d2 (dict): second dict to merge.
    """
    for key, term_list in d2.items():
        if key in d1:
            for item in term_list:
                if item not in d1[key]:
                    d1[key].append(item)
        else:
            d1[key] = term_list
    return d1


def _insert_new_term_to_table(term, curr_table):
    """Insert a term into a table of lists.

    Args:
        term (term): term to be inserted.
        curr_table (dict): input table.
    """
    table_key_word = str(term)
    if table_key_word:
        if table_key_word in curr_table:
            # append only if this exact instance is not already recorded
            if term not in curr_table[table_key_word]:
                curr_table[table_key_word].append(term)
        else:
            curr_table[table_key_word] = [term]


def _set_opinion_term_polarity(terms_list):
    """Set opinion term polarity by majority vote over all extracted instances.

    Args:
        terms_list (list): list of opinion terms.
    """
    first = terms_list[0]
    new_term = first.term
    positive_pol = 0
    negative_pol = 0
    for term in terms_list:
        pol = None
        try:
            pol = term.term_polarity
        except Exception as e:
            print("extracted_term missing term_polarity: " + str(e))
        if pol == Polarity.POS:
            positive_pol += 1
        elif pol == Polarity.NEG:
            negative_pol += 1

    # ties resolve to positive
    new_term_polarity = Polarity.UNK
    if positive_pol >= negative_pol and positive_pol > 0:
        new_term_polarity = Polarity.POS
    elif negative_pol >= positive_pol and negative_pol > 0:
        new_term_polarity = Polarity.NEG
    return OpinionTerm(new_term, new_term_polarity)


def _generate_final_aspect_candidates_list(
    aspect_candidate_list_raw, final_aspect_candidates_list, frequency_threshold
):
    """Generate the final aspect candidates list from the raw table.

    Args:
        aspect_candidate_list_raw (dict): key = term, value = list of candidate terms.
        final_aspect_candidates_list (list): list of final aspect candidates.
        frequency_threshold (int): minimum frequency for a qualifying term.
    """
    term_polarity = Polarity.UNK
    for extracted_term_list in aspect_candidate_list_raw.values():
        if len(extracted_term_list) >= frequency_threshold:
            term = extracted_term_list[0]
            qualified_term = QualifiedTerm(
                term.term, term.lemma, term.pos, len(extracted_term_list), term_polarity
            )
            final_aspect_candidates_list.append(qualified_term)
    return final_aspect_candidates_list
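# Illustration of _merge_tables semantics (hypothetical data): d1 is updated
# in place, and items already present under a key are not appended twice.
#
#     d1 = {"great": ["great@s1"], "fast": ["fast@s2"]}
#     d2 = {"great": ["great@s1", "great@s3"], "cheap": ["cheap@s4"]}
#     _merge_tables(d1, d2)
#     # d1 == {"great": ["great@s1", "great@s3"],
#     #        "fast": ["fast@s2"], "cheap": ["cheap@s4"]}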
def generate_final_opinion_candidates_list(
    opinion_candidate_list_raw, final_opinion_candidates_list, frequency_threshold
):
    """Generate the final opinion candidates list from the raw opinion candidate table.

    Args:
        opinion_candidate_list_raw (dict): key = term, value = list of extracted terms.
        final_opinion_candidates_list (list): list of final opinion candidates.
        frequency_threshold (int): minimum frequency for a qualifying term.
    """
    for candidate_list in opinion_candidate_list_raw.values():
        if len(candidate_list) >= frequency_threshold:
            positive_pol = 0
            negative_pol = 0
            for candidate in candidate_list:
                pol = candidate.term_polarity
                if pol == Polarity.POS:
                    positive_pol += 1
                elif pol == Polarity.NEG:
                    negative_pol += 1

            # note: ties resolve to negative here, unlike _set_opinion_term_polarity
            term_polarity = Polarity.UNK
            if positive_pol > negative_pol and positive_pol > 0:
                term_polarity = Polarity.POS
            elif negative_pol >= positive_pol and negative_pol > 0:
                term_polarity = Polarity.NEG

            term = candidate_list[0]
            qualified_term = QualifiedTerm(
                term.term, term.term, term.pos, len(candidate_list), term_polarity
            )
            final_opinion_candidates_list.append(qualified_term)
    return final_opinion_candidates_list
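if __name__ == "__main__":
    # Minimal usage sketch: the directory below is hypothetical and is expected
    # to contain parsed documents readable by _load_parsed_docs_from_dir (the
    # output of the parsing step of the ABSA training flow).
    acquired_aspect_dict = AcquireTerms(asp_thresh=3, op_thresh=2, max_iter=1).acquire_lexicons(
        "train_out/parsed"
    )
    # The opinion lexicon is written to AcquireTerms.acquired_opinion_terms_path;
    # the returned dict maps each acquired aspect term to its lemma ("" when the
    # lemma equals the surface form).
    print("{} aspect terms acquired".format(len(acquired_aspect_dict)))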