# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import re
import string
import sys
from os import path
from typing import List, Tuple
import spacy
from nltk import WordNetLemmatizer
from nltk.stem.snowball import EnglishStemmer
from spacy.cli.download import download as spacy_download
from spacy.lang.en import LEMMA_EXC, LEMMA_INDEX, LEMMA_RULES
from spacy.lemmatizer import Lemmatizer
from nlp_architect.utils.generic import license_prompt
from joblib import Parallel, delayed
from functools import partial
from spacy.util import minibatch
from nlp_architect.common.core_nlp_doc import CoreNLPDoc
from pathlib import Path
[docs]class Vocabulary:
    """
    A vocabulary that maps words to ints (storing a vocabulary)
    """
    def __init__(self, start=0, include_oov=True):
        self._vocab = {}
        self._rev_vocab = {}
        self.include_oov = include_oov
        if include_oov:
            self._vocab["<UNK>"] = start
            self.oov_id = start
            self._rev_vocab[start] = "<UNK>"
            self.next = start + 1
        else:
            self.next = start
[docs]    def add(self, word):
        """
        Add word to vocabulary
        Args:
            word (str): word to add
        Returns:
            int: id of added word
        """
        if word not in self._vocab.keys():
            self._vocab[word] = self.next
            self._rev_vocab[self.next] = word
            self.next += 1
        return self._vocab.get(word) 
[docs]    def word_id(self, word):
        """
        Get the word_id of given word
        Args:
            word (str): word from vocabulary
        Returns:
            int: int id of word
        """
        if hasattr(self, "oov_id"):
            return self._vocab.get(word, self.oov_id)
        return self._vocab.get(word, None) 
    def __getitem__(self, item):
        """
        Get the word_id of given word (same as `word_id`)
        """
        return self.word_id(item)
    def __len__(self):
        vocab_size = len(self._vocab)
        if hasattr(self, "include_oov") and self.include_oov:
            vocab_size += 1
        return vocab_size
    def __iter__(self):
        for word in self.vocab.keys():
            yield word
    @property
    def max(self):
        return self.next
[docs]    def id_to_word(self, wid):
        """
        Word-id to word (string)
        Args:
            wid (int): word id
        Returns:
            str: string of given word id
        """
        return self._rev_vocab.get(wid) 
    @property
    def vocab(self):
        """
        dict: get the dict object of the vocabulary
        """
        return self._vocab
[docs]    def add_vocab_offset(self, offset):
        """
        Adds an offset to the ints of the vocabulary
        Args:
            offset (int): an int offset
        """
        new_vocab = {}
        for k, v in self.vocab.items():
            new_vocab[k] = v + offset
        self.next += offset
        self._vocab = new_vocab
        self._rev_vocab = {v: k for k, v in new_vocab.items()} 
[docs]    def reverse_vocab(self):
        """
        Return the vocabulary as a reversed dict object
        Returns:
            dict: reversed vocabulary object
        """
        return self._rev_vocab  
all_letters = string.ascii_letters + " .,;'"
n_letters = len(all_letters)
[docs]def char_to_id(c):
    """return int id of given character
        OOV char = len(all_letter) + 1
    Args:
        c (str): string character
    Returns:
        int: int value of given char
    """
    char_idx = all_letters.find(c)
    if char_idx == -1:
        char_idx = n_letters
    return char_idx 
[docs]def id_to_char(c_id):
    """return character of given char id"""
    if c_id < n_letters:
        return all_letters[c_id]
    return None 
[docs]def try_to_load_spacy(model_name):
    try:
        spacy.load(model_name)
        return True
    except OSError:
        return False 
