diff --git a/.gitignore b/.gitignore index 013ab917..f937b437 100644 --- a/.gitignore +++ b/.gitignore @@ -75,9 +75,14 @@ mmif/ver mmif/res mmif/vocabulary ./VERSION* +VERSION .hypothesis # Documentation build artifacts documentation/cli_help.rst documentation/whatsnew.rst docs-test + +# environments +.venv* +venv* diff --git a/mmif/serialize/mmif.py b/mmif/serialize/mmif.py index 9e94496d..c6fa4c62 100644 --- a/mmif/serialize/mmif.py +++ b/mmif/serialize/mmif.py @@ -15,7 +15,7 @@ import warnings from collections import defaultdict from datetime import datetime -from typing import List, Union, Optional, Dict, cast, Iterator +from typing import Any, List, Union, Optional, Dict, cast, Iterator import jsonschema.validators @@ -487,11 +487,11 @@ def get_documents_in_view(self, vid: Optional[str] = None) -> List[Document]: else: return [] - def get_documents_by_type(self, doc_type: Union[str, DocumentTypes]) -> List[Document]: + def get_documents_by_type(self, doc_type: Any) -> List[Document]: """ Method to get all documents where the type matches a particular document type, which should be one of the CLAMS document types. - :param doc_type: the type of documents to search for, must be one of ``Document`` type defined in the CLAMS vocabulary. + :param doc_type: the type of documents to search for, must be one of ``Document`` types defined in the CLAMS vocabulary. :return: a list of documents matching the requested type, or an empty list if none found. """ docs = [] diff --git a/mmif/utils/cli/README.md b/mmif/utils/cli/README.md new file mode 100644 index 00000000..6d04438d --- /dev/null +++ b/mmif/utils/cli/README.md @@ -0,0 +1,71 @@ +# MMIF CLI Scripts + +This directory contains CLI scripts like `source` and `rewind` that can be called from the command line. These scripts are called as subcommands of the `mmif` CLI script, for example `mmif source --help`. 
+ + +## Adding another CLI script + +To add a CLI script all you need to do is add a python module to `mmif/utils/cli` and make sure it has the following three methods: + +1. `prep_argparser(**kwargs)` to define and return an instance of `argparse.ArgumentParser`. + +2. `describe_argparser()` to return a pair of strings that describe the script. The first string is a one-line description of the argument parser and the second a more verbose description. These will be shown for `mmif --help` and `mmif subcommand --help` respectively. + +3. `main(args)` to do the actual work of running the code + +See the current CLI scripts for examples. + + +## Some background + +The mmif-python package has a particular way to deal with CLI utility scripts. All scripts live in the mmif.utils.cli package. The `mmif/__init__.py` module has the `cli()` function which illustrates the requirements on utility scripts: + +```python +def cli(): + parser, subparsers = prep_argparser_and_subcmds() + cli_modules = {} + for cli_module in find_all_modules('mmif.utils.cli'): + cli_module_name = cli_module.__name__.rsplit('.')[-1] + cli_modules[cli_module_name] = cli_module + subcmd_parser = cli_module.prep_argparser(add_help=False) + subparsers.add_parser(cli_module_name, parents=[subcmd_parser], + help=cli_module.describe_argparser()[0], + description=cli_module.describe_argparser()[1], + formatter_class=argparse.RawDescriptionHelpFormatter) + if len(sys.argv) == 1: + parser.print_help(sys.stderr) + sys.exit(1) + args = parser.parse_args() + if args.subcmd not in cli_modules: + parser.print_help(sys.stderr) + else: + cli_modules[args.subcmd].main(args) +``` + + + +You can see the invocations of the three functions mentioned above. + +The `prep_argparser()` function uses `find_all_modules()`, which finds modules in the top-level of the cli package. That module could have all the code needed for the CLI to work, but it could refer to other modules as well. 
For example, the `summary.py` script is in `cli`, but it imports the summary utility from `mmif.utils`. + +In the setup.py script there is this passage towards the end of the file: + +```python + entry_points={ + 'console_scripts': [ + 'mmif = mmif.__init__:cli', + ], + }, +``` + +This leaves it up to the `cli()` method to find the scripts and this is why just adding a submodule as mentioned above works. Note that the initialization file of the cli package imports two of the commandline related scripts: + +```python +from mmif.utils.cli import rewind +from mmif.utils.cli import source +``` + +These may be used somewhere, but they are not necessary to run MMIF CLI scripts. + diff --git a/mmif/utils/cli/summarize.py b/mmif/utils/cli/summarize.py new file mode 100644 index 00000000..06c1afae --- /dev/null +++ b/mmif/utils/cli/summarize.py @@ -0,0 +1,31 @@ +import sys +import argparse + +from mmif.utils.summarizer.summary import Summary + + + +def describe_argparser() -> tuple: + """ + Returns two strings: a one-line description of the argparser and additional + material, which will be shown for `mmif --help` and `mmif summarize --help`, + respectively. For now they return the same string. The return value should + still be a tuple because mmif.cli() depends on it. 
+ """ + oneliner = 'Create a JSON Summary for a MMIF file' + return oneliner, oneliner + + +def prep_argparser(**kwargs): + parser = argparse.ArgumentParser( + description=describe_argparser()[1], + formatter_class=argparse.RawDescriptionHelpFormatter, + **kwargs) + parser.add_argument("-i", metavar='MMIF_FILE', help='input MMIF file', required=True) + parser.add_argument("-o", metavar='OUTPUT_FILE', help='output JSON summary file', required=True) + return parser + + +def main(args): + mmif_summary = Summary(args.i) + mmif_summary.report(outfile=args.o) diff --git a/mmif/utils/summarizer/__init__.py b/mmif/utils/summarizer/__init__.py new file mode 100644 index 00000000..1122d449 --- /dev/null +++ b/mmif/utils/summarizer/__init__.py @@ -0,0 +1,40 @@ + +import argparse + +from mmif.utils.summarizer.summary import Summary + + +def argparser(): + parser = argparse.ArgumentParser(description='Create a JSON Summary for a MMIF file') + parser.add_argument('-i', metavar='MMIF_FILE', help='input MMIF file', required=True) + parser.add_argument('-o', metavar='JSON_FILE', help='output JSON summary file', required=True) + return parser + + +def pp_args(args): + for a, v in args.__dict__.items(): + print(f'{a:12s} --> {v}') + + +def main(): + parser = argparser() + args = parser.parse_args() + #pp_args(args) + mmif_summary = Summary(args.i) + mmif_summary.report(outfile=args.o) + + +""" +There used to be an option to process a whole directory, but I never used it and decided +that if needed it would better be done by an extra script or a separate function. + +The code for when there was a -d option is here just in case. 
+ +if args.d: + for mmif_file in pathlib.Path(args.d).iterdir(): + if mmif_file.is_file() and mmif_file.name.endswith('.mmif'): + print(mmif_file) + json_file = str(mmif_file)[:-4] + 'json' + mmif_summary = Summary(mmif_file.read_text()) + mmif_summary.report(outfile=json_file) +""" \ No newline at end of file diff --git a/mmif/utils/summarizer/config.py b/mmif/utils/summarizer/config.py new file mode 100644 index 00000000..f972bd97 --- /dev/null +++ b/mmif/utils/summarizer/config.py @@ -0,0 +1,69 @@ + +from mmif.vocabulary import DocumentTypes +from mmif.vocabulary import AnnotationTypes + + +# The name of CLAMS applications, used to select views and to determine whether +# the summarizer is appropriate for the app version. +# TODO: this now requires an exhaustive listing of all allowed apps and their +# versions, we need a more maintainable system. + +KALDI = [ + # The first two use MMIF 0.4 and should probably be retired + 'http://apps.clams.ai/aapb-pua-kaldi-wrapper/0.2.2', + 'http://apps.clams.ai/aapb-pua-kaldi-wrapper/0.2.3', + 'http://apps.clams.ai/aapb-pua-kaldi-wrapper/v3'] + +WHISPER = [ + 'http://apps.clams.ai/whisper-wrapper/v7', + 'http://apps.clams.ai/whisper-wrapper/v8', + 'http://apps.clams.ai/whisper-wrapper/v8-3-g737e280'] + +CAPTIONER = [ + 'http://apps.clams.ai/llava-captioner/v1.2-6-gc824c97', + 'http://apps.clams.ai/smolvlm2-captioner'] + +NER = [ + 'http://apps.clams.ai/spacy-wrapper/v1.1', + 'http://apps.clams.ai/spacy-wrapper/v2.1'] + +SEGMENTER = 'http://apps.clams.ai/audio-segmenter' + + +# When a named entity occurs 20 times we do not want to generate 20 instances of +# it. If the start of the next entity occurs within the below number of +# milliseconds after the end of the previous, then it is just added to the +# previous one. Taking one minute as the default so two mentions in a minute end +# up being the same instance. This setting can be changed with the 'granularity' +# parameter. 
+# TODO: this seems broken + +GRANULARITY = 1000 + + +# Properties used for the summary for various tags + +DOC_PROPS = ('id', 'type', 'location') +VIEW_PROPS = ('id', 'timestamp', 'app') +TF_PROPS = ('id', 'start', 'end', 'frameType') +E_PROPS = ('id', 'group', 'cat', 'tag', 'video-start', 'video-end', 'coordinates') + + +# Names of types + +TEXT_DOCUMENT = DocumentTypes.TextDocument.shortname +VIDEO_DOCUMENT = DocumentTypes.VideoDocument.shortname +TIME_FRAME = AnnotationTypes.TimeFrame.shortname +BOUNDING_BOX = AnnotationTypes.BoundingBox.shortname +ALIGNMENT = AnnotationTypes.Alignment.shortname + +ANNOTATION = 'Annotation' +TOKEN = 'Token' +SENTENCE = 'Sentence' +PARAGRAPH = 'Paragraph' +NAMED_ENTITY = 'NamedEntity' +NOUN_CHUNK = 'NounChunk' +VERB_CHUNK = 'VerbChunk' + +TIME_BASED_INTERVALS = {TIME_FRAME} +SPAN_BASED_INTERVALS = {TOKEN, SENTENCE, PARAGRAPH, NAMED_ENTITY, NOUN_CHUNK, VERB_CHUNK} diff --git a/mmif/utils/summarizer/graph.py b/mmif/utils/summarizer/graph.py new file mode 100644 index 00000000..55c38ffd --- /dev/null +++ b/mmif/utils/summarizer/graph.py @@ -0,0 +1,229 @@ +import sys, json +from collections import defaultdict +from operator import itemgetter +from pathlib import Path +import argparse + +from typing import Any +from mmif import Mmif + +from mmif.utils.summarizer import config +from mmif.utils.summarizer.utils import compose_id, normalize_id +from mmif.utils.summarizer.nodes import Node, Nodes, EntityNode, TimeFrameNode + + +class Graph(object): + + """Graph implementation for a MMIF document. Each node contains an annotation + or document. Alignments are stored separately. Edges between nodes are created + from the alignments and added to the Node.targets property. The first edge added + to Node.targets is the document that the Node points to (if there is one). 
+ + The goal for the graph is to store all useful annotation and to have simple ways + to trace nodes all the way up to the primary data.""" + + def __init__(self, mmif: Any): + # TODO: the type hint should really be "MMif | str", but pytype did not + # like that. + self.mmif = mmif if type(mmif) is Mmif else Mmif(mmif) + self.documents = [] + self.nodes = {} + self.alignments = [] + self._init_nodes() + self._init_edges() + # Third pass to add links between text elements, in particular from + # entities to tokens, adding lists of tokens to entities. + tokens = self.get_nodes(config.TOKEN) + entities = self.get_nodes(config.NAMED_ENTITY) + self.token_idx = TokenIndex(tokens) + #self.token_idx.pp() + for e in entities: + #print('>>>', e, e.anchors) + e.tokens = self.token_idx.get_tokens_for_node(e) + + def _init_nodes(self): + # The top-level documents are added as nodes, but they are also put in + # the documents list. + for doc in self.mmif.documents: + self.add_node(None, doc) + self.documents.append(doc) + # First pass over all annotations and documents in all views and save + # them in the graph. + doc_ids = [d.id for d in self.documents] + for view in self.mmif.views: + for annotation in view.annotations: + normalize_id(doc_ids, view, annotation) + if annotation.at_type.shortname == config.ALIGNMENT: + # alignments are not added as nodes, but we do keep them around + self.alignments.append((view, annotation)) + else: + self.add_node(view, annotation) + + def _init_edges(self): + # Second pass over the alignments so we create edges. 
+ for view, alignment in self.alignments: + self.add_edge(view, alignment) + + def __str__(self): + return "" % len(self.nodes) + + def add_node(self, view, annotation): + """Add an annotation as a node to the graph.""" + node = Nodes.new(self, view, annotation) + self.nodes[node.identifier] = node + + def add_edge(self, view, alignment): + source_id = alignment.properties['source'] + target_id = alignment.properties['target'] + #print(alignment.id, source_id, target_id) + source = self.get_node(source_id) + target = self.get_node(target_id) + if source is None or target is None: + print('WARNING: could not add edge ', + 'because the source and/or target does not exist') + else: + # make sure the direction goes from token or textdoc to annotation + if target.annotation.at_type.shortname in (config.TOKEN, config.TEXT_DOCUMENT): + source, target = target, source + source.targets.append(target) + source.add_anchors_from_alignment(target) + target.add_anchors_from_alignment(source) + + def get_node(self, node_id) -> Node | None: + return self.nodes.get(node_id) + + # def get_nodes(self, short_at_type: str, view_id : str = None): + # replaced the above because the code coverage is picky on type hints + def get_nodes(self, short_at_type: str, view_id=None): + """Get all nodes for an annotation type, using the short form. If a view + identifier is provided then only include nodes from that view.""" + return [node for node in self.nodes.values() + if (node.at_type.shortname == short_at_type + and (view_id is None or node.view.id == view_id))] + + def statistics(self): + stats = defaultdict(int) + for node in self.nodes.values(): + stats[f'{str(node.view_id):4} {node.at_type.shortname}'] += 1 + return stats + + def trim(self, start: int, end: int): + """Trim the graph and keep only those nodes that are included in the graph + between two timepoints (both in milliseconds). This assumes that all nodes + are anchored on the time in the audio or video stream. 
At the moment it + keeps all nodes that are not explicitly anchored.""" + remove = set() + for node_id, node in self.nodes.items(): + if 'time-point' in node.anchors: + if not start <= node.anchors['time-point'] <= end: + remove.add(node_id) + if 'time-offsets' in node.anchors: + p1, p2 = node.anchors['time-offsets'] + if not (start <= p1 <= end and start <= p2 <= end): + remove.add(node_id) + new_nodes = [n for n in self.nodes.values() if not n.identifier in remove] + self.nodes = { node.identifier: node for node in new_nodes } + + def pp(self, fname=None, skip_timepoints=False): + fh = sys.stdout if fname is None else open(fname, 'w') + fh.write("%s\n" % self) + for view in self.mmif.views: + fh.write(" \n" % (view.id, str(view.metadata['app']))) + for node_id, node in self.nodes.items(): + if node.at_type.shortname == 'TimePoint': + continue + fh.write(" %-40s" % node) + targets = [str(t) for t in node.targets] + fh.write(' --> [%s]\n' % ' '.join(targets)) + + def pp_statistics(self): + stats = self.statistics() + for at_type in sorted(stats): + print(f'{at_type:20} {stats[at_type]:>5}') + + +class TokenIndex(object): + + """ + The tokens are indexed on the identifier on the TextDocument that they occur + in and for each text document we have a list of pairs + + {'v_4:td1': [ + ((0, 5), ), + ((5, 6), ), + ... + } + """ + + # TODO: + # - Benchmark get_tokens_for_node(). I may want to use something like this + # to determine enclosed nodes and enclosing nodes and that may blow up since + # that would be O(n^2). If it does matter, probably start using binary search + # or add an index from character offset to nodes. + # - It is also not sure whether we still need this since the new spaCy gives + # targets to tokens. 
+ + def __init__(self, tokens): + self.tokens = {} + self.token_count = len(tokens) + for t in tokens: + tup = ((t.properties['start'], t.properties['end']), t) + self.tokens.setdefault(t.document.identifier, []).append(tup) + # Make sure the tokens for each document are ordered. + for document, token_list in self.tokens.items(): + self.tokens[document] = sorted(token_list, key=itemgetter(0)) + # In some cases there are two tokens with identical offset (for example + # with tokenization from both Kaldi and spaCy, not sure what to do with + # these, but should probably be more careful on what views to access + + def __len__(self): + return self.token_count + + def __str__(self): + return f'' + + def get_tokens_for_node(self, node: Node): + """Return all tokens included in the span of a node.""" + doc = node.document.identifier + try: + start = node.properties['start'] + end = node.properties['end'] + except KeyError: + start, end = node.anchors['text-offsets'] + tokens = [] + for (t_start, t_end), token in self.tokens.get(doc, []): + if t_start >= start and t_end <= end: + tokens.append(token) + return tokens + + def pp(self, fname=None): + fh = sys.stdout if fname is None else open(fname, 'w') + for document in self.tokens: + fh.write("\n[%s] -->\n" % document) + for t in self.tokens[document]: + fh.write(' %s %s\n' % (t[0], t[1])) + + + +if __name__ == '__main__': + + graph = Graph(open(sys.argv[1]).read()) + print(graph) + #graph.pp() + #graph.nodes['v_7:st12'].pp() + #graph.nodes['v_2:s1'].pp() + #graph.nodes['v_4:tf1'].pp() + exit() + for node in graph.nodes.values(): + print(node.at_type.shortname, node.identifier, node.anchors) + + +''' + +Printing some graphs: + +uv run graph.py -i examples/input-v9.mmif -e dot -f png -o examples/dot-v9-1-full -p -a -v +uv run graph.py -i examples/input-v9.mmif -e dot -f png -o examples/dot-v9-2-no-view-links -p -a +uv run graph.py -i examples/input-v9.mmif -e dot -f png -o examples/dot-v9-3-no-anchor-to-doc -p + +''' diff 
--git a/mmif/utils/summarizer/nodes.py b/mmif/utils/summarizer/nodes.py new file mode 100644 index 00000000..53201022 --- /dev/null +++ b/mmif/utils/summarizer/nodes.py @@ -0,0 +1,370 @@ +import json + +from typing import Any + +from mmif.utils.summarizer import config + + + +class Node(object): + + def __init__(self, graph, view, annotation): + self.graph = graph + self.view = view + self.view_id = None if self.view is None else self.view.id + self.annotation = annotation + # copy some information from the Annotation + self.at_type = annotation.at_type + self.identifier = annotation.id + self.properties = json.loads(str(annotation.properties)) + # get the document from the view or the properties + self.document = self._get_document() + # The targets property contains a list of annotations or documents that + # the node content points to. This includes the document the annotation + # points to as well as the alignment from a token or text document to a + # bounding box or time frame (which is added later). + # TODO: the above does not seem to be true since there is no evidence of + # data from alignments being added. 
+ self.targets = [] if self.document is None else [self.document] + self.anchors = {} + self.add_local_anchors() + self.add_anchors_from_targets() + + def __str__(self): + anchor = '' + if self.at_type.shortname == config.TOKEN: + anchor = " %s:%s '%s'" % (self.properties['start'], + self.properties['end'], + self.properties.get('text','').replace('\n', '\\n')) + return "<%s %s%s>" % (self.at_type.shortname, self.identifier, anchor) + + def add_local_anchors(self): + """Get the anchors that you can get from the annotation itself, which + includes the start and end offsets, the coordinates, the timePoint of + a BoundingBox and any annotation with targets.""" + props = self.properties + attype = self.annotation.at_type.shortname + if 'start' in props and 'end' in props: + # TimeFrame is the only non-character based interval so this simple + # if-then-else should work + if attype == config.TIME_FRAME: + self.anchors['text-offsets'] = (props['start'], props['end']) + else: + self.anchors['time-offsets'] = (props['start'], props['end']) + if 'coordinates' in props: + self.anchors['coordinates'] = props['coordinates'] + if 'timePoint' in props: + self.anchors['time-point'] = props['timePoint'] + if 'targets' in props: + self.anchors['targets'] = props['targets'] + + def add_anchors_from_targets(self): + """Get start and end offsets or timePoints from the targets and add them to + the anchors, but only if there were no anchors on the node already. 
This has + two cases: one for TimeFrames and one for text intervals.""" + props = self.properties + attype = self.annotation.at_type.shortname + if 'targets' in props: + try: + t1 = self.graph.nodes[props['targets'][0]] + t2 = self.graph.nodes[props['targets'][-1]] + if attype == config.TIME_FRAME: + if not 'time-offsets' in props: + self.anchors['time-offsets'] = ( + t1.properties['timePoint'], t2.properties['timePoint']) + else: + if not 'text-offsets' in props: + self.anchors['text-offsets'] = ( + t1.properties['start'], t2.properties['end']) + except IndexError: + print(f'WARNING: Unexpected empty target list for {self.identifier}') + + def add_anchors_from_alignment(self, target: Any, debug=False): + if target is None: + return + source_attype = self.at_type.shortname + target_attype = target.at_type.shortname + if debug: + print('\n@ DEBUG SOURCE->TARGET ', source_attype, target_attype) + print('@ DEBUG SOURCE.PROPS ', list(self.properties.keys())) + print('@ DEBUG TARGET.PROPS ', list(target.properties.keys())) + print('@ DEBUG TARGET.ANCHORS ', target.anchors) + # If a TextDocument is aligned to a BoundingBox then we grab the coordinates + # TODO: how are we getting the time point? + if source_attype == 'TextDocument' and target_attype == 'BoundingBox': + if 'coordinates' in target.properties: + self.anchors['coordinates'] = target.properties['coordinates'] + #print(source_attype, self.anchors) + elif source_attype == 'BoundingBox' and target_attype == 'TextDocument': + pass + # If a TextDocument is aligned to a TimeFrame then we copy time anchors + # but also targets and representatives, the latter because some alignments + # are not precise + elif source_attype == 'TextDocument' and target_attype == 'TimeFrame': + if 'start' in target.properties and 'end' in target.properties: + self.anchors['time-offsets'] = (target.properties['start'], + target.properties['end']) + if 'time-offsets' in target.anchors: + # TODO: is this ever used? 
+ self.anchors['time-offsets'] = target.anchors['time-offsets'] + if 'targets' in target.properties: + self.anchors['targets'] = target.properties['targets'] + if 'representatives' in target.properties: + self.anchors['representatives'] = target.properties['representatives'] + #print('-', source_attype, self.anchors, self, target) + elif source_attype == 'TimeFrame' and target_attype == 'TextDocument': + pass + # Simply copy the time point + elif source_attype == 'TextDocument' and target_attype == 'TimePoint': + self.anchors['time-point'] = target.anchors['time-point'] + if debug: + print('+ ADDED SOURCE.ANCHORS ', self.anchors) + # For Token-TimeFrame alignments all we need are the start and end time points + elif source_attype == 'Token' and target_attype == 'TimeFrame': + if 'start' in target.properties and 'end' in target.properties: + self.anchors['time-offsets'] = (target.properties['start'], + target.properties['end']) + #print(source_attype, self.anchors) + elif source_attype == 'TimeFrame' and target_attype == 'Token': + pass + # TODO: check whether some action is needed for the next options + elif source_attype == 'TextDocument' and target_attype == 'VideoDocument': + pass + elif source_attype == 'VideoDocument' and target_attype == 'TextDocument': + pass + elif source_attype == 'BoundingBox' and target_attype == 'TimePoint': + pass + elif source_attype =='TimePoint' and target_attype == 'BoundingBox': + pass + elif source_attype == 'BoundingBox' and target_attype in ('Token', 'Sentence', 'Paragraph'): + pass + elif source_attype in ('Token', 'Sentence', 'Paragraph') and target_attype == 'BoundingBox': + pass + elif source_attype == 'TextDocument' and target_attype == 'TimePoint': + pass + elif source_attype == 'TimePoint' and target_attype == 'TextDocument': + pass + else: + print('-', source_attype, target_attype) + #if debug: + # print('DEBUG', self.anchors) + + def _get_document(self): + """Return the document or annotation node that the 
annotation/document in + the node refers to via the document property. This could be a local property + or a metadata property if there is no such local property. Return None + if neither of those exist.""" + # try the local property + docid = self.properties.get('document') + if docid is not None: + # print('>>>', docid, self.graph.get_node(docid)) + return self.graph.get_node(docid) + # try the metadata property + if self.view is not None: + try: + metadata = self.view.metadata.contains[self.at_type] + docid = metadata['document'] + return self.graph.get_node(docid) + except KeyError: + return None + return None + + def summary(self): + """The default summary is just the identifier, this should typically be + overridden by subclasses.""" + return { 'id': self.identifier } + + def has_label(self): + """Only TimeFrameNodes can have labels so this returns False.""" + return False + + def pp(self, close=True): + print('-' * 80) + print(self) + print(f' document = {self.document}') + for prop in self.properties: + print(f' {prop} = {self.properties[prop]}') + print(' targets = ') + for target in self.targets: + print(' ', target) + print(' anchors = ') + for anchor in self.anchors: + print(f' {anchor} -> {self.anchors[anchor]}') + if close: + print('-' * 80) + + +class TimeFrameNode(Node): + + def __str__(self): + frame_type = ' ' + self.frame_type() if self.has_label() else '' + return ('' + % (self.identifier, self.start(), self.end(), frame_type)) + + def start(self): + return self.properties.get('start', -1) + + def end(self): + return self.properties.get('end', -1) + + def frame_type(self): + # TODO: rename this, uses old property since replaced by "label" + # NOTE: this is still allowing for the old property though + return self.properties.get('label') or self.properties.get('frameType') + + def has_label(self): + return self.frame_type() is not None + + def representatives(self) -> list: + """Return a list of the representative TimePoints.""" + # TODO: why 
could I not get this from the anchors? + rep_ids = self.properties.get('representatives', []) + reps = [self.graph.get_node(rep_id) for rep_id in rep_ids] + return reps + + def summary(self): + """The summary of a time frame just contains the identifier, start, end + and frame type.""" + return { 'id': self.identifier, + 'start': self.properties['start'], + 'end': self.properties['end'], + 'frameType': self.properties.get('frameType') } + + +class EntityNode(Node): + + def __init__(self, graph, view, annotation): + super().__init__(graph, view, annotation) + self.tokens = [] + self._paths = None + self._anchor = None + + def __str__(self): + try: + start = self.properties['start'] + end = self.properties['end'] + except KeyError: + start, end = self.anchors['text-offsets'] + return ("" + % (self.identifier, start, end, self.properties['text'])) + + def start_in_video(self): + #print('+++', self.document.properties) + try: + return self.document.anchors['time-point'] + except KeyError: + return -1 + #return self.anchor()['video-start'] + + def end_in_video(self): + return self.anchor().get('video-end') + + ''' + Commented this out because the type checking in the code coverage tests requires + the default value for the close parameter to be the same as on Node.pp(). 
+ + def pp(self, close=False): + super().pp(close=close) + try: + for i, p in enumerate(self.paths_to_docs()): + print(' %s' % ' '.join([str(n) for n in p[1:]])) + except ValueError: + print(' WARNING: error in path_to_docs in NamedEntityNode.pp()') + print('-' * 80) + ''' + + def summary(self): + """The summary for entities needs to include where in the video or image + the entity occurs, it is not enough to just give the text document.""" + # TODO: in the old days this used an anchor() method which was fragile + # TODO: revamping it now + return { + 'id': self.identifier, + 'group': self.properties['group'], + 'cat': self.properties['category'], + 'document': self.document.identifier, + # Entities in a TextDocument that is a full transcript without any + # alignments do not have a TimePoint + #'time-point': self.document.anchors.get('time-point'), + #'text-offsets': self.anchors.get('text-offsets'), + 'time-point': self.document.anchors.get('time-point', -1), + 'text-offsets': self.anchors.get('text-offsets', (-1 ,-1)), + #'document': self._get_document_plus_span(), + #'video-start': anchor.get('video-start'), + #'video-end': anchor.get('video-end'), + #'coordinates': self._coordinates_as_string(anchor) + } + + def anchor(self) -> dict: + """The anchor is the position in the video that the entity is linked to. + This anchor cannot be found in the document property because that points + to a text document that was somehow derived from the video document. Some + graph traversal is needed to get the anchor, but we know that the anchor + is always a time frame or a bounding box. + """ + # TODO: deal with the case where the primary document is not a video + self.paths = self.paths_to_docs() + bbtf = self.find_boundingbox_or_timeframe() + # for path in paths: + # print('... 
[') + # for n in path: print(' ', n) + # print('===', bbtf) + if bbtf.at_type.shortname == config.BOUNDING_BOX: + return {'video-start': bbtf.properties['timePoint'], + 'coordinates': bbtf.properties['coordinates']} + elif bbtf.at_type.shortname == config.TIME_FRAME: + return {'video-start': bbtf.properties['start'], + 'video-end': bbtf.properties['end']} + else: + return {} + + def anchor2(self): + """The anchor is the position in the video that the entity is linked to. + This anchor cannot be found in the document property because that points + to a text document that was somehow derived from the video document. Some + graph traversal is needed to get the anchor, but we know that the anchor + is always a time frame or a bounding box. + """ + # TODO: with this version you get an error that the paths variable does + # not exist yet, must get a clearer picture on how to build a graph + # where nodes have paths to anchors + # TODO: deal with the case where the primary document is not a video + if self._anchor is None: + self._paths = self.paths_to_docs() + bbtf = self.find_boundingbox_or_timeframe() + # for path in self._paths: + # print('... [') + # for n in path: print(' ', n) + # print('===', bbtf) + if bbtf.at_type.shortname == config.BOUNDING_BOX: + self._anchor = {'video-start': bbtf.properties['timePoint'], + 'coordinates': bbtf.properties['coordinates']} + elif bbtf.at_type.shortname == config.TIME_FRAME: + self._anchor = {'video-start': bbtf.properties['start'], + 'video-end': bbtf.properties['end']} + return self._anchor + + def find_boundingbox_or_timeframe(self): + return self.paths[-1][-2] + + @staticmethod + def _coordinates_as_string(anchor): + if 'coordinates' not in anchor: + return None + return ','.join(["%s:%s" % (pair[0], pair[1]) + for pair in anchor['coordinates']]) + + +class Nodes(object): + + """Factory class for Node creation. 
Use Node for creation unless a special + class was registered for the kind of annotation we have.""" + + node_classes = { config.NAMED_ENTITY: EntityNode, + config.TIME_FRAME: TimeFrameNode } + + @classmethod + def new(cls, graph, view, annotation): + node_class = cls.node_classes.get(annotation.at_type.shortname, Node) + return node_class(graph, view, annotation) + diff --git a/mmif/utils/summarizer/summary.py b/mmif/utils/summarizer/summary.py new file mode 100644 index 00000000..a5c9bb07 --- /dev/null +++ b/mmif/utils/summarizer/summary.py @@ -0,0 +1,727 @@ +"""MMIF Summarizer + +MMIF consumer that creates a JSON summary from a MMIF file. + +Makes some simplifying assumptions, including: + +- There is one video in the MMIF documents list. All start and end properties + are pointing to that video. +- The time unit is assumed to be milliseconds. + +Other assumptions are listed with the options below. + + +USAGE: + + $ python summary.py [OPTIONS] + + Reads the MMIF file and creates a JSON summary file with the document list + and any requested extra information. + +Example: + + $ python summary -i input.mmif -o output.json --transcript + + Reads input.mmif and creates output.json with just transcript + information added to the documents list and the views. + +In all cases, the summarizer will summarize what is there and use the information +that is there, if the output of CLAMS is bad, then the results of the summarizer +will be bad (although it may hide a lot of the badness). In some rare cases some +information is added. For example if the ASR tool does not group tokens then the +summarizer will do that, but then only by simply grouping in equal chunks and not +trying to infer sentence-like groupings. 
+ +The summary always includes the MMIF version, the list of documents and a summary +of the metadata of all views (identifier, CLAMS app, timestamp, total number of +annotations and number of annotations per type, it does not show parameters and +application configuration). + + +OPTIONS: + +-i INFILE -o OUTFILE + +Run the summarizer over a single MMIF file and write the JSON summary to OUTFILE. + +-- timeframes + +Shows basic information of all timeframes. This groups the timeframes according to +the apps it was found in. + +--transcript + +Shows the text from the transcript in pseudo sentences. + +The transcript is taken from the last non-warning ASR view, so only the last added +transcript will be summarized. It is assumed that Tokens in the view are ordered on +text occurrence. + +--captions + +Shows captions from the Llava captioner app. + +--entities + +Include entities from spaCy or other NER. + +--full + +Include all the above. + +""" + +# TODO: +# - For the time unit we should really update get_start(), get_end() and other methods. + + +import os, sys, io, json, argparse, pathlib +from collections import defaultdict + +from mmif.serialize import Mmif +from mmif.vocabulary import DocumentTypes + +from mmif.utils.summarizer import config +from mmif.utils.summarizer.utils import CharacterList +from mmif.utils.summarizer.utils import get_aligned_tokens, timestamp +from mmif.utils.summarizer.utils import get_transcript_view, get_last_segmenter_view, get_captions_view +from mmif.utils.summarizer.graph import Graph + + +VERSION = '0.2.0' + + +DEBUG = False + +def debug(*texts): + if DEBUG: + for text in texts: + sys.stderr.write(f'{text}\n') + + +class SummaryException(Exception): + pass + + +class Summary(object): + + """Implements the summary of a MMIF file. 
+ + fname - name of the input mmif file + mmif - instance of mmif.serialize.Mmif + graph - instance of graph.Graph + documents - instance of Documents + views - instance of Views + transcript - instance of Transcript + timeframes - instance of TimeFrames + entities - instance of Entities + captions - instance of get_captions_view + + """ + + def __init__(self, mmif_file): + self.fname = mmif_file + #self.mmif = mmif if type(mmif) is Mmif else Mmif(mmif) + self.mmif = Mmif(pathlib.Path(mmif_file).read_text()) + self.warnings = [] + self.graph = Graph(self.mmif) + self.mmif_version = self.mmif.metadata['mmif'] + self.documents = Documents(self) + self.annotations = Annotations(self) + self.document = Document(self) + self.views = Views(self) + self.timeframes = TimeFrames(self) + self.timeframe_stats = TimeFrameStats(self) + self.transcript = Transcript(self) + self.captions = Captions(self) + self.entities = Entities(self) + self.validate() + self.print_warnings() + + def add_warning(self, warning: str): + self.warnings.append(warning) + + def validate(self): + """Minimal validation of the input. 
Mostly a place holder because all it + does now is to check how many video documents there are.""" + if len(self.video_documents()) > 1: + raise SummaryException("More than one video document in MMIF file") + + def video_documents(self): + return self.mmif.get_documents_by_type(DocumentTypes.VideoDocument) + + def report(self, outfile=None): + json_obj = { + 'mmif_version': self.mmif.metadata.mmif, + 'document': self.document.data, + 'documents': self.documents.data, + 'annotations': self.annotations.data, + 'views': self.views.data, + 'transcript': self.transcript.data, + 'captions': self.captions.as_json(), + 'timeframes': self.timeframes.as_json(), + 'timeframe_stats': self.timeframe_stats.data, + 'entities': self.entities.as_json() + } + report = json.dumps(json_obj, indent=2) + if outfile is None: + return report + else: + with open(outfile, 'w') as fh: + fh.write(report) + + def print_warnings(self): + for warning in self.warnings: + print(f'WARNING: {warning}') + + def pp(self): + self.documents.pp() + self.views.pp() + self.transcript.pp() + self.timeframes.pp() + self.entities.pp() + print() + + +class Documents(object): + + """Contains a list of document summaries, which are dictionaries with just + the id, type and location properties.""" + + def __init__(self, summary: Summary): + self.data = [self.summary(doc) for doc in summary.graph.documents] + + def __len__(self): + return len(self.data) + + @staticmethod + def summary(doc): + return { 'id': doc.id, + 'type': doc.at_type.shortname, + 'location': doc.location } + + def pp(self): + print('\nDocuments -> ') + for d in self.data: + print(' %s %s' % (d['type'], d['location'])) + + +class Annotations(object): + + """Contains a dictionary of Annotation object summaries, indexed on view + identifiers.""" + + def __init__(self, summary): + self.data = defaultdict(list) + # summary.graph.get_nodes(config.ANNOTATION, view_id=view.id) + for anno in summary.graph.get_nodes(config.ANNOTATION): + 
class Document(object):

    """Collects some document-level information, including MMIF version, size
    of the MMIF file and some information from the SWT document annotation
    (fps, frame count and duration, when present)."""

    def __init__(self, summary):
        """Collect document-level data from the summary.

        :param summary: a Summary instance; only its ``mmif_version``,
            ``fname`` and ``annotations`` attributes are used here
        """
        self.data = {
            'mmif_version': summary.mmif_version,
            'size': os.path.getsize(summary.fname)}
        annotations = summary.annotations.get_all_annotations()
        if annotations:
            # TODO: this is fragile because it assumes that the annotation we
            # want (which is the one from SWT) is always the first
            doc_level_annotation = annotations[0]
            if 'fps' in doc_level_annotation:
                self.data['fps'] = doc_level_annotation['fps']
            if 'frameCount' in doc_level_annotation:
                self.data['frames'] = doc_level_annotation['frameCount']
            if 'duration' in doc_level_annotation:
                duration = doc_level_annotation['duration']
                # record the duration both in milliseconds and as a timestamp
                self.data['duration_ms'] = duration
                self.data['duration_ts'] = timestamp(duration)
dict(annotation_types), + 'parameters': view.metadata.parameters, + 'appConfiguration': view.metadata.appConfiguration } + if view.metadata.warnings: + basic_info['warnings'] = view.metadata.warnings + if view.metadata.error: + basic_info['error'] = view.metadata.error + return basic_info + + def pp(self): + print('\nViews -> ') + for v in self.data: + print(' %s' % v['app']) + + +class Transcript(object): + + """The transcript contains the string value from the first text document in the + last ASR view. It issues a warning if there is more than one text document in + the view.""" + + def __init__(self, summary): + self.summary = summary + self.data = [] + view = get_transcript_view(summary.mmif.views) + if view is not None: + documents = view.get_documents() + if len(documents) > 1: + summary.add_warning(f'More than one TextDocument in ASR view {view.id}') + t_nodes = summary.graph.get_nodes(config.TOKEN, view_id=view.id) + s_nodes = summary.graph.get_nodes(config.SENTENCE, view_id=view.id) + if not t_nodes: + return + if s_nodes: + # Whisper has Sentence nodes + sentences = self.collect_targets(s_nodes) + sentence_ids = [n.identifier for n in s_nodes] + else: + # But Kaldi does not + sentences = self.create_sentences(t_nodes) + sentence_ids = [None] * len(sentences) + # initialize the transcripts with all blanks, most blanks will be + # overwrite with characters from the tokens + transcript = CharacterList(self.transcript_size(sentences)) + for s_id, s in zip(sentence_ids, sentences): + transcript_element = TranscriptElement(s_id, s, transcript) + self.data.append(transcript_element.as_json()) + + def __str__(self): + return str(self.data) + + @staticmethod + def transcript_size(sentences): + try: + return sentences[-1][-1].properties['end'] + except IndexError: + return 0 + + def collect_targets(self, s_nodes): + """For each node (in this context a sentence node), collect all target nodes + (which are tokens) and return them as a list of lists, with one list 
    def create_sentences(self, t_nodes, sentence_size=12):
        """Create pseudo sentences by chopping the token list into chunks.

        If there is no sentence structure then we create it just by chopping
        the input into slices of some pre-determined length; no attempt is
        made to find sentence-like boundaries.

        :param t_nodes: list of Token nodes, assumed ordered on occurrence
        :param sentence_size: number of tokens per pseudo sentence
        :return: a list of lists of Token nodes
        """
        # TODO: perhaps the size parameter should be set in the config file or via a
        # command line option.
        return [t_nodes[i:i + sentence_size]
                for i in range(0, len(t_nodes), sentence_size)]
    def get_nodes(self, **props):
        """Return all the nodes that match the given properties.

        A node matches when, for every name/value pair in ``props``, the
        node's annotation has that property with exactly that value; a node
        whose annotation lacks one of the requested properties does not
        match.

        :param props: property name/value pairs to filter the nodes on
        :return: list of nodes whose annotation properties match all pairs
        """
        def prop_check(p, v, props_given):
            # False when the annotation does not have the property at all
            return v == props_given.get(p) if p in props_given else False
        return [n for n in self
                if all([prop_check(p, v, n.annotation.properties)
                        for p, v in props.items()])]
[rep.properties['timePoint'] for rep in representatives] + score = tf.properties.get('classification', {}).get(label) + app = tf.view.metadata.app + self.data[app].append( + { 'identifier': tf.identifier, 'label': label, 'score': score, + 'start-time': start, 'end-time': end, 'representatives': rep_tps }) + + def _sort_timeframe_summaries(self): + """Sort the data on their start time, do this for all apps.""" + for app in self.data: + sort_function = lambda x: x['start-time'] + self.data[app] = list(sorted(self.data[app], key=sort_function)) + + def as_json(self): + return self.data + + def pp(self): + print('\nTimeframes -> ') + for tf in self.nodes: + summary = tf.summary() + print(' %s:%s %s' % (summary['start'], summary['end'], + summary['frameType'])) + + +class TimeFrameStats(object): + + def __init__(self, summary): + # a dictionary mapping app names to frameType->duration dictionaries, + # where the duration is cumulative over all instances + self.timeframes = summary.timeframes + self.data = {} + self._collect_durations() + self._collect_other_morsels() + + def _collect_durations(self): + timeframes = self.timeframes.data + for app in timeframes: + self.data[app] = {} + for tf in timeframes[app]: + label = tf.get('label') + if label not in self.data[app]: + self.data[app][label] = {'count': 0, 'duration': 0} + self.data[app][label]['count'] += 1 + duration = tf['end-time'] - tf['start-time'] + if label is not None: + # TODO: these gave weird values for duration + #print('---',app, label, duration) + self.data[app][label]['duration'] += duration + duration = self.data[app][label]['duration'] + count = self.data[app][label]['count'] + self.data[app][label]['average'] = duration // count + + def _collect_other_morsels(self): + # First we want everything grouped by app and label + timeframes = self.timeframes.data + grouped_timeframes = defaultdict(lambda: defaultdict(list)) + for app in timeframes: + for tf in timeframes[app]: + label = tf.get('label') + 
grouped_timeframes[app][label].append(tf) + # The we pick the morsels for each label + for app in grouped_timeframes: + for label in grouped_timeframes[app]: + tfs = grouped_timeframes[app][label] + sort_on_start = lambda tf: tf['start-time'] + sort_on_length = lambda tf: tf['end-time'] - tf['start-time'] + first_tf = list(sorted(tfs, key=sort_on_start))[0] + longest_tf = list(sorted(tfs, key=sort_on_length, reverse=True))[0] + self.data[app][label]['first'] = first_tf['start-time'] + self.data[app][label]['longest'] = longest_tf['start-time'] + + +class Entities(Nodes): + + """Collecting instances of graph.EntityNode. + + nodes_idx - lists of instances of graph.EntityNode, indexed on entity text + { entity-string ==> list of graph.EntityNode } + bins - an instance of Bins + + """ + + def __init__(self, summary): + super().__init__(summary) + self.nodes_idx = {} + self.bins = None + for ent in self.graph.get_nodes(config.NAMED_ENTITY): + self.add(ent) + self._create_node_index() + self._group() + + def __str__(self): + return f'' + + def _create_node_index(self): + """Put all the entities from self.nodes in self.node_idx. 
This first puts + the nodes into the dictionary indexed on text string and then sorts the + list of nodes for each string on video position.""" + for ent in self: + self.nodes_idx.setdefault(ent.properties['text'], []).append(ent) + for text, entities in self.nodes_idx.items(): + self.nodes_idx[text] = sorted(entities, + key=(lambda e: e.start_in_video())) + + def _group(self): + """Groups all the nodes on the text and sorts them on position in the video, + for the latter it will also create bins of entities that occur close to each + other in the text.""" + # create the bins, governed by the summary's granularity + self.bins = Bins(self.summary) + for text, entities in self.nodes_idx.items(): + self.bins.current_bin = None + for entity in entities: + self.bins.add_entity(text, entity) + self.bins.mark_entities() + + def _add_tags(self, tags): + for tag in tags: + tag_doc = tag.properties['document'] + tag_p1 = tag.properties['start'] + tag_p2 = tag.properties['end'] + entities = self.nodes_idx.get(tag.properties['text'], []) + for entity in entities: + props = entity.properties + doc = props['document'] + p1 = props['start'] + p2 = props['end'] + if tag_doc == doc and tag_p1 == p1 and tag_p2 == p2: + entity.properties['tag'] = tag.properties['tagName'] + + def as_json(self): + json_obj = [] + for text in self.nodes_idx: + entity = {"text": text, "instances": []} + json_obj.append(entity) + for e in self.nodes_idx[text]: + entity["instances"].append(e.summary()) # e.summary(), E_PROPS) + return json_obj + + def pp(self): + print('\nEntities -> ') + for e in self.nodes_idx: + print(' %s' % e) + for d in self.nodes_idx[e]: + props = ["%s=%s" % (p, v) for p, v in d.summary().items()] + print(' %s' % ' '.join(props)) + + def print_groups(self): + for key in sorted(self.nodes_idx): + print(key) + for e in self.nodes_idx[key]: + print(' ', e, e.start_in_video()) + + +class Captions(Nodes): + + def __init__(self, summary): + super().__init__(summary) + self.captions = [] 
+ view = get_captions_view(summary.mmif.views) + if view is not None: + for doc in self.graph.get_nodes(config.TEXT_DOCUMENT, view_id=view.id): + text = doc.properties['text']['@value'].split('[/INST]')[-1] + debug( + f'>>> DOC {doc}', + f'>>> PROPS {list(doc.properties.keys())}', + f'>>> TEXT ' + text.replace("\n", "")[:100], + f'>>> ANCHORS {doc.anchors}') + if 'time-offsets' in doc.anchors and 'representatives' in doc.anchors: + # For older LLava-style captions + # http://apps.clams.ai/llava-captioner/v1.2-6-gc824c97 + # NOTE: probably obsolete, at least the link above is dead + tp_id = doc.anchors["representatives"][0] + tp = summary.graph.get_node(tp_id) + if tp is not None: + self.captions.append( + { 'identifier': doc.identifier, + 'time-point': tp.properties['timePoint'], + 'text': text }) + if 'time-point' in doc.anchors: + # For newer SmolVLM-style captions + # http://apps.clams.ai/smolvlm2-captioner + self.captions.append( + { 'identifier': doc.identifier, + 'time-point': doc.anchors['time-point'], + 'text': text }) + + def as_json(self): + return self.captions + #return [(ident, p1, p2, text) for ident, p1, p2, text in self.captions] + + +class Bins(object): + + def __init__(self, summary): + self.summary = summary + self.bins = {} + self.current_bin = None + self.current_text = None + + def __str__(self): + return f'' + + def __len__(self): + return len(self.bins) + + def add_entity(self, text, entity): + """Add an entity instance to the appropriate bin.""" + if self.current_bin is None: + # Add the first instance of a new entity (as defined by the text), + # since it is the first a new bin will be created. + self.current_text = text + self.current_bin = Bin(entity) + self.bins[text] = [self.current_bin] + else: + # For following entities with the same text, a new bin may be + # created depending on the positions and the granularity. 
+ p1 = self.current_bin[-1].start_in_video() + p2 = entity.start_in_video() + # p3 = entity.end_in_video() + if p2 - p1 < config.GRANULARITY: + # TODO: should add p3 here + self.current_bin.add(entity) + else: + self.current_bin = Bin(entity) + self.bins[self.current_text].append(self.current_bin) + + def mark_entities(self): + """Marks all entities with the bin that they occur in. This is done to export + the grouping done with the bins to the entities and this way the bins never need + to be touched again.""" + # TODO: maybe use the bins when we create the output + for entity_bins in self.bins.values(): + for i, e_bin in enumerate(entity_bins): + for entity in e_bin: + entity.properties['group'] = i + + def print_bins(self): + for text in self.bins: + print(text) + text_bins = self.bins[text] + for i, text_bin in enumerate(text_bins): + text_bin.print_nodes(i) + print() + + +class Bin(object): + + def __init__(self, node): + # TODO: we are not using these yet, but a bin should have a begin and + # end in the video which should be derived from the start and end of + # entities in the video. The way we put things in bins now is a bit + # fragile since it depends on the start or end of the last element. 
def compose_id(view_id, anno_id):
    """Composes the view identifier with the annotation identifier.

    An annotation identifier that already carries a view prefix (that is, it
    contains a colon) is returned unchanged, otherwise the view identifier
    is prepended with a colon separator.
    """
    if ':' in anno_id:
        return anno_id
    return view_id + ':' + anno_id
def timestamp(milliseconds: int, format='hh:mm:ss'):
    """Return a human-readable timestamp for a position given in milliseconds.

    Returns 'nil' when the input is None or -1 (sometimes the milliseconds
    are not a usable number). Supported formats are 'hh:mm:ss',
    'hh:mm:ss:mmm', 'mm:ss' and 'mm:ss:mmm'; any other format value falls
    back to hours, minutes, seconds and milliseconds.
    """
    if milliseconds in (None, -1):
        return 'nil'
    total_seconds, ms = divmod(int(milliseconds), 1000)
    total_minutes, s = divmod(total_seconds, 60)
    hours, m = divmod(total_minutes, 60)
    if format == 'mm:ss':
        return f'{m:02d}:{s:02d}'
    if format == 'mm:ss:mmm':
        return f'{m:02d}:{s:02d}.{ms:03d}'
    if format == 'hh:mm:ss':
        return f'{hours}:{m:02d}:{s:02d}'
    # 'hh:mm:ss:mmm' and any unrecognized format string
    return f'{hours}:{m:02d}:{s:02d}.{ms:03d}'
class CharacterList(UserList):

    """Auxiliary datastructure to help print a list of tokens. It allows you to
    back-engineer a sentence from the text and character offsets of the tokens.

    The list starts with ``n`` fill characters and grows on demand when an
    index beyond the current end is assigned to."""

    def __init__(self, n: int, char=' '):
        # self.size is the initial size only; the underlying data list can
        # grow beyond it via __setitem__ or set_chars
        self.size = n
        self.char = char
        self.data = n * [char]

    def __str__(self):
        # NOTE(review): this looks like a placeholder/damaged repr — confirm
        # the intended string against the upstream source
        return f''

    def __len__(self):
        # report the actual length of the data; returning the stored initial
        # size goes stale as soon as the list grows, which also broke the
        # growth arithmetic in __setitem__
        return len(self.data)

    def __setitem__(self, key, value):
        try:
            self.data[key] = value
        except IndexError:
            # pad with fill characters up to and including the new index
            for i in range(len(self.data), key + 1):
                self.data.append(self.char)
            self.data[key] = value

    def set_chars(self, text: str, start: int, end: int):
        """Overwrite the positions start:end with the characters of text."""
        self.data[start:end] = text

    def getvalue(self, start: int, end: int):
        """Return the characters in start:end as a string."""
        return ''.join(self.data[start:end])
Only properties from obj + that are in the props tuple are printed.""" + pairs = [] + for prop in props: + if prop in obj: + if obj[prop] is not None: + #pairs.append("%s=%s" % (prop, xml_attribute(obj[prop]))) + pairs.append(f'{prop}={xml_attribute(obj[prop])}') + attrs = ' '.join(pairs) + return f'{indent}<{tag_name} {attrs}/>\n' + + +def write_tag(s, tagname: str, indent: str, obj: dict, props: tuple): + """Write an XML tag to an instance of io.StringIO(). Only properties from obj + that are in the props tuple are printed.""" + pairs = [] + for prop in props: + if prop in obj: + if obj[prop] is not None: + pairs.append("%s=%s" % (prop, xml_attribute(obj[prop]))) + s.write('%s<%s %s/>\n' + % (indent, tagname, ' '.join(pairs))) + + +def xml_attribute(attr): + """Return attr as an XML attribute.""" + return quoteattr(str(attr)) + + +def xml_data(text): + """Return text as XML data.""" + return escape(str(text)) + + +def normalize_id(doc_ids: list, view: View, annotation: Annotation): + """Change identifiers to include the view identifier if it wasn't included, + do nothing otherwise. This applies to the Annotation id, target, source, + document, targets and representatives properties. 
Note that timePoint is + not included because the value is an integer and not an identifier.""" + # TODO: this seems somewhat fragile + # TODO: spell out what doc_ids is for (to exclude source documents I think) + debug = False + attype = annotation.at_type.shortname + props = annotation.properties + if ':' not in annotation.id and view is not None: + if annotation.id not in doc_ids: + newid = f'{view.id}:{annotation.id}' + annotation.properties['id'] = newid + if 'document' in props: + doc_id = props['document'] + if ':' not in doc_id and view is not None: + if doc_id not in doc_ids: + props['document'] = f'{view.id}:{doc_id}' + if 'targets' in props: + new_targets = [] + for target in props['targets']: + if ':' not in target and view is not None: + if target not in doc_ids: + new_targets.append(f'{view.id}:{target}') + else: + new_targets.append(target) + props['targets'] = new_targets + if 'representatives' in props: + new_representatives = [] + for rep in props['representatives']: + if ':' not in rep and view is not None: + new_representatives.append(f'{view.id}:{rep}') + else: + new_representatives.append(rep) + props['representatives'] = new_representatives + if attype == 'Alignment': + if ':' not in props['source'] and view is not None: + if props['source'] not in doc_ids: + props['source'] = f'{view.id}:{props["source"]}' + if ':' not in props['target'] and view is not None: + if props['target'] not in doc_ids: + props['target'] = f'{view.id}:{props["target"]}' + if debug: + print('===', annotation) + + +def get_annotations_from_view(view, annotation_type): + """Return all annotations from a view that match the short name of the + annotation type.""" + # Note: there is method mmif.View.get_annotations() where you can give + # at_type as a parameter, but it requires a full match. 
def find_matching_tokens(tokens, ne):
    """Find the tokens that align with the start and end of a named entity.

    :param tokens: list of Token annotations whose properties have 'start'
        and 'end' character offsets
    :param ne: a named entity annotation with 'start' and 'end' properties
    :return: a pair (start_token, end_token); either element is None when no
        token matches the corresponding offset, and when several tokens
        match the same offset the last one in the list wins
    """
    # removed an unused accumulator list that was never read
    ne_start = ne.properties["start"]
    ne_end = ne.properties["end"]
    start_token = None
    end_token = None
    for token in tokens:
        if token.properties['start'] == ne_start:
            start_token = token
        if token.properties['end'] == ne_end:
            end_token = token
    return start_token, end_token