diff --git a/stanza/Dockerfile b/stanza/Dockerfile
index f923b83..c3d1c2b 100644
--- a/stanza/Dockerfile
+++ b/stanza/Dockerfile
@@ -1,7 +1,15 @@
-ARG VERSION
+ARG VERSION=dev
 FROM instituutnederlandsetaal/taggers-dockerized-base:$VERSION
+
+# Install requirements
 COPY requirements.txt ./
 RUN pip install --no-cache-dir -r requirements.txt
-RUN python -c 'import stanza; stanza.Pipeline(lang="nl", processors="tokenize,lemma,pos,depparse")'
-COPY --link process.py /
-COPY --link tei2trankit.py /
\ No newline at end of file
+
+# Install stanza models
+# alpino + conll02 NER
+RUN python -c 'import stanza; stanza.Pipeline(lang="nl", processors={"tokenize": "alpino","lemma": "alpino","pos": "alpino","depparse": "alpino","ner": "conll02"})'
+# lassysmall + wiki-ner
+RUN python -c 'import stanza; stanza.Pipeline(lang="nl", processors={"tokenize": "lassysmall","lemma": "lassysmall","pos": "lassysmall","depparse": "lassysmall","ner": "wikiner"})'
+
+# tagger-specific python source
+COPY --link process.py conllu_tei_helper.py ./
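Review note: the two RUN lines above warm stanza's model cache at image-build time, so a container can run fully offline. A build-time smoke test could additionally exercise both bundles end to end; a minimal sketch (the script name and sample sentence are illustrative, not part of this patch):

    # smoke_test.py (hypothetical)
    import stanza

    for packages in (
        {"tokenize": "alpino", "lemma": "alpino", "pos": "alpino",
         "depparse": "alpino", "ner": "conll02"},
        {"tokenize": "lassysmall", "lemma": "lassysmall", "pos": "lassysmall",
         "depparse": "lassysmall", "ner": "wikiner"},
    ):
        nlp = stanza.Pipeline(lang="nl", processors=packages)
        doc = nlp("Dit is een zin.")
        assert doc.sentences, "pipeline produced no output"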
diff --git a/stanza/conllu_tei_helper.py b/stanza/conllu_tei_helper.py
new file mode 100644
index 0000000..7f15bc5
--- /dev/null
+++ b/stanza/conllu_tei_helper.py
@@ -0,0 +1,250 @@
+import sys
+import xml.etree.ElementTree as ET
+
+from conllu import parse_incr
+
+
+class TEIConversionException(Exception):
+    def __init__(self, message) -> None:
+        self.message = message
+        super().__init__(self.message)
+
+
+def localname(element: ET.Element) -> str:
+    """
+    Get the localname of an element, i.e. without the namespace.
+    """
+    _, _, tag = element.tag.rpartition("}")
+    return tag
+
+
+def has_sentences(element: ET.Element) -> bool:
+    return len(get_sentences(element)) > 0
+
+
+def has_tokens(element: ET.Element) -> bool:
+    return len(get_tokens(element)) > 0
+
+
+def get_sentences(element: ET.Element) -> list[ET.Element]:
+    # XPath doesn't seem to handle the namespaces well, so we do it brute force
+    ret = []
+    for descendant in element.iter():
+        if localname(descendant) == "s":
+            ret.append(descendant)
+    return ret
+
+
+def get_tokens(element: ET.Element) -> list[ET.Element]:
+    # it would be beautiful to use the XPath element.findall(".//[w or pc]") here,
+    # but 'or' is not supported in ElementTree, see
+    # https://docs.python.org/3/library/xml.etree.elementtree.html#xpath-support
+    # therefore we just iterate the tree ourselves
+    ret = []
+    for descendant in element.iter():
+        if localname(descendant) == "w" or localname(descendant) == "pc":
+            ret.append(descendant)
+    return ret
+
+
+def get_token_literals(element: ET.Element) -> list[str]:
+    words = get_tokens(element)
+    tokens = list(map(lambda w: "".join(w.itertext()), words))
+    # itertext() generates empty strings for self-closing tags like <w/>,
+    # so we filter them out
+    return [t for t in tokens if t]  # '' is falsy
+
+
+def get_tree(filename: str) -> ET.ElementTree:
+    parser = ET.XMLParser(encoding="utf-8")
+    return ET.parse(filename, parser=parser)
+
+
+def get_text_elements(element: ET.Element) -> list[ET.Element]:
+    ret = []
+    for descendant in element.iter():
+        if localname(descendant) == "s":
+            ret.append(descendant)
+    return ret
+
+
+def parse_tei(filename: str) -> list[list[str]]:
+    """
+    Convert a TEI file to a list of pretokenized sentences, to be used by spacy or the like.
+    Example:
+
+    <s><w>Hello</w> <w>world</w> <pc>!</pc></s>
+    <s><w>Goodbye</w> <w>world</w> <pc>!</pc></s>
+
+    will be converted to:
+    [
+        ["Hello", "world", "!"],
+        ["Goodbye", "world", "!"]
+    ]
+    """
+    tree = get_tree(filename).getroot()
+    if not has_tokens(tree):
+        raise TEIConversionException("TEI document does not contain <w> nor <pc>-tags")
+    if not has_sentences(tree):
+        raise TEIConversionException("TEI document does not contain <s>-tags")
+
+    texts = get_text_elements(tree)
+
+    ret = []
+    for text in texts:
+        for sentence in get_sentences(text):
+            literals = get_token_literals(sentence)
+            ret.append(literals)
+
+    if len(ret) == 0:
+        raise TEIConversionException(
+            "TEI document contains no sentences of processable size"
+        )
+    return ret
+
+
+def conllu_to_tei(conllu_path, tei_path):
+    """
+    Generate a new TEI file with the conllu layer annotations.
+    """
+    root = ET.Element("TEI")
+    text = ET.SubElement(root, "text")
+    body = ET.SubElement(text, "body")
+    word_id = 1
+
+    for conllu_sentence in conllu_sentence_generator(conllu_path):
+        paragraph = ET.SubElement(body, "p")
+        tei_sentence = ET.SubElement(paragraph, "s")
+        linkgroup = create_linkgroup(tei_sentence)
+
+        for conllu_word in conllu_words_generator(conllu_sentence):
+            # create <w> or <pc>
+            element_type = "pc" if conllu_word["upos"] == "PUNCT" else "w"
+            tei_word = ET.SubElement(tei_sentence, element_type)
+            # add text
+            tei_word.text = conllu_word["form"]
+            # add id
+            tei_word.set("id", f"w.{word_id}")
+            # add lemma and pos
+            merge_lemma_pos(tei_word, conllu_word)
+            word_id += 1
+
+        # now add the deprel and head
+        tei_words = get_tokens(tei_sentence)
+        for i, conllu_word in enumerate(conllu_words_generator(conllu_sentence)):
+            tei_word = tei_words[i]
+            merge_deprel_head(tei_sentence, tei_words, linkgroup, tei_word, conllu_word)
+
+    # export the xml tree
+    tree = ET.ElementTree(root)
+    ET.indent(tree, space="\t", level=0)
+    tree.write(tei_path, encoding="utf-8", xml_declaration=True)
+
+
+def conllu_sentence_generator(conllu_path):
+    # use a context manager so the file is closed once the generator is exhausted
+    with open(conllu_path, "r", encoding="utf-8") as data_file:
+        for sentence in parse_incr(data_file):
+            yield sentence
+
+
+def conllu_words_generator(sentence):
+    for token in sentence:
+        yield token
+
+
+def merge_tei_with_conllu_layer(conllu_path, tei_path):
+    """
+    Outputs the original TEI file with the conllu layer annotations added.
+    """
+    tree = get_tree(tei_path).getroot()
+    texts = get_text_elements(tree)
+
+    conllu_sentences = conllu_sentence_generator(conllu_path)
+
+    for text in texts:
+        for tei_sentence in get_sentences(text):
+            tei_words: list[ET.Element] = get_tokens(tei_sentence)
+
+            linkgroup = create_linkgroup(tei_sentence)
+
+            conllu_sentence = next(conllu_sentences)
+            conllu_words = conllu_words_generator(conllu_sentence)
+
+            for tei_word in tei_words:
+                # Handle empty tokens
+                token_text: str = "".join(tei_word.itertext())
+                if token_text == "":
+                    print(f"Warning: Empty token in {tei_path}", file=sys.stderr)
+                    if "lemma" in tei_word.attrib:
+                        del tei_word.attrib["lemma"]
+                    if "type" in tei_word.attrib:
+                        del tei_word.attrib["type"]
+                    continue
+
+                # Handle mismatched sentence lengths
+                try:
+                    conllu_word = next(conllu_words)
+                except StopIteration:
+                    print(
+                        f"Warning: CoNLL-U sentence is shorter than TEI sentence in {tei_path}",
+                        file=sys.stderr,
+                    )
+                    continue  # keep the rest of the sentence as is
+
+                ###### lemma & pos ######
+                merge_lemma_pos(tei_word, conllu_word)
+
+                ###### deprel & head ######
+                merge_deprel_head(
+                    tei_sentence, tei_words, linkgroup, tei_word, conllu_word
+                )
+
+    # export the xml tree
+    # note: `tree` is already the root element, so serialize it directly
+    return ET.tostring(tree, encoding="utf-8", method="xml")
+
+
+def create_linkgroup(tei_sentence):
+    linkgroup = ET.Element("linkGrp")
+    linkgroup.set("type", "UD-SYN")
+    linkgroup.set("targFunc", "head argument")
+    tei_sentence.append(linkgroup)
+    return linkgroup
+
+
+def getTEIid(element):
+    if element.get("{http://www.w3.org/XML/1998/namespace}id") is not None:
+        return element.get("{http://www.w3.org/XML/1998/namespace}id")
+    return element.get("id")
+
+
+def merge_lemma_pos(tei_word, conllu_word):
+    if conllu_word["lemma"]:
+        tei_word.set("lemma", conllu_word["lemma"])
+    if conllu_word["xpos"]:
+        tei_word.set("type", conllu_word["xpos"])
+    if conllu_word["upos"]:
+        tei_word.set("pos", conllu_word["upos"])
+    if conllu_word["feats"]:
+        feats = "|".join([f"{k}={v}" for k, v in conllu_word["feats"].items()])
+        tei_word.set("msd", feats)
+
+
+def merge_deprel_head(tei_sentence, tei_words, linkgroup, tei_word, conllu_word):
+    # head may legitimately be 0 (the root), which is falsy, so test against None
+    if conllu_word["deprel"] and conllu_word["head"] is not None:
+        link = ET.Element("link")
+        deprel = conllu_word["deprel"]
+        # Prepend the deprel with "ud-syn:" to match the parlamint format
+        deprel = "ud-syn:" + deprel
+        link.set("ana", deprel)
+        linkgroup.append(link)
+
+        if conllu_word["head"] == 0:  # root
+            head = tei_sentence
+        else:
+            head = tei_words[conllu_word["head"] - 1]
+        link.set(
+            "target",
+            "#" + str(getTEIid(head)) + " " + "#" + str(getTEIid(tei_word)),
+        )
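Review note: conllu_tei_helper.py is the whole TEI/CoNLL-U bridge: parse_tei() produces pretokenized input for stanza, while conllu_to_tei() and merge_tei_with_conllu_layer() write the annotations back. A minimal round-trip sketch using only the API this patch introduces (file names are illustrative):

    import stanza
    from conllu_tei_helper import parse_tei, merge_tei_with_conllu_layer

    nlp = stanza.Pipeline(
        lang="nl",
        tokenize_pretokenized=True,
        processors="tokenize,lemma,pos,ner,depparse",
    )
    doc = nlp(parse_tei("input.tei.xml"))  # list[list[str]] in, Document out
    with open("output.conllu", "w", encoding="utf-8") as f:
        f.write("{:C}".format(doc))  # stanza's CoNLL-U format spec
    tei_bytes = merge_tei_with_conllu_layer("output.conllu", "input.tei.xml")

merge_tei_with_conllu_layer() assumes the CoNLL-U sentences align one-to-one with the TEI <s> elements, which holds when the CoNLL-U file was produced from parse_tei() output of the same document.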
+ """ + tree = get_tree(tei_path).getroot() + texts = get_text_elements(tree) + + conllu_sentences = conllu_sentence_generator(conllu_path) + + for text in texts: + for tei_sentence in get_sentences(text): + tei_words: list[ET.Element] = get_tokens(tei_sentence) + + linkgroup = create_linkgroup(tei_sentence) + + conllu_sentence = conllu_sentences.__next__() + + conllu_words = conllu_words_generator(conllu_sentence) + + for tei_word in tei_words: + # Handle empty tokens + token_text: str = "".join(tei_word.itertext()) + if token_text == "": + print(f"Warning: Empty token in {tei_path}", file=sys.stderr) + if "lemma" in tei_word.attrib: + del tei_word.attrib["lemma"] + if "type" in tei_word.attrib: + del tei_word.attrib["type"] + continue + + # Handle mismatched sentence lengths + try: + conllu_word = conllu_words.__next__() + except StopIteration: + print( + f"Warning: CoNLL-U sentence is shorter than TEI sentence in {tei_path}", + file=sys.stderr, + ) + continue # keep the rest of the sentence as is + + ###### lemma & pos ###### + merge_lemma_pos(tei_word, conllu_word) + + ###### deprel & head ###### + merge_deprel_head( + tei_sentence, tei_words, linkgroup, tei_word, conllu_word + ) + + # export the xml tree + return ET.tostring(tree.getroot(), encoding="utf-8", method="xml") + + +def create_linkgroup(tei_sentence): + linkgroup = ET.Element("linkGrp") + linkgroup.set("type", "UD-SYN") + linkgroup.set("targFunc", "head argument") + tei_sentence.append(linkgroup) + return linkgroup + + +def getTEIid(element): + if element.get("{http://www.w3.org/XML/1998/namespace}id") is not None: + return element.get("{http://www.w3.org/XML/1998/namespace}id") + return element.get("id") + + +def merge_lemma_pos(tei_word, conllu_word): + if conllu_word["lemma"]: + tei_word.set("lemma", conllu_word["lemma"]) + if conllu_word["xpos"]: + tei_word.set("type", conllu_word["xpos"]) + if conllu_word["upos"]: + tei_word.set("pos", conllu_word["upos"]) + if conllu_word["feats"]: + feats = "|".join([f"{k}={v}" for k, v in conllu_word["feats"].items()]) + tei_word.set("msd", feats) + + +def merge_deprel_head(tei_sentence, tei_words, linkgroup, tei_word, conllu_word): + if conllu_word["deprel"] and conllu_word["head"]: + link = ET.Element("link") + deprel = conllu_word["deprel"] + # Prepend the deprel with "ud-syn:" to match the parlamint format + deprel = "ud-syn:" + deprel + link.set("ana", deprel) + linkgroup.append(link) + + head = None + if conllu_word["head"] == 0: # root + head = tei_sentence + else: + head = tei_words[conllu_word["head"] - 1] + link.set( + "target", + "#" + str(getTEIid(head)) + " " + "#" + str(getTEIid(tei_word)), + ) diff --git a/stanza/process.py b/stanza/process.py index 2f10ebc..3979d5f 100644 --- a/stanza/process.py +++ b/stanza/process.py @@ -1,16 +1,12 @@ -from io import TextIOWrapper -from unittest import result -from xmlrpc.client import Boolean -import stanza -from tei2trankit import tei2trankit +# standard +import os import xml.etree.ElementTree as ET -from tqdm import tqdm +# third-party +import stanza -""" -Initialize the tagger if needed and process input files by calling the specific tagger implementation -and ensuring the output is written to the expected file. -""" +# local +from conllu_tei_helper import parse_tei # The extension of output files produced by the tagger. 
OUTPUT_EXTENSION = ".conllu" @@ -30,55 +26,56 @@ def init() -> None: """ global xml_nlp xml_nlp = stanza.Pipeline( - lang="nl", tokenize_pretokenized=True, processors="tokenize,lemma,pos,ner,depparse" + lang="nl", + tokenize_pretokenized=True, + processors="tokenize,lemma,pos,ner,depparse", ) global txt_nlp - txt_nlp = stanza.Pipeline(lang="nl", processors="tokenize,lemma,pos,ner,depparse") + stanza_model = os.getenv("STANZA_MODEL") + if stanza_model == "alpino": + txt_nlp = stanza.Pipeline( + lang="nl", + processors={ + "tokenize": "alpino", + "lemma": "alpino", + "pos": "alpino", + "depparse": "alpino", + "ner": "conll02", + }, + ) + elif stanza_model == "lassysmall": + txt_nlp = stanza.Pipeline( + lang="nl", + processors={ + "tokenize": "lassysmall", + "lemma": "lassysmall", + "pos": "lassysmall", + "depparse": "lassysmall", + "ner": "wikiner", + }, + ) def process(in_file: str, out_file: str) -> None: """ Process the file at path "in_file" and write the result to path "out_file". """ - process_all(in_file, out_file) - -def process_by_line(in_file, out_file) -> None: - - with open(out_file, "x", encoding="utf-8") as f_out: - with open(in_file, "r", encoding="utf-8") as f_in: - - is_xml = is_file_xml(in_file) - nlp = xml_nlp if is_xml else txt_nlp - doc = tei2trankit(in_file) if is_xml else parse_txt(f_in) - - for line in tqdm(doc): - if is_xml: - line = " ".join(line) - result = nlp(line) - f_out.write("{:C}".format(result)) - f_out.write("\n") - -def process_all(in_file, out_file) -> None: - with open(out_file, "x", encoding="utf-8") as f_out: with open(in_file, "r", encoding="utf-8") as f_in: is_xml = is_file_xml(in_file) nlp = xml_nlp if is_xml else txt_nlp - doc = tei2trankit(in_file) if is_xml else f_in.read() + doc = parse_tei(in_file) if is_xml else f_in.read() result = nlp(doc) f_out.write("{:C}".format(result)) f_out.write("\n") + def is_file_xml(in_file: str) -> bool: try: ET.parse(in_file) return True except: return False - - -def parse_txt(f_in: TextIOWrapper) -> list[str]: - return [line.strip() for line in f_in if not line.isspace() and line] diff --git a/stanza/requirements.txt b/stanza/requirements.txt index 7eb43fc..58ed064 100644 --- a/stanza/requirements.txt +++ b/stanza/requirements.txt @@ -1,2 +1,2 @@ -stanza -tqdm \ No newline at end of file +stanza==1.9.2 +conllu==6.0.0 \ No newline at end of file diff --git a/stanza/tei2trankit.py b/stanza/tei2trankit.py deleted file mode 100644 index edb79e8..0000000 --- a/stanza/tei2trankit.py +++ /dev/null @@ -1,140 +0,0 @@ -# Only the following type of TEI-files are supported -# - Containing extensively and tags; will be processed as pretokenized on both levels - -import sys -import xml.etree.ElementTree as ET - -MAXIMUM_SENTENCE_SIZE = 100 - -class TEIConversionException(Exception): - def __init__(self, message) -> None: - self.message = message - super().__init__(self.message) - -def _tag_without_namespace(element): - _, _, tag = element.tag.rpartition('}') # strip ns - return tag - -def has_sentences(element): - return len(get_sentences(element)) > 0 - -def has_tokens(element): - return len(get_tokens(element)) > 0 - -def get_sentences(element): - # Xpath doesn't seem to handle the namespaces well, so we do it brute force - ret = [] - for descendant in element.iter(): - if _tag_without_namespace(descendant) == "s": - ret.append(descendant) - return ret - -def get_tokens(element): - # it would be beautiful to use the Xpath element.findall(".//[w or pc]") here - # but 'or' is not supported in elementTree - # see 
diff --git a/stanza/tei2trankit.py b/stanza/tei2trankit.py
deleted file mode 100644
index edb79e8..0000000
--- a/stanza/tei2trankit.py
+++ /dev/null
@@ -1,140 +0,0 @@
-# Only the following type of TEI-files are supported
-# - Containing extensively <s> and <w> tags; will be processed as pretokenized on both levels
-
-import sys
-import xml.etree.ElementTree as ET
-
-MAXIMUM_SENTENCE_SIZE = 100
-
-class TEIConversionException(Exception):
-    def __init__(self, message) -> None:
-        self.message = message
-        super().__init__(self.message)
-
-def _tag_without_namespace(element):
-    _, _, tag = element.tag.rpartition('}')  # strip ns
-    return tag
-
-def has_sentences(element):
-    return len(get_sentences(element)) > 0
-
-def has_tokens(element):
-    return len(get_tokens(element)) > 0
-
-def get_sentences(element):
-    # Xpath doesn't seem to handle the namespaces well, so we do it brute force
-    ret = []
-    for descendant in element.iter():
-        if _tag_without_namespace(descendant) == "s":
-            ret.append(descendant)
-    return ret
-
-def get_tokens(element):
-    # it would be beautiful to use the Xpath element.findall(".//[w or pc]") here
-    # but 'or' is not supported in elementTree
-    # see https://docs.python.org/3/library/xml.etree.elementtree.html#xpath-support
-    # therefore we just iterate the tree ourselves
-    ret = []
-    for descendant in element.iter():
-        if _tag_without_namespace(descendant) == "w" or _tag_without_namespace(descendant) == "pc":
-            ret.append(descendant)
-    return ret
-
-def get_token_literals(element):
-    words = get_tokens(element)
-    tokens = list(map( lambda w : "".join(w.itertext()), words))
-    # itertext() generates empty strings for self-closing-tags like <w/>
-    return [t for t in tokens if t] # '' is falsy
-
-def get_tree(filename):
-    parser = ET.XMLParser(encoding="utf-8")
-    return ET.parse(filename, parser=parser)
-
-def get_text_elements(tree):
-    root = tree.getroot()
-    ret = []
-    for descendant in root.iter():
-        if _tag_without_namespace(descendant) == "s":
-            ret.append(descendant)
-    return ret
-
-def tei2trankit(filename):
-    tree = get_tree(filename)
-    if not has_tokens(tree):
-        raise TEIConversionException("TEI document does not contain <w> nor <pc>-tags")
-    if not has_sentences(tree):
-        raise TEIConversionException("TEI document does not contain <s>-tags")
-
-    texts = get_text_elements(tree)
-
-    ret = []
-    for text in texts:
-        for sentence in get_sentences(text):
-            literals = get_token_literals(sentence)
-            ret.append(literals)
-
-    if len(ret) == 0:
-        raise TEIConversionException("TEI document contains no sentences of processable size")
-    return ret
-
-def trankit2tei(data, filename):
-    tree = get_tree(filename)
-    texts = get_text_elements(tree)
-    def trankit_sentence_generator():
-        for sentence in data['sentences']:
-            yield sentence
-    trankit_sentences = trankit_sentence_generator()
-
-    def getTEIid(element):
-        # TODO replace namespace with {*}?
-        if element.get('{http://www.w3.org/XML/1998/namespace}id') is not None:
-            return element.get('{http://www.w3.org/XML/1998/namespace}id')
-        return element.get('id')
-
-    for text in texts:
-        for sentence in get_sentences(text):
-            tokens = get_tokens(sentence)
-            if len(tokens) > MAXIMUM_SENTENCE_SIZE:
-                continue  # sentence is too long, skip it
-
-            linkgroup = ET.Element("linkGrp")
-            linkgroup.set("type", "UD-SYN")
-            linkgroup.set("targFunc", "head argument")
-            sentence.append(linkgroup)
-
-            trankit_sentence = trankit_sentences.__next__()
-
-            def trankit_words_generator():
-                for token in trankit_sentence['tokens']:
-                    yield token
-            trankit_words = trankit_words_generator()
-
-            for token in tokens:
-                trankit_word = trankit_words.__next__()
-
-                link = ET.Element("link")
-                # The deprel seems to be missing sometimes
-                # Therefore we initialize it to a default value
-                deprel = ""
-                if 'deprel' in trankit_word:
-                    deprel = trankit_word['deprel']
-                # Prepend the deprel with "ud-syn:" to match the parlamint format
-                deprel = "ud-syn:" + deprel
-                link.set("ana", deprel)
-                linkgroup.append(link)
-
-                head = None
-                if trankit_word['head'] == 0: # root
-                    head = sentence
-                else:
-                    head = tokens[trankit_word['head'] - 1]  # trankit starts at index 1
-                link.set("target", "#" + str(getTEIid(head)) + " " + "#" + str(getTEIid(token)))
-
-    # export the xml tree
-    return ET.tostring(tree.getroot(), encoding='utf-8', method='xml')
-
-if __name__ == "__main__":
-    if (len(sys.argv)) != 2:
-        print(f"Usage: {sys.argv[0]} FILENAME")
-        sys.exit(0)
-    print(tei2trankit(sys.argv[1]))
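Review note: one behavioral change worth flagging: the deleted tei2trankit.py skipped sentences longer than MAXIMUM_SENTENCE_SIZE (100 tokens) when merging, while the new helper processes every sentence and relies on one-to-one sentence alignment. If the cap is still wanted for the tagging path, a sketch (the threshold is reused from the old module; the helper name is hypothetical, and the filter must not be applied on the merge path alone or sentences would misalign):

    MAXIMUM_SENTENCE_SIZE = 100  # value taken from the deleted tei2trankit.py

    def parse_tei_capped(filename: str) -> list[list[str]]:
        # hypothetical helper: drop over-long sentences before tagging
        return [s for s in parse_tei(filename) if len(s) <= MAXIMUM_SENTENCE_SIZE]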