[docs]class SpacyInstance:
    """
    Spacy pipeline wrapper which prompts user for model download authorization.
    Args:
        model (str, optional): spacy model name (default: english small model)
        disable (list of string, optional): pipeline annotators to disable
            (default: [])
        display_prompt (bool, optional): flag to display/skip license prompt
        n_jobs (int, optional): maximum number of concurrent Python worker processes.
            If -1 all CPUs are used.
        batch_size (int, optional): number of docs per batch.
        spacy_doc (bool, optional): if True, parser outputs `spacy.tokens.doc`
            instead of `CoreNLPDoc`
        show_tok (bool, optional): include token text in `CoreNLPDoc` output
        show_dok (bool, optional): include document text in `CoreNLPDoc` output
        ptb_pos (bool, optional): convert spacy POS tags to Penn Treebank tags
    """
    def __init__(
        self,
        model="en",
        disable=None,
        display_prompt=True,
        n_jobs=8,
        batch_size=1500,
        spacy_doc=False,
        show_tok=True,
        show_doc=True,
        ptb_pos=False,
    ):
        if disable is None:
            disable = []
        try:
            self._parser = spacy.load(model, disable=disable)
        except OSError:
            url = "https://spacy.io/models"
            if display_prompt and license_prompt("Spacy {} model".format(model), url) is False:
                sys.exit(0)
            spacy_download(model)
            print("Spacy model installed, please rerun your command.")
            sys.exit(0)
        self.n_jobs = n_jobs
        self.batch_size = batch_size
        self.spacy_doc = spacy_doc
        self.show_tok = show_tok
        self.show_doc = show_doc
        self.ptb_pos = ptb_pos
    @property
    def parser(self):
        """return Spacy's instance parser"""
        return self._parser
[docs]    def parse(self, texts, output_dir=None):
        """
        Parse a list of documents. If more than 1 document is passed, use multi-processing.
        Args:
            texts (list of str): documents to parse
            output_dir (Path or str, optional): if given, parsed documents will be written here
        """
        if self.n_jobs == 1:
            return self.process_batch(texts, output_dir)
        partitions = minibatch(texts, size=self.batch_size)
        executor = Parallel(n_jobs=self.n_jobs, backend="multiprocessing", prefer="processes")
        do = delayed(partial(self.process_batch))
        tasks = (do(batch, output_dir, batch_i) for batch_i, batch in enumerate(partitions))
        return [doc for batch in executor(tasks) for doc in batch] 
[docs]    def process_batch(self, texts, output_dir=None, batch_id=0):
        parsed_docs = []
        for i, doc in enumerate(self.parser.pipe(texts)):
            parsed_doc = (
                doc
                if self.spacy_doc
                else CoreNLPDoc.from_spacy(doc, self.show_tok, self.show_doc, self.ptb_pos)
            )
            parsed_docs.append(parsed_doc)
            if output_dir:
                out_path = Path(output_dir) / ("{}.{}.json".format(batch_id, i))
                with open(out_path, "w", encoding="utf8") as f:
                    f.write(parsed_doc.pretty_json())
        return parsed_docs 
[docs]    def tokenize(self, text: str) -> List[str]:
        """
        Tokenize a sentence into tokens
        Args:
            text (str): text to tokenize
        Returns:
            list: a list of str tokens of input
        """
        # pylint: disable=not-callable
        return [t.text for t in self.parser(text)]  
stemmer = EnglishStemmer()
lemmatizer = WordNetLemmatizer()
spacy_lemmatizer = Lemmatizer(LEMMA_INDEX, LEMMA_EXC, LEMMA_RULES)
p = re.compile(r"[ \-,;.@&_]")
[docs]class Stopwords(object):
    """
    Stop words list class.
    """
    stop_words = []
[docs]    @staticmethod
    def get_words():
        if not Stopwords.stop_words:
            sw_path = path.join(path.dirname(path.realpath(__file__)), "resources", "stopwords.txt")
            with open(sw_path) as fp:
                stop_words = []
                for w in fp:
                    stop_words.append(w.strip().lower())
            Stopwords.stop_words = stop_words
        return Stopwords.stop_words  
[docs]def simple_normalizer(text):
    """
    Simple text normalizer. Runs each token of a phrase thru wordnet lemmatizer
    and a stemmer.
    """
    if not str(text).isupper() or not str(text).endswith("S") or not len(text.split()) == 1:
        tokens = list(filter(lambda x: len(x) != 0, p.split(text.strip())))
        text = " ".join([stemmer.stem(lemmatizer.lemmatize(t)) for t in tokens])
    return text 
