# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import math
from os import PathLike
from pathlib import Path, PosixPath
from typing import Union
from nlp_architect.common.core_nlp_doc import CoreNLPDoc
from nlp_architect.models.absa import INFERENCE_OUT
from nlp_architect.models.absa.inference.data_types import (
Term,
TermType,
Polarity,
SentimentDoc,
SentimentSentence,
LexiconElement,
)
from nlp_architect.models.absa.utils import (
_read_lexicon_from_csv,
load_opinion_lex,
_load_aspect_lexicon,
parse_docs,
parse_docs_bist,
_load_parsed_docs_from_dir,
)
from tqdm import tqdm

INTENSIFIER_FACTOR = 0.3
VERB_POS = {"VB", "VBD", "VBG", "VBN", "VBP", "VBZ"}


class SentimentInference:
"""Main class for sentiment inference execution.
Attributes:
opinion_lex: Opinion lexicon as outputted by TrainSentiment module.
aspect_lex: Aspect lexicon as outputted by TrainSentiment module.
intensifier_lex (dict): Pre-defined intensifier lexicon.
negation_lex (dict): Pre-defined negation lexicon.
"""

    def __init__(
self,
aspect_lex: Union[str, PathLike],
opinion_lex: Union[str, PathLike, dict],
parse: bool = True,
parser="spacy",
spacy_model="en_core_web_sm",
):
"""Inits SentimentInference with given aspect and opinion lexicons."""
INFERENCE_OUT.mkdir(parents=True, exist_ok=True)
        self.opinion_lex = (
            opinion_lex if isinstance(opinion_lex, dict) else load_opinion_lex(Path(opinion_lex))
        )
self.aspect_lex = _load_aspect_lexicon(Path(aspect_lex))
self.intensifier_lex = _read_lexicon_from_csv("IntensifiersLex.csv")
self.negation_lex = _read_lexicon_from_csv("NegationSentLex.csv")
self.parser_name = parser
if parse:
if parser == "bist":
from nlp_architect.pipelines.spacy_bist import SpacyBISTParser
self.parser = SpacyBISTParser(spacy_model=spacy_model)
elif parser == "spacy":
from nlp_architect.utils.text import SpacyInstance
disable = [
"merge_noun_chunks",
"ner",
"entity_linker",
"textcat",
"entity_ruler",
"sentencizer",
"merge_entities",
]
self.parser = SpacyInstance(
model=spacy_model, disable=disable, ptb_pos=True, n_jobs=1
)
else:
self.parser = None
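
    # A minimal construction sketch, assuming lexicon CSVs produced by the
    # TrainSentiment module (the file names here are hypothetical):
    #
    #     inference = SentimentInference("aspect_lex.csv", "opinion_lex.csv",
    #                                    parse=True, parser="spacy")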

    def parse_data(self, data: Union[PathLike, PosixPath], out_dir: Union[str, PathLike]):
        """Parse the raw documents in `data`, writing the parsed output to `out_dir`."""
if out_dir:
Path(out_dir).mkdir(parents=True, exist_ok=True)
parse_func = parse_docs_bist if self.parser_name == "bist" else parse_docs
parse_func(self.parser, data, out_dir=out_dir)
return out_dir

    def run(self, doc: str = None, parsed_doc: CoreNLPDoc = None) -> SentimentDoc:
        """Run SentimentInference on a single document.

        Args:
            doc: Raw document text; parsed internally when `parsed_doc` is not given.
            parsed_doc: Pre-parsed document, used as-is when provided.

        Returns:
            The sentiment-annotated document, which contains the detected events per sentence.
        """
if not parsed_doc:
if not self.parser:
raise RuntimeError("Parser not initialized (try parse=True at init)")
parsed_doc = self.parser.parse([doc])[0]
sentiment_doc = None
for sentence in parsed_doc.sentences:
events = []
scores = []
for aspect_row in self.aspect_lex:
_, asp_events = self._extract_event(aspect_row, sentence)
for asp_event in asp_events:
events.append(asp_event)
scores += [term.score for term in asp_event if term.type == TermType.ASPECT]
if events:
if not sentiment_doc:
sentiment_doc = SentimentDoc(parsed_doc.doc_text)
sentiment_doc.sentences.append(
SentimentSentence(
sentence[0]["start"],
sentence[-1]["start"] + sentence[-1]["len"] - 1,
events,
)
)
return sentiment_doc
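
    # Usage sketch for run(), assuming the instance was built with parse=True.
    # Attribute names below (`sentences`, `events`) are taken from the
    # SentimentSentence constructor calls above; run() returns None when no
    # events are detected, hence the guard:
    #
    #     doc = inference.run(doc="The food was delicious but the service was slow.")
    #     if doc is not None:
    #         for sent in doc.sentences:
    #             print(sent.events)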

    def run_multiple(
        self,
        data: Union[str, PathLike] = None,
        parsed_data: Union[str, PathLike] = None,
        out_dir: Union[str, PathLike] = INFERENCE_OUT,
    ):
        """Run SentimentInference on a directory of documents.

        Parses the raw documents in `data` unless `parsed_data` is given, then runs
        inference on each parsed document, returning a dict of results keyed by file name.
        """
if not parsed_data:
if not self.parser:
raise RuntimeError("Parser not initialized (try parse=True at init)")
parsed_dir = Path(out_dir) / "parsed" / Path(data).stem
parsed_data = self.parse_data(data, out_dir=parsed_dir)
sentiment_docs = {}
        for f, parsed_doc in tqdm(_load_parsed_docs_from_dir(parsed_data).items()):
sentiment_doc = self.run(parsed_doc=parsed_doc)
sentiment_docs[f] = sentiment_doc
return sentiment_docs
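
    # Usage sketch for run_multiple(), assuming a directory of raw-text documents
    # (the path is hypothetical); results are keyed by parsed file name:
    #
    #     docs = inference.run_multiple(data="data/restaurant_reviews/")
    #     for fname, sentiment_doc in docs.items():
    #         print(fname, sentiment_doc)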

    def _extract_intensifier_terms(self, toks, sentiment_index, polarity, sentence):
"""Extract intensifier events from sentence."""
count = 0
terms = []
for intens_i, intens in [(i, x) for i, x in enumerate(toks) if x in self.intensifier_lex]:
if math.fabs(sentiment_index - intens_i) == 1:
score = self.intensifier_lex[intens].score
terms.append(
Term(
intens,
TermType.INTENSIFIER,
polarity,
score,
sentence[intens_i]["start"],
sentence[intens_i]["len"],
)
)
count += abs(score + float(INTENSIFIER_FACTOR))
        return (count if count != 0 else 1), terms
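
    # Worked example of the intensifier multiplier above: an intensifier whose
    # lexicon score is 0.3 (an assumed illustrative value) sitting directly
    # adjacent to the opinion term contributes abs(0.3 + INTENSIFIER_FACTOR) = 0.6,
    # so the opinion score is scaled by 0.6; with no adjacent intensifier the
    # method returns 1, leaving the score unchanged.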

    def _extract_neg_terms(self, toks: list, op_i: int, sentence: list) -> tuple:
"""Extract negation terms from sentence.
Args:
toks: Sentence text broken down to tokens (words).
op_i: Index of opinion term in sentence.
sentence: parsed sentence
Returns:
List of negation terms and its aggregated sign (positive or negative).
"""
sign = 1
terms = []
gov_op_i = sentence[op_i]["gov"]
dep_op_indices = [sentence.index(x) for x in sentence if x["gov"] == op_i]
for neg_i, negation in [(i, x) for i, x in enumerate(toks) if x in self.negation_lex]:
position = self.negation_lex[negation].position
dist = op_i - neg_i
before = position == "before" and (dist == 1 or neg_i in dep_op_indices)
after = position == "after" and (dist == -1 or neg_i == gov_op_i)
both = position == "both" and dist in (1, -1)
if before or after or both:
terms.append(
Term(
negation,
TermType.NEGATION,
Polarity.NEG,
self.negation_lex[negation].score,
sentence[toks.index(negation)]["start"],
sentence[toks.index(negation)]["len"],
)
)
sign *= self.negation_lex[negation].score
return terms, sign
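
    # Illustrative trace, assuming "not" appears in NegationSentLex.csv with
    # position "before" and score -1 (the score is an assumed value): in
    # "not tasty", dist = op_i - neg_i = 1, so "not" is collected as a negation
    # term and sign flips to -1, inverting the opinion polarity downstream.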

    def _extract_event(self, aspect_row: LexiconElement, parsed_sentence: list) -> tuple:
"""Extract opinion and aspect terms from sentence."""
event = []
sent_aspect_pair = None
real_aspect_indices = _consolidate_aspects(aspect_row.term, parsed_sentence)
aspect_key = aspect_row.term[0]
for aspect_index_range in real_aspect_indices:
for word_index in aspect_index_range:
sent_aspect_pair, event = self._detect_opinion_aspect_events(
word_index, parsed_sentence, aspect_key, aspect_index_range
)
if sent_aspect_pair:
break
return sent_aspect_pair, event

    @staticmethod
    def _modify_for_multiple_word(cur_tkn, parsed_sentence, index_range):
"""Modify multiple-word aspect tkn length and start index.
Args:
index_range: The index range of the multi-word aspect.
Returns:
The modified aspect token.
"""
if len(index_range) >= 2:
cur_tkn["start"] = parsed_sentence[index_range[0]]["start"]
cur_tkn["len"] = len(parsed_sentence[index_range[0]]["text"])
for i in index_range[1:]:
cur_tkn["len"] = int(cur_tkn["len"]) + len(parsed_sentence[i]["text"]) + 1
return cur_tkn
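
    # Worked example: for the two-token aspect "hard disk" the loop sets
    # len = len("hard") + len("disk") + 1 = 9, i.e. the token lengths plus one
    # character per separating space, with "start" taken from the first token.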

    def _detect_opinion_aspect_events(self, aspect_index, parsed_sent, aspect_key, index_range):
"""Extract opinion-aspect events from sentence.
Args:
aspect_index: index of aspect in sentence.
parsed_sent: current sentence parse tree.
aspect_key: main aspect term serves as key in aspect dict.
index_range: The index range of the multi word aspect.
Returns:
List of aspect sentiment pair, and list of events extracted.
"""
all_pairs, events = [], []
sentence_text_list = [x["text"] for x in parsed_sent]
sentence_text = " ".join(sentence_text_list)
for tok_i, tok in enumerate(parsed_sent):
aspect_op_pair = []
terms = []
gov_i = tok["gov"]
gov = parsed_sent[gov_i]
gov_text = gov["text"]
tok_text = tok["text"]
# 1st order rules
# Is cur_tkn an aspect and gov an opinion?
if tok_i == aspect_index:
if gov_text.lower() in self.opinion_lex:
aspect_op_pair.append(
(self._modify_for_multiple_word(tok, parsed_sent, index_range), gov)
)
# Is gov an aspect and cur_tkn an opinion?
if gov_i == aspect_index and tok_text.lower() in self.opinion_lex:
aspect_op_pair.append(
(self._modify_for_multiple_word(gov, parsed_sent, index_range), tok)
)
# If not found, try 2nd order rules
if not aspect_op_pair and tok_i == aspect_index:
# 2nd order rule #1
for op_t in parsed_sent:
if op_t["gov"] == gov_i and op_t["text"].lower() in self.opinion_lex:
aspect_op_pair.append(
(self._modify_for_multiple_word(tok, parsed_sent, index_range), op_t)
)
# 2nd order rule #2
gov_gov = parsed_sent[parsed_sent[gov_i]["gov"]]
if gov_gov["text"].lower() in self.opinion_lex:
aspect_op_pair.append(
(self._modify_for_multiple_word(tok, parsed_sent, index_range), gov_gov)
)
            # For each matched aspect-opinion pair, score the event and collect its terms
for aspect, opinion in aspect_op_pair:
op_tok_i = parsed_sent.index(opinion)
score = self.opinion_lex[opinion["text"].lower()].score
neg_terms, sign = self._extract_neg_terms(sentence_text_list, op_tok_i, parsed_sent)
polarity = Polarity.POS if score * sign > 0 else Polarity.NEG
intensifier_score, intensifier_terms = self._extract_intensifier_terms(
sentence_text_list, op_tok_i, polarity, parsed_sent
)
over_all_score = score * sign * intensifier_score
terms.append(
Term(
aspect_key,
TermType.ASPECT,
polarity,
over_all_score,
aspect["start"],
aspect["len"],
)
)
terms.append(
Term(
opinion["text"],
TermType.OPINION,
polarity,
over_all_score,
opinion["start"],
opinion["len"],
)
)
                if neg_terms:
                    terms += neg_terms
                if intensifier_terms:
                    terms += intensifier_terms
all_pairs.append(
[aspect_key, opinion["text"], over_all_score, polarity, sentence_text]
)
events.append(terms)
return all_pairs, events
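
    # Illustration of the dependency rules above (hypothetical sentence): in
    # "The food is great", the aspect "food" is governed by the opinion "great",
    # so the first 1st-order rule fires. The 2nd-order rules catch an opinion
    # that shares the aspect's governor, or that is the governor's governor,
    # when no direct aspect-opinion link exists.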


def _sentence_contains_after(sentence, index, phrase):
"""Returns sentence contains phrase after given index."""
for i in range(len(phrase)):
if len(sentence) <= index + i or phrase[i].lower() not in {
sentence[index + i][field].lower() for field in ("text", "lemma")
}:
return False
return True


def _consolidate_aspects(aspect_row, sentence):
"""Returns consolidated indices of aspect terms in sentence.
Args:
aspect_row: List of aspect terms which belong to the same aspect-group.
"""
indices = []
aspect_phrases: list = sorted(
[phrase.split(" ") for phrase in aspect_row], key=len, reverse=True
)
appeared = set()
for tok_i in range(len(sentence)):
for aspect_phrase in aspect_phrases:
if _sentence_contains_after(sentence, tok_i, aspect_phrase):
span = range(tok_i, tok_i + len(aspect_phrase))
if not appeared & set(span):
appeared |= set(span)
indices.append(list(span))
return indices
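

if __name__ == "__main__":
    # A minimal end-to-end sketch, assuming lexicon CSVs produced by the
    # TrainSentiment module exist at these (hypothetical) paths.
    inference = SentimentInference("aspect_lex.csv", "opinion_lex.csv")
    result = inference.run(doc="The food was delicious but the service was slow.")
    if result is not None:
        for sent in result.sentences:
            print(sent.events)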