[docs]def spacy_normalizer(text, lemma=None):
    """
    Simple text normalizer using spacy lemmatizer. Runs each token of a phrase
    thru a lemmatizer and a stemmer.
    Arguments:
        text(string): the text to normalize.
        lemma(string): lemma of the given text. in this case only stemmer will
        run.
    """
    if not str(text).isupper() or not str(text).endswith("S") or not len(text.split()) == 1:
        tokens = list(filter(lambda x: len(x) != 0, p.split(text.strip())))
        if lemma:
            lemma = lemma.split(" ")
            text = " ".join([stemmer.stem(lem) for lem in lemma])
        else:
            text = " ".join([stemmer.stem(spacy_lemmatizer(t, "NOUN")[0]) for t in tokens])
    return text 
[docs]def read_sequential_tagging_file(file_path, ignore_line_patterns=None):
    """
    Read a tab separated sequential tagging file.
    Returns a list of list of tuple of tags (sentences, words)
    Args:
        file_path (str): input file path
        ignore_line_patterns (list, optional): list of string patterns to ignore
    Returns:
        list of list of tuples
    """
    if ignore_line_patterns:
        assert isinstance(ignore_line_patterns, list), "ignore_line_patterns must be a list"
    def _split_into_sentences(file_lines):
        sentences = []
        s = []
        for line in file_lines:
            if len(line) == 0:
                sentences.append(s)
                s = []
                continue
            s.append(line)
        if len(s) > 0:
            sentences.append(s)
        return sentences
    with open(file_path, encoding="utf-8") as fp:
        data = fp.readlines()
        data = [d.strip() for d in data]
        if ignore_line_patterns:
            for s in ignore_line_patterns:
                data = [d for d in data if s not in d]
        data = [tuple(d.split()) for d in data]
    return _split_into_sentences(data) 
[docs]def word_vector_generator(data, lower=False, start=0):
    """
    Word vector generator util.
    Transforms a list of sentences into numpy int vectors and returns the
    constructed vocabulary
    Arguments:
        data (list): list of list of strings
        lower (bool, optional): transform strings into lower case
        start (int, optional): vocabulary index start integer
    Returns:
        2D numpy array and Vocabulary of the detected words
    """
    vocab = Vocabulary(start)
    data_vec = []
    for sentence in data:
        sentence_vec = []
        for w in sentence:
            word = w
            if lower:
                word = word.lower()
            wid = vocab.add(word)
            sentence_vec.append(wid)
        data_vec.append(sentence_vec)
    return data_vec, vocab 
[docs]def character_vector_generator(data, start=0):
    """
    Character word vector generator util.
    Transforms a list of sentences into numpy int vectors of the characters
    of the words of the sentence, and returns the constructed vocabulary
    Arguments:
        data (list): list of list of strings
        start (int, optional): vocabulary index start integer
    Returns:
        np.array: a 2D numpy array
        Vocabulary: constructed vocabulary
    """
    vocab = Vocabulary(start)
    data_vec = []
    for sentence in data:
        sentence_vec = []
        for w in sentence:
            word_vec = []
            for char in w:
                cid = vocab.add(char)
                word_vec.append(cid)
            sentence_vec.append(word_vec)
        data_vec.append(sentence_vec)
    return data_vec, vocab 
[docs]def bio_to_spans(text: List[str], tags: List[str]) -> List[Tuple[int, int, str]]:
    """
    Convert BIO tagged list of strings into span starts and ends
    Args:
        text: list of words
        tags: list of tags
    Returns:
        tuple: list of start, end and tag of detected spans
    """
    pointer = 0
    starts = []
    for (
        i,
        t,
    ) in enumerate(tags):
        if t.startswith("B-"):
            starts.append((i, pointer))
        pointer += len(text[i]) + 1
    spans = []
    for s_i, s_char in starts:
        label_str = tags[s_i][2:]
        e = 0
        e_char = len(text[s_i + e])
        while len(tags) > s_i + e + 1 and tags[s_i + e + 1].startswith("I-"):
            e += 1
            e_char += 1 + len(text[s_i + e])
        spans.append((s_char, s_char + e_char, label_str))
    return spans