From 0c40bacf9130fa3314665105f14d337d85830599 Mon Sep 17 00:00:00 2001 From: Marc Verhagen Date: Tue, 23 Dec 2025 12:20:35 -0500 Subject: [PATCH 1/7] Importing the summarizer, fixing its imports and hooking it up with a MMIF CLI script --- .gitignore | 5 + mmif/utils/cli/summarize.py | 43 ++ mmif/utils/summarizer/__init__.py | 52 +++ mmif/utils/summarizer/config.py | 69 +++ mmif/utils/summarizer/graph.py | 596 ++++++++++++++++++++++++ mmif/utils/summarizer/summary.py | 731 ++++++++++++++++++++++++++++++ mmif/utils/summarizer/utils.py | 301 ++++++++++++ 7 files changed, 1797 insertions(+) create mode 100644 mmif/utils/cli/summarize.py create mode 100644 mmif/utils/summarizer/__init__.py create mode 100644 mmif/utils/summarizer/config.py create mode 100644 mmif/utils/summarizer/graph.py create mode 100644 mmif/utils/summarizer/summary.py create mode 100644 mmif/utils/summarizer/utils.py diff --git a/.gitignore b/.gitignore index 013ab917..f937b437 100644 --- a/.gitignore +++ b/.gitignore @@ -75,9 +75,14 @@ mmif/ver mmif/res mmif/vocabulary ./VERSION* +VERSION .hypothesis # Documentation build artifacts documentation/cli_help.rst documentation/whatsnew.rst docs-test + +# environments +.venv* +venv* diff --git a/mmif/utils/cli/summarize.py b/mmif/utils/cli/summarize.py new file mode 100644 index 00000000..8b88c53d --- /dev/null +++ b/mmif/utils/cli/summarize.py @@ -0,0 +1,43 @@ +import sys +import argparse + +from mmif.utils.summarizer.summary import Summary + + + +def describe_argparser() -> tuple: + """ + Returns two strings: a one-line description of the argparser and additional + material, which will be shown for `mmif --help` and `mmif summarize --help`, + respectively. For now they return the same string. The retun value should + still be a tuple because mmif.cli() depends on it. 
+ """ + oneliner = 'provides a CLI to create a JSON Summary for a MMIF file' + return oneliner, oneliner + + +def prep_argparser(**kwargs): + parser = argparse.ArgumentParser( + description=describe_argparser()[1], + formatter_class=argparse.RawDescriptionHelpFormatter, + **kwargs) + parser.add_argument("-i", metavar='MMIF_FILE', help='input MMIF file', required=True) + parser.add_argument("-o", metavar='JSON_FILE', help='output JSON summary file', required=True) + parser.add_argument("--full", action="store_true", help="print full report") + parser.add_argument('--transcript', action='store_true', help='include transcript') + parser.add_argument('--captions', action='store_true', help='include Llava captions') + parser.add_argument('--timeframes', action='store_true', help='include all time frames') + parser.add_argument('--entities', action='store_true', help='include entities from transcript') + return parser + + +def main(args): + #print('>>>', args) + mmif_summary = Summary(args.i) + #print('>>>', mmif_summary) + mmif_summary.report( + outfile=args.o, full=args.full, + #timeframes=args.timeframes, transcript=args.transcript, + #captions=args.captions, entities=args.entities + ) + diff --git a/mmif/utils/summarizer/__init__.py b/mmif/utils/summarizer/__init__.py new file mode 100644 index 00000000..59a980fe --- /dev/null +++ b/mmif/utils/summarizer/__init__.py @@ -0,0 +1,52 @@ + +import argparse + +from mmif.utils.summarizer.summary import Summary + + +def argparser(): + parser = argparse.ArgumentParser(description='Create a JSON Summary for a MMIF file') + parser.add_argument('-i', metavar='MMIF_FILE', help='input MMIF file', required=True) + parser.add_argument('-o', metavar='JSON_FILE', help='output JSON summary file', required=True) + parser.add_argument('--full', action='store_true', help='create full report') + parser.add_argument('--transcript', action='store_true', help='include transcript') + parser.add_argument('--captions', action='store_true', help='include Llava captions') + parser.add_argument('--timeframes', action='store_true', help='include all time frames') + parser.add_argument('--entities', action='store_true', help='include entities from transcript') + return parser + + +def pp_args(args): + for a, v in args.__dict__.items(): + print(f'{a:12s} --> {v}') + + +def main(): + parser = argparser() + args = parser.parse_args() + #pp_args(args) + mmif_summary = Summary(args.i) + mmif_summary.report( + outfile=args.o, full=args.full, + timeframes=args.timeframes, transcript=args.transcript, + captions=args.captions, entities=args.entities) + + +""" + +There used to be an option to process a whole directory, but I never used it and decided +that if needed it would better be done by an extra script or a separate function. + +The code for when there was a -d option is here just in case. 
+ +if args.d: + for mmif_file in pathlib.Path(args.d).iterdir(): + if mmif_file.is_file() and mmif_file.name.endswith('.mmif'): + print(mmif_file) + json_file = str(mmif_file)[:-4] + 'json' + mmif_summary = Summary(mmif_file.read_text()) + mmif_summary.report( + outfile=json_file, full=args.full, + timeframes=args.timeframes, transcript=args.transcript, + captions=args.captions, entities=args.entities) +""" \ No newline at end of file diff --git a/mmif/utils/summarizer/config.py b/mmif/utils/summarizer/config.py new file mode 100644 index 00000000..f972bd97 --- /dev/null +++ b/mmif/utils/summarizer/config.py @@ -0,0 +1,69 @@ + +from mmif.vocabulary import DocumentTypes +from mmif.vocabulary import AnnotationTypes + + +# The name of CLAMS applications, used to select views and to determine whether +# the summarizer is appropriate for the app version. +# TODO: this now requires an exhaustive listing of all allowed apps and their +# versions, we need a more maintainable system. + +KALDI = [ + # The first two use MMIF 0.4 and should probably be retired + 'http://apps.clams.ai/aapb-pua-kaldi-wrapper/0.2.2', + 'http://apps.clams.ai/aapb-pua-kaldi-wrapper/0.2.3', + 'http://apps.clams.ai/aapb-pua-kaldi-wrapper/v3'] + +WHISPER = [ + 'http://apps.clams.ai/whisper-wrapper/v7', + 'http://apps.clams.ai/whisper-wrapper/v8', + 'http://apps.clams.ai/whisper-wrapper/v8-3-g737e280'] + +CAPTIONER = [ + 'http://apps.clams.ai/llava-captioner/v1.2-6-gc824c97', + 'http://apps.clams.ai/smolvlm2-captioner'] + +NER = [ + 'http://apps.clams.ai/spacy-wrapper/v1.1', + 'http://apps.clams.ai/spacy-wrapper/v2.1'] + +SEGMENTER = 'http://apps.clams.ai/audio-segmenter' + + +# When a named entity occurs 20 times we do not want to generate 20 instances of +# it. If the start of the next entity occurs within the below number of +# milliseconds after the end of the previous, then it is just added to the +# previous one. Taking one minute as the default so two mentions in a minute end +# up being the same instance. This setting can be changed with the 'granularity' +# parameter. 
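+# For example, under the behavior described above and the value of 1000 ms set
+# below, a mention ending at 10000 ms followed by one starting at 10800 ms (an
+# 800 ms gap) is folded into the same instance, while a gap of 2000 ms starts a
+# new instance.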
+# TODO: this seems broken + +GRANULARITY = 1000 + + +# Properties used for the summary for various tags + +DOC_PROPS = ('id', 'type', 'location') +VIEW_PROPS = ('id', 'timestamp', 'app') +TF_PROPS = ('id', 'start', 'end', 'frameType') +E_PROPS = ('id', 'group', 'cat', 'tag', 'video-start', 'video-end', 'coordinates') + + +# Names of types + +TEXT_DOCUMENT = DocumentTypes.TextDocument.shortname +VIDEO_DOCUMENT = DocumentTypes.VideoDocument.shortname +TIME_FRAME = AnnotationTypes.TimeFrame.shortname +BOUNDING_BOX = AnnotationTypes.BoundingBox.shortname +ALIGNMENT = AnnotationTypes.Alignment.shortname + +ANNOTATION = 'Annotation' +TOKEN = 'Token' +SENTENCE = 'Sentence' +PARAGRAPH = 'Paragraph' +NAMED_ENTITY = 'NamedEntity' +NOUN_CHUNK = 'NounChunk' +VERB_CHUNK = 'VerbChunk' + +TIME_BASED_INTERVALS = {TIME_FRAME} +SPAN_BASED_INTERVALS = {TOKEN, SENTENCE, PARAGRAPH, NAMED_ENTITY, NOUN_CHUNK, VERB_CHUNK} diff --git a/mmif/utils/summarizer/graph.py b/mmif/utils/summarizer/graph.py new file mode 100644 index 00000000..ae11b9be --- /dev/null +++ b/mmif/utils/summarizer/graph.py @@ -0,0 +1,596 @@ +import sys, json +from collections import defaultdict +from operator import itemgetter +from pathlib import Path +import argparse + +from mmif import Mmif + +from mmif.utils.summarizer import config +from mmif.utils.summarizer.utils import compose_id, normalize_id +#from summarizer.utils import compose_id, flatten_paths, normalize_id + + + +class Graph(object): + + """Graph implementation for a MMIF document. Each node contains an annotation + or document. Alignments are stored separately. Edges between nodes are created + from the alignments and added to the Node.targets property. The first edge added + to Node.targets is the document that the Node points to (if there is one). + + The goal for the graph is to store all useful annotation and to have simple ways + to trace nodes all the way up to the primary data.""" + + def __init__(self, mmif): + self.mmif = mmif if type(mmif) is Mmif else Mmif(mmif) + self.documents = [] + self.nodes = {} + self.alignments = [] + self._init_nodes() + self._init_edges() + # Third pass to add links between text elements, in particular from + # entities to tokens, adding lists of tokens to entities. + tokens = self.get_nodes(config.TOKEN) + entities = self.get_nodes(config.NAMED_ENTITY) + self.token_idx = TokenIndex(tokens) + #self.token_idx.pp() + for e in entities: + #print('>>>', e, e.anchors) + e.tokens = self.token_idx.get_tokens_for_node(e) + + def _init_nodes(self): + # The top-level documents are added as nodes, but they are also put in + # the documents list. + for doc in self.mmif.documents: + self.add_node(None, doc) + self.documents.append(doc) + # First pass over all annotations and documents in all views and save + # them in the graph. + doc_ids = [d.id for d in self.documents] + for view in self.mmif.views: + for annotation in view.annotations: + normalize_id(doc_ids, view, annotation) + if annotation.at_type.shortname == config.ALIGNMENT: + # alignments are not added as nodes, but we do keep them around + self.alignments.append((view, annotation)) + else: + self.add_node(view, annotation) + + def _init_edges(self): + # Second pass over the alignments so we create edges. 
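+        # self.alignments holds (view, alignment-annotation) pairs collected in
+        # _init_nodes(); add_edge() below resolves each alignment's source and
+        # target identifiers to nodes and links them.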
+ for view, alignment in self.alignments: + self.add_edge(view, alignment) + + def __str__(self): + return "" % len(self.nodes) + + def add_node(self, view, annotation): + """Add an annotation as a node to the graph.""" + node = Nodes.new(self, view, annotation) + self.nodes[node.identifier] = node + + def add_edge(self, view, alignment): + source_id = alignment.properties['source'] + target_id = alignment.properties['target'] + #print(alignment.id, source_id, target_id) + source = self.get_node(source_id) + target = self.get_node(target_id) + # make sure the direction goes from token or textdoc to annotation + if target.annotation.at_type.shortname in (config.TOKEN, config.TEXT_DOCUMENT): + source, target = target, source + source.targets.append(target) + source.add_anchors_from_alignment(target) + target.add_anchors_from_alignment(source) + + def get_node(self, node_id): + return self.nodes.get(node_id) + + def get_nodes(self, short_at_type: str, view_id : str = None): + """Get all nodes for an annotation type, using the short form. If a view + identifier is provided then only include nodes from that view.""" + return [node for node in self.nodes.values() + if (node.at_type.shortname == short_at_type + and (view_id is None or node.view.id == view_id))] + + def statistics(self): + stats = defaultdict(int) + for node in self.nodes.values(): + stats[f'{str(node.view_id):4} {node.at_type.shortname}'] += 1 + return stats + + def trim(self, start: int, end: int): + """Trim the graph and keep only those nodes that are included in the graph + between two timepoints (both in milliseconds). This assumes that all nodes + are anchored on the time in the audio or video stream. At the moment it + keeps all nodes that are not explicitly anchored.""" + remove = set() + for node_id, node in self.nodes.items(): + if 'time-point' in node.anchors: + if not start <= node.anchors['time-point'] <= end: + remove.add(node_id) + if 'time-offsets' in node.anchors: + p1, p2 = node.anchors['time-offsets'] + if not (start <= p1 <= end and start <= p2 <= end): + remove.add(node_id) + new_nodes = [n for n in self.nodes.values() if not n.identifier in remove] + self.nodes = { node.identifier: node for node in new_nodes } + + def pp(self, fname=None,skip_timepoints=False): + fh = sys.stdout if fname is None else open(fname, 'w') + fh.write("%s\n" % self) + for view in self.mmif.views: + fh.write(" \n" % (view.id, str(view.metadata['app']))) + for node_id, node in self.nodes.items(): + if node.at_type.shortname == 'TimePoint': + continue + fh.write(" %-40s" % node) + targets = [str(t) for t in node.targets] + fh.write(' --> [%s]\n' % ' '.join(targets)) + + def pp_statistics(self): + stats = self.statistics() + for at_type in sorted(stats): + print(f'{at_type:20} {stats[at_type]:>5}') + + +class TokenIndex(object): + + """ + The tokens are indexed on the identifier on the TextDocument that they occur + in and for each text document we have a list of pairs + + {'v_4:td1': [ + ((0, 5), ), + ((5, 6), ), + ... + } + """ + + # TODO: + # - Benchmark get_tokens_for_node(). I may want to use something like this + # to determine enclosed nodes and enclosing nodes and that may blow up since + # that would be O(n^2). If it does matter, probably start using binary search + # or add an index from character offset to nodes. + # - It is also not sure whether we still need this since the new spaCy gives + # targets to tokens. 
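+    # A possible shape for that binary search (a hypothetical sketch, not used):
+    # the (start, end) pairs per document are already kept sorted, so
+    # bisect.bisect_left over the start offsets could locate the first candidate
+    # token, followed by a forward scan until a token starts at or after the
+    # requested end offset.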
+ + def __init__(self, tokens): + self.tokens = {} + self.token_count = len(tokens) + for t in tokens: + tup = ((t.properties['start'], t.properties['end']), t) + self.tokens.setdefault(t.document.identifier, []).append(tup) + # Make sure the tokens for each document are ordered. + for document, token_list in self.tokens.items(): + self.tokens[document] = sorted(token_list, key=itemgetter(0)) + # In some cases there are two tokens with identical offset (for example + # with tokenization from both Kaldi and spaCy, not sure what to do with + # these, but should probably be more careful on what views to access + + def __len__(self): + return self.token_count + + def __str__(self): + return f'' + + def get_tokens_for_node(self, node): + """Return all tokens included in the span of a node.""" + doc = node.document.identifier + try: + start = node.properties['start'] + end = node.properties['end'] + except KeyError: + start, end = node.anchors['text-offsets'] + tokens = [] + for (t_start, t_end), token in self.tokens.get(doc, []): + if t_start >= start and t_end <= end: + tokens.append(token) + return tokens + + def pp(self, fname=None): + fh = sys.stdout if fname is None else open(fname, 'w') + for document in self.tokens: + fh.write("\n[%s] -->\n" % document) + for t in self.tokens[document]: + fh.write(' %s %s\n' % (t[0], t[1])) + + +class Node(object): + + def __init__(self, graph, view, annotation): + self.graph = graph + self.view = view + self.view_id = None if self.view is None else self.view.id + self.annotation = annotation + # copy some information from the Annotation + self.at_type = annotation.at_type + self.identifier = annotation.id + self.properties = json.loads(str(annotation.properties)) + # get the document from the view or the properties + self.document = self._get_document() + # The targets property contains a list of annotations or documents that + # the node content points to. This includes the document the annotation + # points to as well as the alignment from a token or text document to a + # bounding box or time frame (which is added later). + # TODO: the above does not seem to be true since there is no evidence of + # data from alignments being added. 
+ self.targets = [] if self.document is None else [self.document] + self.anchors = {} + self.add_local_anchors() + self.add_anchors_from_targets() + + def __str__(self): + anchor = '' + if self.at_type.shortname == config.TOKEN: + anchor = " %s:%s '%s'" % (self.properties['start'], + self.properties['end'], + self.properties.get('text','').replace('\n', '\\n')) + return "<%s %s%s>" % (self.at_type.shortname, self.identifier, anchor) + + def add_local_anchors(self): + """Get the anchors that you can get from the annotation itself, which + includes the start and end offsets, the coordinates, the timePoint of + a BoundingBox and any annotation with targets.""" + props = self.properties + attype = self.annotation.at_type.shortname + if 'start' in props and 'end' in props: + # TimeFrame is the only non-character based interval so this simple + # if-then-else should work + if attype == config.TIME_FRAME: + self.anchors['text-offsets'] = (props['start'], props['end']) + else: + self.anchors['time-offsets'] = (props['start'], props['end']) + if 'coordinates' in props: + self.anchors['coordinates'] = props['coordinates'] + if 'timePoint' in props: + self.anchors['time-point'] = props['timePoint'] + if 'targets' in props: + self.anchors['targets'] = props['targets'] + + def add_anchors_from_targets(self): + """Get start and end offsets or timePoints from the targets and add them to + the anchors, but only if there were no anchors on the node already. This has + two cases: one for TimeFrames and one for text intervals.""" + props = self.properties + attype = self.annotation.at_type.shortname + if 'targets' in props: + try: + t1 = self.graph.nodes[props['targets'][0]] + t2 = self.graph.nodes[props['targets'][-1]] + if attype == config.TIME_FRAME: + if not 'time-offsets' in props: + self.anchors['time-offsets'] = ( + t1.properties['timePoint'], t2.properties['timePoint']) + else: + if not 'text-offsets' in props: + self.anchors['text-offsets'] = ( + t1.properties['start'], t2.properties['end']) + except IndexError: + print(f'WARNING: Unexpected empty target list for {self.identifier}') + + def add_anchors_from_alignment(self, target: None, debug=False): + source_attype = self.at_type.shortname + target_attype = target.at_type.shortname + if debug: + print('\n@ DEBUG SOURCE->TARGET ', source_attype, target_attype) + print('@ DEBUG SOURCE.PROPS ', list(self.properties.keys())) + print('@ DEBUG TARGET.PROPS ', list(target.properties.keys())) + print('@ DEBUG TARGET.ANCHORS ', target.anchors) + # If a TextDocument is aligned to a BoundingBox then we grab the coordinates + # TODO: how are we getting the time point? + if source_attype == 'TextDocument' and target_attype == 'BoundingBox': + if 'coordinates' in target.properties: + self.anchors['coordinates'] = target.properties['coordinates'] + #print(source_attype, self.anchors) + elif source_attype == 'BoundingBox' and target_attype == 'TextDocument': + pass + # If a TextDocument is aligned to a TimeFrame then we copy time anchors + # but also targets and representatives, the latter because some alignments + # are not precise + elif source_attype == 'TextDocument' and target_attype == 'TimeFrame': + if 'start' in target.properties and 'end' in target.properties: + self.anchors['time-offsets'] = (target.properties['start'], + target.properties['end']) + if 'time-offsets' in target.anchors: + # TODO: is this ever used? 
+ self.anchors['time-offsets'] = target.anchors['time-offsets'] + if 'targets' in target.properties: + self.anchors['targets'] = target.properties['targets'] + if 'representatives' in target.properties: + self.anchors['representatives'] = target.properties['representatives'] + #print('-', source_attype, self.anchors, self, target) + elif source_attype == 'TimeFrame' and target_attype == 'TextDocument': + pass + # Simply copy the time point + elif source_attype == 'TextDocument' and target_attype == 'TimePoint': + self.anchors['time-point'] = target.anchors['time-point'] + if debug: + print('+ ADDED SOURCE.ANCHORS ', self.anchors) + # For Token-TimeFrame alignments all we need are the start and end time points + elif source_attype == 'Token' and target_attype == 'TimeFrame': + if 'start' in target.properties and 'end' in target.properties: + self.anchors['time-offsets'] = (target.properties['start'], + target.properties['end']) + #print(source_attype, self.anchors) + elif source_attype == 'TimeFrame' and target_attype == 'Token': + pass + # TODO: check whether some action is needed for the next options + elif source_attype == 'TextDocument' and target_attype == 'VideoDocument': + pass + elif source_attype == 'VideoDocument' and target_attype == 'TextDocument': + pass + elif source_attype == 'BoundingBox' and target_attype == 'TimePoint': + pass + elif source_attype =='TimePoint' and target_attype == 'BoundingBox': + pass + elif source_attype == 'BoundingBox' and target_attype in ('Token', 'Sentence', 'Paragraph'): + pass + elif source_attype in ('Token', 'Sentence', 'Paragraph') and target_attype == 'BoundingBox': + pass + elif source_attype == 'TextDocument' and target_attype == 'TimePoint': + pass + elif source_attype == 'TimePoint' and target_attype == 'TextDocument': + pass + else: + print('-', source_attype, target_attype) + #if debug: + # print('DEBUG', self.anchors) + + def _get_document(self): + """Return the document or annotation node that the annotation/document in + the node refers to via the document property. This could be a local property + or a metadata property if there is no such local property. 
Return None + if neither of those exist.""" + # try the local property + docid = self.properties.get('document') + if docid is not None: + # print('>>>', docid, self.graph.get_node(docid)) + return self.graph.get_node(docid) + # try the metadata property + if self.view is not None: + try: + metadata = self.view.metadata.contains[self.at_type] + docid = metadata['document'] + return self.graph.get_node(docid) + except KeyError: + return None + return None + + def XXX_get_document_plus_span(self): + self.pp() + props = self.properties + return "%s:%s:%s" % (self.document.identifier, + props['start'], props['end']) + + def XXXpaths_to_docs(self): + """Return all the paths from the node to documents.""" + paths = self._paths_to_docs() + return flatten_paths(paths) + + def XXX_paths_to_docs(self): + paths = [] + if not self.targets: + return [[self]] + for t in self.targets: + paths.append([self]) + for i, target in enumerate(self.targets): + paths[i].extend(target._paths_to_docs()) + return paths + + def summary(self): + """The default summary is just the identfier, this should typically be + overriden by sub classes.""" + return { 'id': self.identifier } + + def pp(self, close=True): + print('-' * 80) + print(self) + print(f' document = {self.document}') + for prop in self.properties: + print(f' {prop} = {self.properties[prop]}') + print(' targets = ') + for target in self.targets: + print(' ', target) + print(' anchors = ') + for anchor in self.anchors: + print(f' {anchor} -> {self.anchors[anchor]}') + if close: + print('-' * 80) + + +class TimeFrameNode(Node): + + def __str__(self): + frame_type = ' ' + self.frame_type() if self.has_label() else '' + return ('' + % (self.identifier, self.start(), self.end(), frame_type)) + + def start(self): + return self.properties.get('start', -1) + + def end(self): + return self.properties.get('end', -1) + + def frame_type(self): + # TODO: rename this, uses old property since replaced by "label"" + # NOTE: this is still aloowing for the old property though + return self.properties.get('label') or self.properties.get('frameType') + + def has_label(self): + return self.frame_type() is not None + + def representatives(self) -> list: + """Return a list of the representative TimePoints.""" + # TODO: why could I not get this from the anchors? 
+ rep_ids = self.properties.get('representatives', []) + reps = [self.graph.get_node(rep_id) for rep_id in rep_ids] + return reps + + def summary(self): + """The summary of a time frame just contains the identifier, start, end + and frame type.""" + return { 'id': self.identifier, + 'start': self.properties['start'], + 'end': self.properties['end'], + 'frameType': self.properties.get('frameType') } + + +class EntityNode(Node): + + def __init__(self, graph, view, annotation): + super().__init__(graph, view, annotation) + self.tokens = [] + self._paths = None + self._anchor = None + + def __str__(self): + try: + start = self.properties['start'] + end = self.properties['end'] + except KeyError: + start, end = self.anchors['text-offsets'] + return ("" + % (self.identifier, start, end, self.properties['text'])) + + def start_in_video(self): + #print('+++', self.document.properties) + try: + return self.document.anchors['time-point'] + except KeyError: + return -1 + #return self.anchor()['video-start'] + + def end_in_video(self): + return self.anchor().get('video-end') + + def pp(self): + super().pp(close=False) + try: + for i, p in enumerate(self.paths_to_docs()): + print(' %s' % ' '.join([str(n) for n in p[1:]])) + except ValueError: + print(' WARNING: error in path_to_docs in NamedEntityNode.pp()') + print('-' * 80) + + def summary(self): + """The summary for entities needs to include where in the video or image + the entity occurs, it is not enough to just give the text document.""" + # TODO: in the old days this used an anchor() method which was fragile + # TODO: revamping it now + + #anchor = self.anchor() + #self.document.pp() +# print('...', self.document.anchors + return { + 'id': self.identifier, + 'group': self.properties['group'], + 'cat': self.properties['category'], + 'document': self.document.identifier, + # Entities in a TextDocument that is a full transcript without any + # alignments do not have a TimePoint + #'time-point': self.document.anchors.get('time-point'), + #'text-offsets': self.anchors.get('text-offsets'), + 'time-point': self.document.anchors.get('time-point', -1), + 'text-offsets': self.anchors.get('text-offsets', (-1 ,-1)), + #'document': self._get_document_plus_span(), + #'video-start': anchor.get('video-start'), + #'video-end': anchor.get('video-end'), + #'coordinates': self._coordinates_as_string(anchor) + } + + def anchor(self): + """The anchor is the position in the video that the entity is linked to. + This anchor cannot be found in the document property because that points + to a text document that was somehow derived from the video document. Some + graph traversal is needed to get the anchor, but we know that the anchor + is always a time frame or a bounding box. + """ + # TODO: deal with the case where the primary document is not a video + self.paths = self.paths_to_docs() + bbtf = self.find_boundingbox_or_timeframe() + # for path in paths: + # print('... [') + # for n in path: print(' ', n) + # print('===', bbtf) + if bbtf.at_type.shortname == config.BOUNDING_BOX: + return {'video-start': bbtf.properties['timePoint'], + 'coordinates': bbtf.properties['coordinates']} + elif bbtf.at_type.shortname == config.TIME_FRAME: + return {'video-start': bbtf.properties['start'], + 'video-end': bbtf.properties['end']} + + def anchor2(self): + """The anchor is the position in the video that the entity is linked to. + This anchor cannot be found in the document property because that points + to a text document that was somehow derived from the video document. 
Some + graph traversal is needed to get the anchor, but we know that the anchor + is always a time frame or a bounding box. + """ + # TODO: with this version you get an error that the paths variable does + # not exist yet, must get a clearer picture on how to build a graph + # where nodes have paths to anchors + # TODO: deal with the case where the primary document is not a video + if self._anchor is None: + self._paths = self.paths_to_docs() + bbtf = self.find_boundingbox_or_timeframe() + # for path in self._paths: + # print('... [') + # for n in path: print(' ', n) + # print('===', bbtf) + if bbtf.at_type.shortname == config.BOUNDING_BOX: + self._anchor = {'video-start': bbtf.properties['timePoint'], + 'coordinates': bbtf.properties['coordinates']} + elif bbtf.at_type.shortname == config.TIME_FRAME: + self._anchor = {'video-start': bbtf.properties['start'], + 'video-end': bbtf.properties['end']} + return self._anchor + + def find_boundingbox_or_timeframe(self): + return self.paths[-1][-2] + + @staticmethod + def _coordinates_as_string(anchor): + if 'coordinates' not in anchor: + return None + return ','.join(["%s:%s" % (pair[0], pair[1]) + for pair in anchor['coordinates']]) + + +class Nodes(object): + + """Factory class for Node creation. Use Node for creation unless a special + class was registered for the kind of annotation we have.""" + + node_classes = { config.NAMED_ENTITY: EntityNode, + config.TIME_FRAME: TimeFrameNode } + + @classmethod + def new(cls, graph, view, annotation): + node_class = cls.node_classes.get(annotation.at_type.shortname, Node) + return node_class(graph, view, annotation) + + + +if __name__ == '__main__': + + graph = Graph(open(sys.argv[1]).read()) + print(graph) + #graph.pp() + #graph.nodes['v_7:st12'].pp() + #graph.nodes['v_2:s1'].pp() + #graph.nodes['v_4:tf1'].pp() + exit() + for node in graph.nodes.values(): + print(node.at_type.shortname, node.identifier, node.anchors) + + +''' + +Printing some graphs: + +uv run graph.py -i examples/input-v9.mmif -e dot -f png -o examples/dot-v9-1-full -p -a -v +uv run graph.py -i examples/input-v9.mmif -e dot -f png -o examples/dot-v9-2-no-view-links -p -a +uv run graph.py -i examples/input-v9.mmif -e dot -f png -o examples/dot-v9-3-no-anchor-to-doc -p + +''' diff --git a/mmif/utils/summarizer/summary.py b/mmif/utils/summarizer/summary.py new file mode 100644 index 00000000..b340f35b --- /dev/null +++ b/mmif/utils/summarizer/summary.py @@ -0,0 +1,731 @@ +"""MMIF Summarizer + +MMIF consumer that creates a JSON summary from a MMIF file. + +Makes some simplifying assumptions, including: + +- There is one video in the MMIF documents list. All start and end properties + are pointing to that video. +- The time unit is assumed to be milliseconds. + +Other assumptions are listed with the options below. + + +USAGE: + + $ python summary.py [OPTIONS] + + Reads the MMIF file and creates a JSON summary file with the document list + and any requested extra information. + +Example: + + $ python summary -i input.mmif -o output.json --transcript + + Reads input.mmif and creates output.json with just transcript + information added to the documents list and the views. + +In all cases, the summarizer will summarize what is there and use the information +that is there, if the output of CLAMS is bad, then the results of the summarizer +will be bad (although it may hide a lot of the badness). In some rare cases some +information is added. 
For example if the ASR tool does not group tokens then the +summarizer will do that, but then only by simply grouping in equal chunks and not +trying to infer sentence-like groupings. + +The summary always includes the MMIF version, the list of documents and a summary +of the metadata of all views (identifier, CLAMS app, timestamp, total number of +annotations and number of annotations per type, it does not show parameters and +application configuration). + + +OPTIONS: + +-i INFILE -o OUTFILE + +Run the summarizer over a single MMIF file and write the JSON summary to OUTFILE. + +-- timeframes + +Shows basic information of all timeframes. This groups the timeframes according to +the apps it was found in. + +--transcript + +Shows the text from the transcript in pseudo sentences. + +The transcript is taken from the last non-warning ASR view, so only the last added +transcript will be summarized. It is assumed that Tokens in the view are ordered on +text occurrence. + +--captions + +Shows captions from the Llava captioner app. + +--entities + +Include entities from spaCy or other NER. + +--full + +Include all the above. + +""" + +# TODO: +# - For the time unit we should really update get_start(), get_end() and other methods. + + +import os, sys, io, json, argparse, pathlib +from collections import defaultdict + +from mmif.serialize import Mmif +from mmif.vocabulary import DocumentTypes + +from mmif.utils.summarizer.utils import CharacterList +from mmif.utils.summarizer.utils import get_aligned_tokens, timestamp +from mmif.utils.summarizer.utils import get_transcript_view, get_last_segmenter_view, get_captions_view +from mmif.utils.summarizer.graph import Graph +from mmif.utils.summarizer import config + + +VERSION = '0.2.0' + + +DEBUG = False + +def debug(*texts): + if DEBUG: + for text in texts: + sys.stderr.write(f'{text}\n') + + +class SummaryException(Exception): + pass + + +class Summary(object): + + """Implements the summary of a MMIF file. + + fname - name of the input mmif file + mmif - instance of mmif.serialize.Mmif + graph - instance of graph.Graph + documents - instance of Documents + views - instance of Views + transcript - instance of Transcript + timeframes - instance of TimeFrames + entities - instance of Entities + captions - instance of get_captions_view + + """ + + def __init__(self, mmif_file): + self.fname = mmif_file + #self.mmif = mmif if type(mmif) is Mmif else Mmif(mmif) + self.mmif = Mmif(pathlib.Path(mmif_file).read_text()) + self.warnings = [] + self.graph = Graph(self.mmif) + self.mmif_version = self.mmif.metadata['mmif'] + self.documents = Documents(self) + self.annotations = Annotations(self) + self.document = Document(self) + self.views = Views(self) + self.timeframes = TimeFrames(self) + self.timeframe_stats = TimeFrameStats(self) + self.transcript = Transcript(self) + self.captions = Captions(self) + self.entities = Entities(self) + self.validate() + self.print_warnings() + + def add_warning(self, warning: str): + self.warnings.append(warning) + + def validate(self): + """Minimal validation of the input. 
Mostly a place holder because all it + does now is to check how many video documents there are.""" + if len(self.video_documents()) > 1: + raise SummaryException("More than one video document in MMIF file") + + def video_documents(self): + return self.mmif.get_documents_by_type(DocumentTypes.VideoDocument) + + def report(self, outfile=None, html=None, full=False, timeframes=False, + transcript=False, captions=False, entities=False): + json_obj = { + 'mmif_version': self.mmif.metadata.mmif, + 'document': self.document.data, + 'documents': self.documents.data, + 'annotations': self.annotations.data, + 'views': self.views.data} + if transcript or full: + json_obj['transcript'] = self.transcript.data + if captions or full: + json_obj['captions'] = self.captions.as_json() + if timeframes or full: + json_obj['timeframes'] = self.timeframes.as_json() + json_obj['timeframe_stats'] = self.timeframe_stats.data + if entities or full: + json_obj['entities'] = self.entities.as_json() + report = json.dumps(json_obj, indent=2) + if outfile is None: + return report + else: + with open(outfile, 'w') as fh: + fh.write(report) + + def print_warnings(self): + for warning in self.warnings: + print(f'WARNING: {warning}') + + def pp(self): + self.documents.pp() + self.views.pp() + self.transcript.pp() + self.timeframes.pp() + self.entities.pp() + print() + + +class Documents(object): + + """Contains a list of document summaries, which are dictionaries with just + the id, type and location properties.""" + + def __init__(self, summary: Summary): + self.data = [self.summary(doc) for doc in summary.graph.documents] + + def __len__(self): + return len(self.data) + + @staticmethod + def summary(doc): + return { 'id': doc.id, + 'type': doc.at_type.shortname, + 'location': doc.location } + + def pp(self): + print('\nDocuments -> ') + for d in self.data: + print(' %s %s' % (d['type'], d['location'])) + + +class Annotations(object): + + """Contains a dictionary of Annotation object summaries, indexed on view + identifiers.""" + + def __init__(self, summary): + self.data = defaultdict(list) + # summary.graph.get_nodes(config.ANNOTATION, view_id=view.id) + for anno in summary.graph.get_nodes(config.ANNOTATION): + self.data[anno.view.id].append(anno.properties) + + def get(self, item): + return self.data.get(item, []) + + def get_all_annotations(self): + annotations = [] + for annos in self.data.values(): + annotations.extend(annos) + return annotations + + +class Document(object): + + """Collects some document-level information, including MMIF version, size of + the MMIF file and some information from the SWT document annotation.""" + + def __init__(self, summary): + self.data = { + 'mmif_version': summary.mmif_version, + 'size': os.path.getsize(summary.fname) } + annotations = summary.annotations.get_all_annotations() + if annotations: + # TODO: this if fragile because it assumes that the annotation we want + # (which is the one from SWT) is always the first + doc_level_annotation = annotations[0] + if 'fps' in doc_level_annotation: + self.data['fps'] = doc_level_annotation['fps'] + if 'frameCount' in doc_level_annotation: + self.data['frames'] = doc_level_annotation['frameCount'] + if 'duration' in doc_level_annotation: + duration = doc_level_annotation['duration'] + # both in milliseconds and as a timestamp + self.data['duration_ms'] = duration + self.data['duration_ts'] = timestamp(duration) + + +class Views(object): + + """Contains a list of view summaries, which are dictionaries with just + the id, app and timestamp 
properties.""" + + def __init__(self, summary): + self.summary = summary + self.data = [self.get_view_summary(view) for view in summary.mmif.views] + + def __getitem__(self, i): + return self.data[i] + + def __len__(self): + return len(self.data) + + #@staticmethod + def get_view_summary(self, view): + annotation_types = defaultdict(int) + for annotation in view.annotations: + annotation_types[annotation.at_type.shortname] += 1 + basic_info = { + 'id': view.id, + 'app': view.metadata.app, + 'timestamp': view.metadata.timestamp, + 'contains': [str(k) for k in view.metadata.contains.keys()], + 'annotation_count': len(view.annotations), + 'annotation_types': dict(annotation_types), + 'parameters': view.metadata.parameters, + 'appConfiguration': view.metadata.appConfiguration } + if view.metadata.warnings: + basic_info['warnings'] = view.metadata.warnings + if view.metadata.error: + basic_info['error'] = view.metadata.error + return basic_info + + def pp(self): + print('\nViews -> ') + for v in self.data: + print(' %s' % v['app']) + + +class Transcript(object): + + """The transcript contains the string value from the first text document in the + last ASR view. It issues a warning if there is more than one text document in + the view.""" + + def __init__(self, summary): + self.summary = summary + self.data = [] + view = get_transcript_view(summary.mmif.views) + if view is not None: + documents = view.get_documents() + if len(documents) > 1: + summary.add_warning(f'More than one TextDocument in ASR view {view.id}') + t_nodes = summary.graph.get_nodes(config.TOKEN, view_id=view.id) + s_nodes = summary.graph.get_nodes(config.SENTENCE, view_id=view.id) + if not t_nodes: + return + if s_nodes: + # Whisper has Sentence nodes + sentences = self.collect_targets(s_nodes) + sentence_ids = [n.identifier for n in s_nodes] + else: + # But Kaldi does not + sentences = self.create_sentences(t_nodes) + sentence_ids = [None] * len(sentences) + # initialize the transcripts with all blanks, most blanks will be + # overwrite with characters from the tokens + transcript = CharacterList(self.transcript_size(sentences)) + for s_id, s in zip(sentence_ids, sentences): + transcript_element = TranscriptElement(s_id, s, transcript) + self.data.append(transcript_element.as_json()) + + def __str__(self): + return str(self.data) + + @staticmethod + def transcript_size(sentences): + try: + return sentences[-1][-1].properties['end'] + except IndexError: + return 0 + + def collect_targets(self, s_nodes): + """For each node (in this context a sentence node), collect all target nodes + (which are tokens) and return them as a list of lists, with one list for each + node.""" + targets = [] + for node in s_nodes: + node_target_ids = node.properties['targets'] + node_targets = [self.summary.graph.get_node(stid) for stid in node_target_ids] + targets.append(node_targets) + return targets + + def create_sentences(self, t_nodes, sentence_size=12): + """If there is no sentence structure then we create it just by chopping th + input into slices of some pre-determined length.""" + # TODO: perhaps the size paramater should be set in the config file or via a + # command line option. + return [t_nodes[i:i + sentence_size] + for i in range(0, len(t_nodes), sentence_size)] + + +class TranscriptElement: + + """Utility class to handle data associated with an element from a transcript, + which is created from a sentence which is a list of Token Nodes. 
Initialization + has the side effect of populating the full transcript which is an instance of + CharacterList and which is also accessed here.""" + + def __init__(self, identifier: str, sentence: list, transcript: CharacterList): + for t in sentence: + # this adds the current token to the transcript + start = t.properties['start'] + end = t.properties['end'] + word = t.properties['word'] + transcript.set_chars(word, start, end) + self.id = identifier + self.start = sentence[0].anchors['time-offsets'][0] + self.end = sentence[-1].anchors['time-offsets'][1] + self.start_offset = sentence[0].properties['start'] + self.end_offset = sentence[-1].properties['end'] + self.text = transcript.getvalue(self.start_offset, self.end_offset) + + def __str__(self): + text = self.text if len(self.text) <= 50 else self.text[:50] + '...' + return f'' + + def as_json(self): + json_obj = { + "start-time": self.start, + "end-time": self.end, + "text": self.text } + if self.id is not None: + json_obj["id"] = self.id + return json_obj + + +class Nodes(object): + + """Abstract class to store instances of subclasses of graph.Node. The + initialization methods of subclasses of Nodes can guard what nodes will + be allowed in, for example, as of July 2022 the TimeFrames class only + allowed time frames that had a frame type (thereby blocking the many + timeframes from Kaldi). + + Instance variables: + + summary - an instance of Summary + graph - an instance of graph.Graph, taken from the summary + nodes - list of instances of subclasses of graph.Node + + """ + + def __init__(self, summary): + self.summary = summary + self.graph = summary.graph + self.nodes = [] + + def __getitem__(self, i): + return self.nodes[i] + + def __len__(self): + return len(self.nodes) + + def add(self, node): + self.nodes.append(node) + + def get_nodes(self, **props): + """Return all the nodes that match the given properties.""" + def prop_check(p, v, props_given): + return v == props_given.get(p) if p in props_given else False + return [n for n in self + if all([prop_check(p, v, n.annotation.properties) + for p, v in props.items()])] + + +class TimeFrames(Nodes): + + """For now, we take only the TimeFrames that have a frame type, which rules out + all the frames we got from Kaldi.""" + + def __init__(self, summary): + super().__init__(summary) + # a dictionary mapping app names to lists of timeframe summaries + self.data = defaultdict(list) + for tf_node in self.graph.get_nodes(config.TIME_FRAME): + if tf_node.has_label(): + self.add(tf_node) + self._collect_timeframe_summaries() + self._sort_timeframe_summaries() + + def _collect_timeframe_summaries(self): + for tf in self.nodes: + label = tf.frame_type() + try: + start, end = tf.anchors['time-offsets'] + except KeyError: + # TODO: + # - this defies the notion of using the anchors for this, but + # maybe in this case we should go straight to the start/end + # - this code below also raises an error if there are no start + # and end properties + start = tf.properties['start'] + end = tf.properties['end'] + representatives = tf.representatives() + rep_tps = [rep.properties['timePoint'] for rep in representatives] + score = tf.properties.get('classification', {}).get(label) + app = tf.view.metadata.app + self.data[app].append( + { 'identifier': tf.identifier, 'label': label, 'score': score, + 'start-time': start, 'end-time': end, 'representatives': rep_tps }) + + def _sort_timeframe_summaries(self): + """Sort the data on their start time, do this for all apps.""" + for app in self.data: + 
sort_function = lambda x: x['start-time'] + self.data[app] = list(sorted(self.data[app], key=sort_function)) + + def as_json(self): + return self.data + + def pp(self): + print('\nTimeframes -> ') + for tf in self.nodes: + summary = tf.summary() + print(' %s:%s %s' % (summary['start'], summary['end'], + summary['frameType'])) + + +class TimeFrameStats(object): + + def __init__(self, summary): + # a dictionary mapping app names to frameType->duration dictionaries, + # where the duration is cumulative over all instances + self.timeframes = summary.timeframes + self.data = {} + self._collect_durations() + self._collect_other_morsels() + + def _collect_durations(self): + timeframes = self.timeframes.data + for app in timeframes: + self.data[app] = {} + for tf in timeframes[app]: + label = tf.get('label') + if label not in self.data[app]: + self.data[app][label] = {'count': 0, 'duration': 0} + self.data[app][label]['count'] += 1 + duration = tf['end-time'] - tf['start-time'] + if label is not None: + # TODO: these gave weird values for duration + #print('---',app, label, duration) + self.data[app][label]['duration'] += duration + duration = self.data[app][label]['duration'] + count = self.data[app][label]['count'] + self.data[app][label]['average'] = duration // count + + def _collect_other_morsels(self): + # First we want everything grouped by app and label + timeframes = self.timeframes.data + grouped_timeframes = defaultdict(lambda: defaultdict(list)) + for app in timeframes: + for tf in timeframes[app]: + label = tf.get('label') + grouped_timeframes[app][label].append(tf) + # The we pick the morsels for each label + for app in grouped_timeframes: + for label in grouped_timeframes[app]: + tfs = grouped_timeframes[app][label] + sort_on_start = lambda tf: tf['start-time'] + sort_on_length = lambda tf: tf['end-time'] - tf['start-time'] + first_tf = list(sorted(tfs, key=sort_on_start))[0] + longest_tf = list(sorted(tfs, key=sort_on_length, reverse=True))[0] + self.data[app][label]['first'] = first_tf['start-time'] + self.data[app][label]['longest'] = longest_tf['start-time'] + + +class Entities(Nodes): + + """Collecting instances of graph.EntityNode. + + nodes_idx - lists of instances of graph.EntityNode, indexed on entity text + { entity-string ==> list of graph.EntityNode } + bins - an instance of Bins + + """ + + def __init__(self, summary): + super().__init__(summary) + self.nodes_idx = {} + self.bins = None + for ent in self.graph.get_nodes(config.NAMED_ENTITY): + self.add(ent) + self._create_node_index() + self._group() + + def __str__(self): + return f'' + + def _create_node_index(self): + """Put all the entities from self.nodes in self.node_idx. 
This first puts + the nodes into the dictionary indexed on text string and then sorts the + list of nodes for each string on video position.""" + for ent in self: + self.nodes_idx.setdefault(ent.properties['text'], []).append(ent) + for text, entities in self.nodes_idx.items(): + self.nodes_idx[text] = sorted(entities, + key=(lambda e: e.start_in_video())) + + def _group(self): + """Groups all the nodes on the text and sorts them on position in the video, + for the latter it will also create bins of entities that occur close to each + other in the text.""" + # create the bins, governed by the summary's granularity + self.bins = Bins(self.summary) + for text, entities in self.nodes_idx.items(): + self.bins.current_bin = None + for entity in entities: + self.bins.add_entity(text, entity) + self.bins.mark_entities() + + def _add_tags(self, tags): + for tag in tags: + tag_doc = tag.properties['document'] + tag_p1 = tag.properties['start'] + tag_p2 = tag.properties['end'] + entities = self.nodes_idx.get(tag.properties['text'], []) + for entity in entities: + props = entity.properties + doc = props['document'] + p1 = props['start'] + p2 = props['end'] + if tag_doc == doc and tag_p1 == p1 and tag_p2 == p2: + entity.properties['tag'] = tag.properties['tagName'] + + def as_json(self): + json_obj = [] + for text in self.nodes_idx: + entity = {"text": text, "instances": []} + json_obj.append(entity) + for e in self.nodes_idx[text]: + entity["instances"].append(e.summary()) # e.summary(), E_PROPS) + return json_obj + + def pp(self): + print('\nEntities -> ') + for e in self.nodes_idx: + print(' %s' % e) + for d in self.nodes_idx[e]: + props = ["%s=%s" % (p, v) for p, v in d.summary().items()] + print(' %s' % ' '.join(props)) + + def print_groups(self): + for key in sorted(self.nodes_idx): + print(key) + for e in self.nodes_idx[key]: + print(' ', e, e.start_in_video()) + + +class Captions(Nodes): + + def __init__(self, summary): + super().__init__(summary) + self.captions = [] + view = get_captions_view(summary.mmif.views) + if view is not None: + for doc in self.graph.get_nodes(config.TEXT_DOCUMENT, view_id=view.id): + text = doc.properties['text']['@value'].split('[/INST]')[-1] + debug( + f'>>> DOC {doc}', + f'>>> PROPS {list(doc.properties.keys())}', + f'>>> TEXT ' + text.replace("\n", "")[:100], + f'>>> ANCHORS {doc.anchors}') + if 'time-offsets' in doc.anchors: + # For older LLava-style captions + # http://apps.clams.ai/llava-captioner/v1.2-6-gc824c97 + p1, p2 = doc.anchors['time-offsets'] + if 'representatives' in doc.anchors: + tp_id = doc.anchors["representatives"][0] + tp = summary.graph.get_node(tp_id) + self.captions.append( + { 'identifier': doc.identifier, + 'time-point': tp.properties['timePoint'], + 'text': text }) + if 'time-point' in doc.anchors: + # For newer SmolVLM-style captions + # http://apps.clams.ai/smolvlm2-captioner + self.captions.append( + { 'identifier': doc.identifier, + 'time-point': doc.anchors['time-point'], + 'text': text }) + + def as_json(self): + return self.captions + #return [(ident, p1, p2, text) for ident, p1, p2, text in self.captions] + + +class Bins(object): + + def __init__(self, summary): + self.summary = summary + self.bins = {} + self.current_bin = None + self.current_text = None + + def __str__(self): + return f'' + + def __len__(self): + return len(self.bins) + + def add_entity(self, text, entity): + """Add an entity instance to the appropriate bin.""" + if self.current_bin is None: + # Add the first instance of a new entity (as defined by the text), + 
# since it is the first a new bin will be created. + self.current_text = text + self.current_bin = Bin(entity) + self.bins[text] = [self.current_bin] + else: + # For following entities with the same text, a new bin may be + # created depending on the positions and the granularity. + p1 = self.current_bin[-1].start_in_video() + p2 = entity.start_in_video() + # p3 = entity.end_in_video() + if p2 - p1 < config.GRANULARITY: + # TODO: should add p3 here + self.current_bin.add(entity) + else: + self.current_bin = Bin(entity) + self.bins[self.current_text].append(self.current_bin) + + def mark_entities(self): + """Marks all entities with the bin that they occur in. This is done to export + the grouping done with the bins to the entities and this way the bins never need + to be touched again.""" + # TODO: maybe use the bins when we create the output + for entity_bins in self.bins.values(): + for i, e_bin in enumerate(entity_bins): + for entity in e_bin: + entity.properties['group'] = i + + def print_bins(self): + for text in self.bins: + print(text) + text_bins = self.bins[text] + for i, text_bin in enumerate(text_bins): + text_bin.print_nodes(i) + print() + + +class Bin(object): + + def __init__(self, node): + # TODO: we are not using these yet, but a bin should have a begin and + # end in the video which should be derived from the start and end of + # entities in the video. The way we put things in bins now is a bit + # fragile since it depends on the start or end of the last element. + self.start = 0 + self.end = 0 + self.nodes = [node] + + def __getitem__(self, i): + return self.nodes[i] + + def add(self, node): + self.nodes.append(node) + + def print_nodes(self, i): + for node in self.nodes: + print(' ', i, node) diff --git a/mmif/utils/summarizer/utils.py b/mmif/utils/summarizer/utils.py new file mode 100644 index 00000000..5920f8ce --- /dev/null +++ b/mmif/utils/summarizer/utils.py @@ -0,0 +1,301 @@ +"""Utility methods + +""" + +import io +from pathlib import Path +from xml.sax.saxutils import quoteattr, escape +from collections import UserList + +from mmif.utils.summarizer.config import KALDI, WHISPER, CAPTIONER, SEGMENTER +from mmif.utils.summarizer.config import TOKEN, ALIGNMENT, TIME_FRAME + + +def compose_id(view_id, anno_id): + """Composes the view identifier with the annotation identifier.""" + return anno_id if ':' in anno_id else view_id + ':' + anno_id + + +def type_name(annotation): + """Return the short name of the type.""" + return annotation.at_type.split('/')[-1] + + +def get_transcript_view(views): + """Return the last Whisper or Kaldi view that is not a warnings view.""" + # TODO: this now has a simplified idea of how to find a view, should at least + # move towards doing some regular expression matching on the WHISPER config + # setting. The same holds for other functions to get views. 
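+    # A sketch of the regular expression matching mentioned above (hypothetical,
+    # not active): match on the app base URL rather than exact version strings,
+    # e.g. re.match(r'http://apps\.clams\.ai/(whisper-wrapper|aapb-pua-kaldi-wrapper)/',
+    #              view.metadata.app)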
+ for view in reversed(views): + if view.metadata.app in KALDI + WHISPER: + if view.metadata.warnings: + continue + return view + return None + + +def get_captions_view(views): + """Return the last view created by the captioner.""" + for view in reversed(views): + if view.metadata.app in CAPTIONER: + if view.metadata.warnings: + continue + return view + return None + + +def get_last_segmenter_view(views): + for view in reversed(views): + # print(f'>>> {view.metadata.app}') + if view.metadata.app.startswith(SEGMENTER): + return view + return None + + +def get_aligned_tokens(view): + """Get a list of tokens from an ASR view where for each token we add a timeframe + properties which has the start and end points of the aligned timeframe.""" + idx = AnnotationsIndex(view) + for alignment in idx.get_annotations(ALIGNMENT).values(): + token = idx[TOKEN].get(alignment.properties['target']) + frame = idx[TIME_FRAME].get(alignment.properties['source']) + if token and frame: + # add a timeframe to the token, we can do this now that we do not + # freeze MMIF annotations anymore + token.properties['timeframe'] = (frame.properties['start'], + frame.properties['end']) + return idx.tokens + + +def timestamp(milliseconds: int, format='hh:mm:ss'): + # sometimes the milliseconds are not a usable float + if milliseconds in (None, -1): + return 'nil' + milliseconds = int(milliseconds) + seconds = milliseconds // 1000 + minutes = seconds // 60 + hours = minutes // 60 + ms = milliseconds % 1000 + s = seconds % 60 + m = minutes % 60 + if format == 'hh:mm:ss:mmm': + return f'{hours}:{m:02d}:{s:02d}.{ms:03d}' + elif format == 'hh:mm:ss': + return f'{hours}:{m:02d}:{s:02d}' + elif format == 'mm:ss': + return f'{m:02d}:{s:02d}' + elif format == 'mm:ss:mmm': + return f'{m:02d}:{s:02d}.{ms:03d}' + else: + return f'{hours}:{m:02d}:{s:02d}.{ms:03d}' + + + +class AnnotationsIndex: + + """Creates an index on the annotations list for a view, where each annotation type + is indexed on its identifier. Tokens are special and get their own list.""" + + def __init__(self, view): + self.view = view + self.idx = {} + self.tokens = [] + for annotation in view.annotations: + shortname = annotation.at_type.shortname + if shortname == TOKEN: + self.tokens.append(annotation) + self.idx.setdefault(annotation.at_type.shortname, {}) + self.idx[shortname][annotation.properties.id] = annotation + + def __str__(self): + return f'' + + def __getitem__(self, item): + return self.idx[item] + + def get_annotations(self, at_type): + return self.idx.get(at_type, {}) + + +class CharacterList(UserList): + + """Auxiliary datastructure to help print a list of tokens. 
It allows you to + back-engineer a sentence from the text and character offsets of the tokens.""" + + def __init__(self, n: int, char=' '): + self.size = n + self.char = char + self.data = n * [char] + + def __str__(self): + return f'' + + def __len__(self): + return self.size + + def __setitem__(self, key, value): + try: + self.data[key] = value + except IndexError: + for i in range(len(self), key + 1): + self.data.append(self.char) + self.data[key] = value + + def set_chars(self, text: str, start: int, end: int): + self.data[start:end] = text + + def getvalue(self, start: int, end: int): + return ''.join(self.data[start:end]) + + +def xml_tag(tag, subtag, objs, props, indent=' ') -> str: + """Return an XML string for a list of instances of subtag, grouped under tag.""" + s = io.StringIO() + s.write(f'{indent}<{tag}>\n') + for obj in objs: + s.write(xml_empty_tag(subtag, indent + ' ', obj, props)) + s.write(f'{indent}\n') + return s.getvalue() + + +def xml_empty_tag(tag_name: str, indent: str, obj: dict, props: tuple) -> str: + """Return an XML tag to an instance of io.StringIO(). Only properties from obj + that are in the props tuple are printed.""" + pairs = [] + for prop in props: + if prop in obj: + if obj[prop] is not None: + #pairs.append("%s=%s" % (prop, xml_attribute(obj[prop]))) + pairs.append(f'{prop}={xml_attribute(obj[prop])}') + attrs = ' '.join(pairs) + return f'{indent}<{tag_name} {attrs}/>\n' + + +def write_tag(s, tagname: str, indent: str, obj: dict, props: tuple): + """Write an XML tag to an instance of io.StringIO(). Only properties from obj + that are in the props tuple are printed.""" + pairs = [] + for prop in props: + if prop in obj: + if obj[prop] is not None: + pairs.append("%s=%s" % (prop, xml_attribute(obj[prop]))) + s.write('%s<%s %s/>\n' + % (indent, tagname, ' '.join(pairs))) + + +def xml_attribute(attr): + """Return attr as an XML attribute.""" + return quoteattr(str(attr)) + + +def xml_data(text): + """Return text as XML data.""" + return escape(str(text)) + + +def XXXflatten_paths(paths): + """Take paths implemented as singly linked lists and return regular lists.""" + return [flatten_path(path) for path in paths] + + +def XXXflatten_path(path): + """Take a path implemented as singly linked lists and return a regular list.""" + while path: + if len(path) == 1: + return path + else: + #print('>>>', len(path)) + #for x in path: + # print(' ', x) + first, rest = path + return [first] + flatten_path(rest) + + +def XXXprint_paths(paths, indent=''): + """Print paths, which may be flattened.""" + for path in paths: + print(indent, end='') + print_path(path) + print() + + +def XXXprint_path(p): + if isinstance(p, list): + print('[', end=' ') + for e in p: + print_path(e) + print(']', end=' ') + else: + print(p, end=' ') + + +def normalize_id(doc_ids: list, view: 'View', annotation: 'Annotation'): + """Change identifiers to include the view identifier if it wasn't included, + do nothing otherwise. This applies to the Annotation id, target, source, + document, targets and representatives properties. 
Note that timePoint is + not included because the value is an integer and not an identifier.""" + # TODO: this seems somewhat fragile + # TODO: spell out what doc_ids is for (to exclude source documents I think) + debug = False + attype = annotation.at_type.shortname + props = annotation.properties + if ':' not in annotation.id and view is not None: + if annotation.id not in doc_ids: + newid = f'{view.id}:{annotation.id}' + annotation.properties['id'] = newid + if 'document' in props: + doc_id = props['document'] + if ':' not in doc_id and view is not None: + if doc_id not in doc_ids: + props['document'] = f'{view.id}:{doc_id}' + if 'targets' in props: + new_targets = [] + for target in props['targets']: + if ':' not in target and view is not None: + if target not in doc_ids: + new_targets.append(f'{view.id}:{target}') + else: + new_targets.append(target) + props['targets'] = new_targets + if 'representatives' in props: + new_representatives = [] + for rep in props['representatives']: + if ':' not in rep and view is not None: + new_representatives.append(f'{view.id}:{rep}') + else: + new_representatives.append(rep) + props['representatives'] = new_representatives + if attype == 'Alignment': + if ':' not in props['source'] and view is not None: + if props['source'] not in doc_ids: + props['source'] = f'{view.id}:{props["source"]}' + if ':' not in props['target'] and view is not None: + if props['target'] not in doc_ids: + props['target'] = f'{view.id}:{props["target"]}' + if debug: + print('===', annotation) + + +def get_annotations_from_view(view, annotation_type): + """Return all annotations from a view that match the short name of the + annotation type.""" + # Note: there is method mmif.View.get_annotations() where you can give + # at_type as a parameter, but it requires a full match. + return [a for a in view.annotations + if a.at_type.shortname == annotation_type] + + +def find_matching_tokens(tokens, ne): + matching_tokens = [] + ne_start = ne.properties["start"] + ne_end = ne.properties["end"] + start_token = None + end_token = None + for token in tokens: + if token.properties['start'] == ne_start: + start_token = token + if token.properties['end'] == ne_end: + end_token = token + return start_token, end_token + + From 4c7c6a6ecca02a501ffcafe846ea8f55f28e5ecf Mon Sep 17 00:00:00 2001 From: Marc Verhagen Date: Thu, 8 Jan 2026 12:09:03 -0500 Subject: [PATCH 2/7] Added notes on how to add a CLI script and added the summarizer to the doc modules --- documentation/modules.rst | 1 + mmif/utils/cli/README.md | 71 +++++++++++++++++++++++++++++++++++++ mmif/utils/cli/summarize.py | 2 +- 3 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 mmif/utils/cli/README.md diff --git a/documentation/modules.rst b/documentation/modules.rst index 4bb9307d..e32ade5d 100644 --- a/documentation/modules.rst +++ b/documentation/modules.rst @@ -9,6 +9,7 @@ mmif package autodoc/mmif.serialize autodoc/mmif.vocabulary autodoc/mmif.utils + autodoc/mmif.utils.summarizer mmif_docloc_http package ======================== diff --git a/mmif/utils/cli/README.md b/mmif/utils/cli/README.md new file mode 100644 index 00000000..6d04438d --- /dev/null +++ b/mmif/utils/cli/README.md @@ -0,0 +1,71 @@ +# MMIF CLI Scripts + +This directory contains CLI scripts like `source` and `rewind` that can be called from the command line. These scripts are called as subcommands of the `mmif` CLI script, for example `mmif source --help`. 
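+
+As a quick preview of the recipe described in the next section, here is a minimal
+sketch of a subcommand module. The module name `hello.py` and its single `-i`
+option are made up for illustration; what matters are the three functions that
+every CLI module needs to define.
+
+```python
+# mmif/utils/cli/hello.py -- hypothetical example of a CLI subcommand module
+import argparse
+
+
+def describe_argparser() -> tuple:
+    """Return a one-line and a longer description of the subcommand."""
+    oneliner = 'Print a greeting for a MMIF file'
+    return oneliner, oneliner
+
+
+def prep_argparser(**kwargs):
+    # kwargs is needed because mmif.cli() hands in add_help=False
+    parser = argparse.ArgumentParser(description=describe_argparser()[1], **kwargs)
+    parser.add_argument('-i', metavar='MMIF_FILE', help='input MMIF file', required=True)
+    return parser
+
+
+def main(args):
+    # the actual work of the subcommand goes here
+    print(f'Hello, {args.i}')
+```
+
+With a module like this in place the subcommand is picked up automatically, so it
+can be invoked as `mmif hello -i example.mmif`, just like the `summarize` subcommand
+defined in `summarize.py`.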
+ + +## Adding another CLI script + +To add a CLI script all you need to do is add a python module to `mmif/utils/cli` and make sure it has the following three methods: + +1. `prep_argparser(**kwargs)` to define and return an instance of `argparse.ArgumentParser`. + +2. `describe_argparser()` to return a pair of strings that describe the script. The first string is a one-line description of the argument parser and the second a more verbose description. These will be shown for `mmif --help` and `mmif subcommand --help` respectively. + +3. `main(args)` to do the actual work of running the code + +See the current CLI scripts for examples. + + +## Some background + +The mmif-python package has a particular way to deal with CLI utility scripts. All scripts live in the mmif.utils.cli package. The `mmif/__init__.py` module has the `cli()` function which illustrates the requirements on utility scripts: + +```python +def cli(): + parser, subparsers = prep_argparser_and_subcmds() + cli_modules = {} + for cli_module in find_all_modules('mmif.utils.cli'): + cli_module_name = cli_module.__name__.rsplit('.')[-1] + cli_modules[cli_module_name] = cli_module + subcmd_parser = cli_module.prep_argparser(add_help=False) + subparsers.add_parser(cli_module_name, parents=[subcmd_parser], + help=cli_module.describe_argparser()[0], + description=cli_module.describe_argparser()[1], + formatter_class=argparse.RawDescriptionHelpFormatter) + if len(sys.argv) == 1: + parser.print_help(sys.stderr) + sys.exit(1) + args = parser.parse_args() + if args.subcmd not in cli_modules: + parser.print_help(sys.stderr) + else: + cli_modules[args.subcmd].main(args) +``` + + + +You can see the invocations of the three functions mentioned above. + +The `prep_argparser()` function uses `find_all_modules()`, which finds modules in the top-level of the cli package. That module could have all the code needed for the CLI to work, but it could refer to other modules as well. For example, the `summary.py` script is in `cli`, but it imports the summary utility from `mmif.utls`. + +In the setup.py script there is this passage towards the end of the file: + +```python + entry_points={ + 'console_scripts': [ + 'mmif = mmif.__init__:cli', + ], + }, +``` + +This leaves it up to the `cli()` method to find the scripts and this is why just adding a submodule as mentioned above works. Note that the initialization file of the cli package imports two of the commandline related scripts: + +```python +from mmif.utils.cli import rewind +from mmif.utils.cli import source +``` + +These may be used somewhere, but they are not necessary to run MMIF CLI scripts. + diff --git a/mmif/utils/cli/summarize.py b/mmif/utils/cli/summarize.py index 8b88c53d..c8e384c0 100644 --- a/mmif/utils/cli/summarize.py +++ b/mmif/utils/cli/summarize.py @@ -12,7 +12,7 @@ def describe_argparser() -> tuple: respectively. For now they return the same string. The retun value should still be a tuple because mmif.cli() depends on it. 
""" - oneliner = 'provides a CLI to create a JSON Summary for a MMIF file' + oneliner = 'Create a JSON Summary for a MMIF file' return oneliner, oneliner From 7cc973d2576637c1e11292289dd4de9519f1fa8c Mon Sep 17 00:00:00 2001 From: Marc Verhagen Date: Thu, 8 Jan 2026 13:25:48 -0500 Subject: [PATCH 3/7] Removed some deprecated methods because they broke the coverage tests of the pull request --- mmif/utils/summarizer/utils.py | 36 ---------------------------------- 1 file changed, 36 deletions(-) diff --git a/mmif/utils/summarizer/utils.py b/mmif/utils/summarizer/utils.py index 5920f8ce..95b15b86 100644 --- a/mmif/utils/summarizer/utils.py +++ b/mmif/utils/summarizer/utils.py @@ -193,42 +193,6 @@ def xml_data(text): return escape(str(text)) -def XXXflatten_paths(paths): - """Take paths implemented as singly linked lists and return regular lists.""" - return [flatten_path(path) for path in paths] - - -def XXXflatten_path(path): - """Take a path implemented as singly linked lists and return a regular list.""" - while path: - if len(path) == 1: - return path - else: - #print('>>>', len(path)) - #for x in path: - # print(' ', x) - first, rest = path - return [first] + flatten_path(rest) - - -def XXXprint_paths(paths, indent=''): - """Print paths, which may be flattened.""" - for path in paths: - print(indent, end='') - print_path(path) - print() - - -def XXXprint_path(p): - if isinstance(p, list): - print('[', end=' ') - for e in p: - print_path(e) - print(']', end=' ') - else: - print(p, end=' ') - - def normalize_id(doc_ids: list, view: 'View', annotation: 'Annotation'): """Change identifiers to include the view identifier if it wasn't included, do nothing otherwise. This applies to the Annotation id, target, source, From 8b310ebeb78ecf51122880978c1d521296c25f82 Mon Sep 17 00:00:00 2001 From: Marc Verhagen Date: Thu, 8 Jan 2026 13:39:48 -0500 Subject: [PATCH 4/7] Type checker from the coverage test does not like string-valued type hints --- mmif/utils/summarizer/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mmif/utils/summarizer/utils.py b/mmif/utils/summarizer/utils.py index 95b15b86..61c3bc8b 100644 --- a/mmif/utils/summarizer/utils.py +++ b/mmif/utils/summarizer/utils.py @@ -7,6 +7,7 @@ from xml.sax.saxutils import quoteattr, escape from collections import UserList +from mmif import View, Annotation from mmif.utils.summarizer.config import KALDI, WHISPER, CAPTIONER, SEGMENTER from mmif.utils.summarizer.config import TOKEN, ALIGNMENT, TIME_FRAME @@ -193,7 +194,7 @@ def xml_data(text): return escape(str(text)) -def normalize_id(doc_ids: list, view: 'View', annotation: 'Annotation'): +def normalize_id(doc_ids: list, view: View, annotation: Annotation): """Change identifiers to include the view identifier if it wasn't included, do nothing otherwise. This applies to the Annotation id, target, source, document, targets and representatives properties. 
Note that timePoint is From cca671496eb0e45db93d5e4281808bdcc662ec27 Mon Sep 17 00:00:00 2001 From: Marc Verhagen Date: Thu, 8 Jan 2026 14:55:30 -0500 Subject: [PATCH 5/7] More cleanup and fixes for code coverage tests --- mmif/utils/summarizer/graph.py | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/mmif/utils/summarizer/graph.py b/mmif/utils/summarizer/graph.py index ae11b9be..642db16a 100644 --- a/mmif/utils/summarizer/graph.py +++ b/mmif/utils/summarizer/graph.py @@ -357,27 +357,6 @@ def _get_document(self): return None return None - def XXX_get_document_plus_span(self): - self.pp() - props = self.properties - return "%s:%s:%s" % (self.document.identifier, - props['start'], props['end']) - - def XXXpaths_to_docs(self): - """Return all the paths from the node to documents.""" - paths = self._paths_to_docs() - return flatten_paths(paths) - - def XXX_paths_to_docs(self): - paths = [] - if not self.targets: - return [[self]] - for t in self.targets: - paths.append([self]) - for i, target in enumerate(self.targets): - paths[i].extend(target._paths_to_docs()) - return paths - def summary(self): """The default summary is just the identfier, this should typically be overriden by sub classes.""" @@ -464,8 +443,8 @@ def start_in_video(self): def end_in_video(self): return self.anchor().get('video-end') - def pp(self): - super().pp(close=False) + def pp(self, close=False): + super().pp(close=close) try: for i, p in enumerate(self.paths_to_docs()): print(' %s' % ' '.join([str(n) for n in p[1:]])) @@ -478,10 +457,6 @@ def summary(self): the entity occurs, it is not enough to just give the text document.""" # TODO: in the old days this used an anchor() method which was fragile # TODO: revamping it now - - #anchor = self.anchor() - #self.document.pp() -# print('...', self.document.anchors return { 'id': self.identifier, 'group': self.properties['group'], From d2498f8a65975b64de7fdd2619a146e6110f96f2 Mon Sep 17 00:00:00 2001 From: Marc Verhagen Date: Thu, 8 Jan 2026 15:08:02 -0500 Subject: [PATCH 6/7] More changes to satisfy typing requirements from the code coverage tests --- mmif/utils/summarizer/graph.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/mmif/utils/summarizer/graph.py b/mmif/utils/summarizer/graph.py index 642db16a..3f0b18c9 100644 --- a/mmif/utils/summarizer/graph.py +++ b/mmif/utils/summarizer/graph.py @@ -8,8 +8,6 @@ from mmif.utils.summarizer import config from mmif.utils.summarizer.utils import compose_id, normalize_id -#from summarizer.utils import compose_id, flatten_paths, normalize_id - class Graph(object): @@ -86,7 +84,9 @@ def add_edge(self, view, alignment): def get_node(self, node_id): return self.nodes.get(node_id) - def get_nodes(self, short_at_type: str, view_id : str = None): + # def get_nodes(self, short_at_type: str, view_id : str = None): + # replaced the above because the code coverage is picky on type hints + def get_nodes(self, short_at_type: str, view_id=None): """Get all nodes for an annotation type, using the short form. If a view identifier is provided then only include nodes from that view.""" return [node for node in self.nodes.values() @@ -443,6 +443,10 @@ def start_in_video(self): def end_in_video(self): return self.anchor().get('video-end') + ''' + Commented this out because the type checking in the code coverage tests requires + the default vaue for the close parameter to be the same as on Node.pp(). 
+ def pp(self, close=False): super().pp(close=close) try: @@ -451,6 +455,7 @@ def pp(self, close=False): except ValueError: print(' WARNING: error in path_to_docs in NamedEntityNode.pp()') print('-' * 80) + ''' def summary(self): """The summary for entities needs to include where in the video or image From 51983c5f3d443d15749dd7986bae162429693c8a Mon Sep 17 00:00:00 2001 From: Marc Verhagen Date: Mon, 12 Jan 2026 14:10:14 -0500 Subject: [PATCH 7/7] Making code pass pytype in Python 3.10, 3.11 and 3.12 ; some cleanup. --- mmif/serialize/mmif.py | 6 +- mmif/utils/cli/summarize.py | 16 +- mmif/utils/summarizer/__init__.py | 16 +- mmif/utils/summarizer/graph.py | 385 ++---------------------------- mmif/utils/summarizer/nodes.py | 370 ++++++++++++++++++++++++++++ mmif/utils/summarizer/summary.py | 40 ++-- 6 files changed, 414 insertions(+), 419 deletions(-) create mode 100644 mmif/utils/summarizer/nodes.py diff --git a/mmif/serialize/mmif.py b/mmif/serialize/mmif.py index 9e94496d..c6fa4c62 100644 --- a/mmif/serialize/mmif.py +++ b/mmif/serialize/mmif.py @@ -15,7 +15,7 @@ import warnings from collections import defaultdict from datetime import datetime -from typing import List, Union, Optional, Dict, cast, Iterator +from typing import Any, List, Union, Optional, Dict, cast, Iterator import jsonschema.validators @@ -487,11 +487,11 @@ def get_documents_in_view(self, vid: Optional[str] = None) -> List[Document]: else: return [] - def get_documents_by_type(self, doc_type: Union[str, DocumentTypes]) -> List[Document]: + def get_documents_by_type(self, doc_type: Any) -> List[Document]: """ Method to get all documents where the type matches a particular document type, which should be one of the CLAMS document types. - :param doc_type: the type of documents to search for, must be one of ``Document`` type defined in the CLAMS vocabulary. + :param doc_type: the type of documents to search for, must be one of ``Document`` types defined in the CLAMS vocabulary. :return: a list of documents matching the requested type, or an empty list if none found. 
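+        For example, calling this with ``DocumentTypes.VideoDocument`` returns all
+        video documents in the MMIF object.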
""" docs = [] diff --git a/mmif/utils/cli/summarize.py b/mmif/utils/cli/summarize.py index c8e384c0..06c1afae 100644 --- a/mmif/utils/cli/summarize.py +++ b/mmif/utils/cli/summarize.py @@ -22,22 +22,10 @@ def prep_argparser(**kwargs): formatter_class=argparse.RawDescriptionHelpFormatter, **kwargs) parser.add_argument("-i", metavar='MMIF_FILE', help='input MMIF file', required=True) - parser.add_argument("-o", metavar='JSON_FILE', help='output JSON summary file', required=True) - parser.add_argument("--full", action="store_true", help="print full report") - parser.add_argument('--transcript', action='store_true', help='include transcript') - parser.add_argument('--captions', action='store_true', help='include Llava captions') - parser.add_argument('--timeframes', action='store_true', help='include all time frames') - parser.add_argument('--entities', action='store_true', help='include entities from transcript') + parser.add_argument("-o", metavar='OUTPUT_FILE', help='output JSON summary file', required=True) return parser def main(args): - #print('>>>', args) mmif_summary = Summary(args.i) - #print('>>>', mmif_summary) - mmif_summary.report( - outfile=args.o, full=args.full, - #timeframes=args.timeframes, transcript=args.transcript, - #captions=args.captions, entities=args.entities - ) - + mmif_summary.report(outfile=args.o) diff --git a/mmif/utils/summarizer/__init__.py b/mmif/utils/summarizer/__init__.py index 59a980fe..1122d449 100644 --- a/mmif/utils/summarizer/__init__.py +++ b/mmif/utils/summarizer/__init__.py @@ -8,11 +8,6 @@ def argparser(): parser = argparse.ArgumentParser(description='Create a JSON Summary for a MMIF file') parser.add_argument('-i', metavar='MMIF_FILE', help='input MMIF file', required=True) parser.add_argument('-o', metavar='JSON_FILE', help='output JSON summary file', required=True) - parser.add_argument('--full', action='store_true', help='create full report') - parser.add_argument('--transcript', action='store_true', help='include transcript') - parser.add_argument('--captions', action='store_true', help='include Llava captions') - parser.add_argument('--timeframes', action='store_true', help='include all time frames') - parser.add_argument('--entities', action='store_true', help='include entities from transcript') return parser @@ -26,14 +21,10 @@ def main(): args = parser.parse_args() #pp_args(args) mmif_summary = Summary(args.i) - mmif_summary.report( - outfile=args.o, full=args.full, - timeframes=args.timeframes, transcript=args.transcript, - captions=args.captions, entities=args.entities) + mmif_summary.report(outfile=args.o) """ - There used to be an option to process a whole directory, but I never used it and decided that if needed it would better be done by an extra script or a separate function. 
@@ -45,8 +36,5 @@ def main(): print(mmif_file) json_file = str(mmif_file)[:-4] + 'json' mmif_summary = Summary(mmif_file.read_text()) - mmif_summary.report( - outfile=json_file, full=args.full, - timeframes=args.timeframes, transcript=args.transcript, - captions=args.captions, entities=args.entities) + mmif_summary.report(outfile=json_file) """ \ No newline at end of file diff --git a/mmif/utils/summarizer/graph.py b/mmif/utils/summarizer/graph.py index 3f0b18c9..55c38ffd 100644 --- a/mmif/utils/summarizer/graph.py +++ b/mmif/utils/summarizer/graph.py @@ -4,10 +4,12 @@ from pathlib import Path import argparse +from typing import Any from mmif import Mmif from mmif.utils.summarizer import config from mmif.utils.summarizer.utils import compose_id, normalize_id +from mmif.utils.summarizer.nodes import Node, Nodes, EntityNode, TimeFrameNode class Graph(object): @@ -20,7 +22,9 @@ class Graph(object): The goal for the graph is to store all useful annotation and to have simple ways to trace nodes all the way up to the primary data.""" - def __init__(self, mmif): + def __init__(self, mmif: Any): + # TODO: the type hint should really be "MMif | str", but pytype did not + # like that. self.mmif = mmif if type(mmif) is Mmif else Mmif(mmif) self.documents = [] self.nodes = {} @@ -74,14 +78,18 @@ def add_edge(self, view, alignment): #print(alignment.id, source_id, target_id) source = self.get_node(source_id) target = self.get_node(target_id) - # make sure the direction goes from token or textdoc to annotation - if target.annotation.at_type.shortname in (config.TOKEN, config.TEXT_DOCUMENT): - source, target = target, source - source.targets.append(target) - source.add_anchors_from_alignment(target) - target.add_anchors_from_alignment(source) - - def get_node(self, node_id): + if source is None or target is None: + print('WARNING: could not add edge ', + 'because the source and/or target does not extst') + else: + # make sure the direction goes from token or textdoc to annotation + if target.annotation.at_type.shortname in (config.TOKEN, config.TEXT_DOCUMENT): + source, target = target, source + source.targets.append(target) + source.add_anchors_from_alignment(target) + target.add_anchors_from_alignment(source) + + def get_node(self, node_id) -> Node | None: return self.nodes.get(node_id) # def get_nodes(self, short_at_type: str, view_id : str = None): @@ -116,7 +124,7 @@ def trim(self, start: int, end: int): new_nodes = [n for n in self.nodes.values() if not n.identifier in remove] self.nodes = { node.identifier: node for node in new_nodes } - def pp(self, fname=None,skip_timepoints=False): + def pp(self, fname=None, skip_timepoints=False): fh = sys.stdout if fname is None else open(fname, 'w') fh.write("%s\n" % self) for view in self.mmif.views: @@ -174,7 +182,7 @@ def __len__(self): def __str__(self): return f'' - def get_tokens_for_node(self, node): + def get_tokens_for_node(self, node: Node): """Return all tokens included in the span of a node.""" doc = node.document.identifier try: @@ -196,361 +204,6 @@ def pp(self, fname=None): fh.write(' %s %s\n' % (t[0], t[1])) -class Node(object): - - def __init__(self, graph, view, annotation): - self.graph = graph - self.view = view - self.view_id = None if self.view is None else self.view.id - self.annotation = annotation - # copy some information from the Annotation - self.at_type = annotation.at_type - self.identifier = annotation.id - self.properties = json.loads(str(annotation.properties)) - # get the document from the view or the properties - 
self.document = self._get_document() - # The targets property contains a list of annotations or documents that - # the node content points to. This includes the document the annotation - # points to as well as the alignment from a token or text document to a - # bounding box or time frame (which is added later). - # TODO: the above does not seem to be true since there is no evidence of - # data from alignments being added. - self.targets = [] if self.document is None else [self.document] - self.anchors = {} - self.add_local_anchors() - self.add_anchors_from_targets() - - def __str__(self): - anchor = '' - if self.at_type.shortname == config.TOKEN: - anchor = " %s:%s '%s'" % (self.properties['start'], - self.properties['end'], - self.properties.get('text','').replace('\n', '\\n')) - return "<%s %s%s>" % (self.at_type.shortname, self.identifier, anchor) - - def add_local_anchors(self): - """Get the anchors that you can get from the annotation itself, which - includes the start and end offsets, the coordinates, the timePoint of - a BoundingBox and any annotation with targets.""" - props = self.properties - attype = self.annotation.at_type.shortname - if 'start' in props and 'end' in props: - # TimeFrame is the only non-character based interval so this simple - # if-then-else should work - if attype == config.TIME_FRAME: - self.anchors['text-offsets'] = (props['start'], props['end']) - else: - self.anchors['time-offsets'] = (props['start'], props['end']) - if 'coordinates' in props: - self.anchors['coordinates'] = props['coordinates'] - if 'timePoint' in props: - self.anchors['time-point'] = props['timePoint'] - if 'targets' in props: - self.anchors['targets'] = props['targets'] - - def add_anchors_from_targets(self): - """Get start and end offsets or timePoints from the targets and add them to - the anchors, but only if there were no anchors on the node already. This has - two cases: one for TimeFrames and one for text intervals.""" - props = self.properties - attype = self.annotation.at_type.shortname - if 'targets' in props: - try: - t1 = self.graph.nodes[props['targets'][0]] - t2 = self.graph.nodes[props['targets'][-1]] - if attype == config.TIME_FRAME: - if not 'time-offsets' in props: - self.anchors['time-offsets'] = ( - t1.properties['timePoint'], t2.properties['timePoint']) - else: - if not 'text-offsets' in props: - self.anchors['text-offsets'] = ( - t1.properties['start'], t2.properties['end']) - except IndexError: - print(f'WARNING: Unexpected empty target list for {self.identifier}') - - def add_anchors_from_alignment(self, target: None, debug=False): - source_attype = self.at_type.shortname - target_attype = target.at_type.shortname - if debug: - print('\n@ DEBUG SOURCE->TARGET ', source_attype, target_attype) - print('@ DEBUG SOURCE.PROPS ', list(self.properties.keys())) - print('@ DEBUG TARGET.PROPS ', list(target.properties.keys())) - print('@ DEBUG TARGET.ANCHORS ', target.anchors) - # If a TextDocument is aligned to a BoundingBox then we grab the coordinates - # TODO: how are we getting the time point? 
- if source_attype == 'TextDocument' and target_attype == 'BoundingBox': - if 'coordinates' in target.properties: - self.anchors['coordinates'] = target.properties['coordinates'] - #print(source_attype, self.anchors) - elif source_attype == 'BoundingBox' and target_attype == 'TextDocument': - pass - # If a TextDocument is aligned to a TimeFrame then we copy time anchors - # but also targets and representatives, the latter because some alignments - # are not precise - elif source_attype == 'TextDocument' and target_attype == 'TimeFrame': - if 'start' in target.properties and 'end' in target.properties: - self.anchors['time-offsets'] = (target.properties['start'], - target.properties['end']) - if 'time-offsets' in target.anchors: - # TODO: is this ever used? - self.anchors['time-offsets'] = target.anchors['time-offsets'] - if 'targets' in target.properties: - self.anchors['targets'] = target.properties['targets'] - if 'representatives' in target.properties: - self.anchors['representatives'] = target.properties['representatives'] - #print('-', source_attype, self.anchors, self, target) - elif source_attype == 'TimeFrame' and target_attype == 'TextDocument': - pass - # Simply copy the time point - elif source_attype == 'TextDocument' and target_attype == 'TimePoint': - self.anchors['time-point'] = target.anchors['time-point'] - if debug: - print('+ ADDED SOURCE.ANCHORS ', self.anchors) - # For Token-TimeFrame alignments all we need are the start and end time points - elif source_attype == 'Token' and target_attype == 'TimeFrame': - if 'start' in target.properties and 'end' in target.properties: - self.anchors['time-offsets'] = (target.properties['start'], - target.properties['end']) - #print(source_attype, self.anchors) - elif source_attype == 'TimeFrame' and target_attype == 'Token': - pass - # TODO: check whether some action is needed for the next options - elif source_attype == 'TextDocument' and target_attype == 'VideoDocument': - pass - elif source_attype == 'VideoDocument' and target_attype == 'TextDocument': - pass - elif source_attype == 'BoundingBox' and target_attype == 'TimePoint': - pass - elif source_attype =='TimePoint' and target_attype == 'BoundingBox': - pass - elif source_attype == 'BoundingBox' and target_attype in ('Token', 'Sentence', 'Paragraph'): - pass - elif source_attype in ('Token', 'Sentence', 'Paragraph') and target_attype == 'BoundingBox': - pass - elif source_attype == 'TextDocument' and target_attype == 'TimePoint': - pass - elif source_attype == 'TimePoint' and target_attype == 'TextDocument': - pass - else: - print('-', source_attype, target_attype) - #if debug: - # print('DEBUG', self.anchors) - - def _get_document(self): - """Return the document or annotation node that the annotation/document in - the node refers to via the document property. This could be a local property - or a metadata property if there is no such local property. 
Return None - if neither of those exist.""" - # try the local property - docid = self.properties.get('document') - if docid is not None: - # print('>>>', docid, self.graph.get_node(docid)) - return self.graph.get_node(docid) - # try the metadata property - if self.view is not None: - try: - metadata = self.view.metadata.contains[self.at_type] - docid = metadata['document'] - return self.graph.get_node(docid) - except KeyError: - return None - return None - - def summary(self): - """The default summary is just the identfier, this should typically be - overriden by sub classes.""" - return { 'id': self.identifier } - - def pp(self, close=True): - print('-' * 80) - print(self) - print(f' document = {self.document}') - for prop in self.properties: - print(f' {prop} = {self.properties[prop]}') - print(' targets = ') - for target in self.targets: - print(' ', target) - print(' anchors = ') - for anchor in self.anchors: - print(f' {anchor} -> {self.anchors[anchor]}') - if close: - print('-' * 80) - - -class TimeFrameNode(Node): - - def __str__(self): - frame_type = ' ' + self.frame_type() if self.has_label() else '' - return ('' - % (self.identifier, self.start(), self.end(), frame_type)) - - def start(self): - return self.properties.get('start', -1) - - def end(self): - return self.properties.get('end', -1) - - def frame_type(self): - # TODO: rename this, uses old property since replaced by "label"" - # NOTE: this is still aloowing for the old property though - return self.properties.get('label') or self.properties.get('frameType') - - def has_label(self): - return self.frame_type() is not None - - def representatives(self) -> list: - """Return a list of the representative TimePoints.""" - # TODO: why could I not get this from the anchors? - rep_ids = self.properties.get('representatives', []) - reps = [self.graph.get_node(rep_id) for rep_id in rep_ids] - return reps - - def summary(self): - """The summary of a time frame just contains the identifier, start, end - and frame type.""" - return { 'id': self.identifier, - 'start': self.properties['start'], - 'end': self.properties['end'], - 'frameType': self.properties.get('frameType') } - - -class EntityNode(Node): - - def __init__(self, graph, view, annotation): - super().__init__(graph, view, annotation) - self.tokens = [] - self._paths = None - self._anchor = None - - def __str__(self): - try: - start = self.properties['start'] - end = self.properties['end'] - except KeyError: - start, end = self.anchors['text-offsets'] - return ("" - % (self.identifier, start, end, self.properties['text'])) - - def start_in_video(self): - #print('+++', self.document.properties) - try: - return self.document.anchors['time-point'] - except KeyError: - return -1 - #return self.anchor()['video-start'] - - def end_in_video(self): - return self.anchor().get('video-end') - - ''' - Commented this out because the type checking in the code coverage tests requires - the default vaue for the close parameter to be the same as on Node.pp(). 
- - def pp(self, close=False): - super().pp(close=close) - try: - for i, p in enumerate(self.paths_to_docs()): - print(' %s' % ' '.join([str(n) for n in p[1:]])) - except ValueError: - print(' WARNING: error in path_to_docs in NamedEntityNode.pp()') - print('-' * 80) - ''' - - def summary(self): - """The summary for entities needs to include where in the video or image - the entity occurs, it is not enough to just give the text document.""" - # TODO: in the old days this used an anchor() method which was fragile - # TODO: revamping it now - return { - 'id': self.identifier, - 'group': self.properties['group'], - 'cat': self.properties['category'], - 'document': self.document.identifier, - # Entities in a TextDocument that is a full transcript without any - # alignments do not have a TimePoint - #'time-point': self.document.anchors.get('time-point'), - #'text-offsets': self.anchors.get('text-offsets'), - 'time-point': self.document.anchors.get('time-point', -1), - 'text-offsets': self.anchors.get('text-offsets', (-1 ,-1)), - #'document': self._get_document_plus_span(), - #'video-start': anchor.get('video-start'), - #'video-end': anchor.get('video-end'), - #'coordinates': self._coordinates_as_string(anchor) - } - - def anchor(self): - """The anchor is the position in the video that the entity is linked to. - This anchor cannot be found in the document property because that points - to a text document that was somehow derived from the video document. Some - graph traversal is needed to get the anchor, but we know that the anchor - is always a time frame or a bounding box. - """ - # TODO: deal with the case where the primary document is not a video - self.paths = self.paths_to_docs() - bbtf = self.find_boundingbox_or_timeframe() - # for path in paths: - # print('... [') - # for n in path: print(' ', n) - # print('===', bbtf) - if bbtf.at_type.shortname == config.BOUNDING_BOX: - return {'video-start': bbtf.properties['timePoint'], - 'coordinates': bbtf.properties['coordinates']} - elif bbtf.at_type.shortname == config.TIME_FRAME: - return {'video-start': bbtf.properties['start'], - 'video-end': bbtf.properties['end']} - - def anchor2(self): - """The anchor is the position in the video that the entity is linked to. - This anchor cannot be found in the document property because that points - to a text document that was somehow derived from the video document. Some - graph traversal is needed to get the anchor, but we know that the anchor - is always a time frame or a bounding box. - """ - # TODO: with this version you get an error that the paths variable does - # not exist yet, must get a clearer picture on how to build a graph - # where nodes have paths to anchors - # TODO: deal with the case where the primary document is not a video - if self._anchor is None: - self._paths = self.paths_to_docs() - bbtf = self.find_boundingbox_or_timeframe() - # for path in self._paths: - # print('... 
[') - # for n in path: print(' ', n) - # print('===', bbtf) - if bbtf.at_type.shortname == config.BOUNDING_BOX: - self._anchor = {'video-start': bbtf.properties['timePoint'], - 'coordinates': bbtf.properties['coordinates']} - elif bbtf.at_type.shortname == config.TIME_FRAME: - self._anchor = {'video-start': bbtf.properties['start'], - 'video-end': bbtf.properties['end']} - return self._anchor - - def find_boundingbox_or_timeframe(self): - return self.paths[-1][-2] - - @staticmethod - def _coordinates_as_string(anchor): - if 'coordinates' not in anchor: - return None - return ','.join(["%s:%s" % (pair[0], pair[1]) - for pair in anchor['coordinates']]) - - -class Nodes(object): - - """Factory class for Node creation. Use Node for creation unless a special - class was registered for the kind of annotation we have.""" - - node_classes = { config.NAMED_ENTITY: EntityNode, - config.TIME_FRAME: TimeFrameNode } - - @classmethod - def new(cls, graph, view, annotation): - node_class = cls.node_classes.get(annotation.at_type.shortname, Node) - return node_class(graph, view, annotation) - - if __name__ == '__main__': diff --git a/mmif/utils/summarizer/nodes.py b/mmif/utils/summarizer/nodes.py new file mode 100644 index 00000000..53201022 --- /dev/null +++ b/mmif/utils/summarizer/nodes.py @@ -0,0 +1,370 @@ +import json + +from typing import Any + +from mmif.utils.summarizer import config + + + +class Node(object): + + def __init__(self, graph, view, annotation): + self.graph = graph + self.view = view + self.view_id = None if self.view is None else self.view.id + self.annotation = annotation + # copy some information from the Annotation + self.at_type = annotation.at_type + self.identifier = annotation.id + self.properties = json.loads(str(annotation.properties)) + # get the document from the view or the properties + self.document = self._get_document() + # The targets property contains a list of annotations or documents that + # the node content points to. This includes the document the annotation + # points to as well as the alignment from a token or text document to a + # bounding box or time frame (which is added later). + # TODO: the above does not seem to be true since there is no evidence of + # data from alignments being added. 
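+        # NOTE: Graph.add_edge() (in graph.py) does append the aligned node to
+        #   self.targets and adds anchors via add_anchors_from_alignment(), so
+        #   alignment data does end up on the node after it is created.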
+ self.targets = [] if self.document is None else [self.document] + self.anchors = {} + self.add_local_anchors() + self.add_anchors_from_targets() + + def __str__(self): + anchor = '' + if self.at_type.shortname == config.TOKEN: + anchor = " %s:%s '%s'" % (self.properties['start'], + self.properties['end'], + self.properties.get('text','').replace('\n', '\\n')) + return "<%s %s%s>" % (self.at_type.shortname, self.identifier, anchor) + + def add_local_anchors(self): + """Get the anchors that you can get from the annotation itself, which + includes the start and end offsets, the coordinates, the timePoint of + a BoundingBox and any annotation with targets.""" + props = self.properties + attype = self.annotation.at_type.shortname + if 'start' in props and 'end' in props: + # TimeFrame is the only non-character based interval so this simple + # if-then-else should work + if attype == config.TIME_FRAME: + self.anchors['text-offsets'] = (props['start'], props['end']) + else: + self.anchors['time-offsets'] = (props['start'], props['end']) + if 'coordinates' in props: + self.anchors['coordinates'] = props['coordinates'] + if 'timePoint' in props: + self.anchors['time-point'] = props['timePoint'] + if 'targets' in props: + self.anchors['targets'] = props['targets'] + + def add_anchors_from_targets(self): + """Get start and end offsets or timePoints from the targets and add them to + the anchors, but only if there were no anchors on the node already. This has + two cases: one for TimeFrames and one for text intervals.""" + props = self.properties + attype = self.annotation.at_type.shortname + if 'targets' in props: + try: + t1 = self.graph.nodes[props['targets'][0]] + t2 = self.graph.nodes[props['targets'][-1]] + if attype == config.TIME_FRAME: + if not 'time-offsets' in props: + self.anchors['time-offsets'] = ( + t1.properties['timePoint'], t2.properties['timePoint']) + else: + if not 'text-offsets' in props: + self.anchors['text-offsets'] = ( + t1.properties['start'], t2.properties['end']) + except IndexError: + print(f'WARNING: Unexpected empty target list for {self.identifier}') + + def add_anchors_from_alignment(self, target: Any, debug=False): + if target is None: + return + source_attype = self.at_type.shortname + target_attype = target.at_type.shortname + if debug: + print('\n@ DEBUG SOURCE->TARGET ', source_attype, target_attype) + print('@ DEBUG SOURCE.PROPS ', list(self.properties.keys())) + print('@ DEBUG TARGET.PROPS ', list(target.properties.keys())) + print('@ DEBUG TARGET.ANCHORS ', target.anchors) + # If a TextDocument is aligned to a BoundingBox then we grab the coordinates + # TODO: how are we getting the time point? + if source_attype == 'TextDocument' and target_attype == 'BoundingBox': + if 'coordinates' in target.properties: + self.anchors['coordinates'] = target.properties['coordinates'] + #print(source_attype, self.anchors) + elif source_attype == 'BoundingBox' and target_attype == 'TextDocument': + pass + # If a TextDocument is aligned to a TimeFrame then we copy time anchors + # but also targets and representatives, the latter because some alignments + # are not precise + elif source_attype == 'TextDocument' and target_attype == 'TimeFrame': + if 'start' in target.properties and 'end' in target.properties: + self.anchors['time-offsets'] = (target.properties['start'], + target.properties['end']) + if 'time-offsets' in target.anchors: + # TODO: is this ever used? 
+ self.anchors['time-offsets'] = target.anchors['time-offsets'] + if 'targets' in target.properties: + self.anchors['targets'] = target.properties['targets'] + if 'representatives' in target.properties: + self.anchors['representatives'] = target.properties['representatives'] + #print('-', source_attype, self.anchors, self, target) + elif source_attype == 'TimeFrame' and target_attype == 'TextDocument': + pass + # Simply copy the time point + elif source_attype == 'TextDocument' and target_attype == 'TimePoint': + self.anchors['time-point'] = target.anchors['time-point'] + if debug: + print('+ ADDED SOURCE.ANCHORS ', self.anchors) + # For Token-TimeFrame alignments all we need are the start and end time points + elif source_attype == 'Token' and target_attype == 'TimeFrame': + if 'start' in target.properties and 'end' in target.properties: + self.anchors['time-offsets'] = (target.properties['start'], + target.properties['end']) + #print(source_attype, self.anchors) + elif source_attype == 'TimeFrame' and target_attype == 'Token': + pass + # TODO: check whether some action is needed for the next options + elif source_attype == 'TextDocument' and target_attype == 'VideoDocument': + pass + elif source_attype == 'VideoDocument' and target_attype == 'TextDocument': + pass + elif source_attype == 'BoundingBox' and target_attype == 'TimePoint': + pass + elif source_attype =='TimePoint' and target_attype == 'BoundingBox': + pass + elif source_attype == 'BoundingBox' and target_attype in ('Token', 'Sentence', 'Paragraph'): + pass + elif source_attype in ('Token', 'Sentence', 'Paragraph') and target_attype == 'BoundingBox': + pass + elif source_attype == 'TextDocument' and target_attype == 'TimePoint': + pass + elif source_attype == 'TimePoint' and target_attype == 'TextDocument': + pass + else: + print('-', source_attype, target_attype) + #if debug: + # print('DEBUG', self.anchors) + + def _get_document(self): + """Return the document or annotation node that the annotation/document in + the node refers to via the document property. This could be a local property + or a metadata property if there is no such local property. 
Return None + if neither of those exist.""" + # try the local property + docid = self.properties.get('document') + if docid is not None: + # print('>>>', docid, self.graph.get_node(docid)) + return self.graph.get_node(docid) + # try the metadata property + if self.view is not None: + try: + metadata = self.view.metadata.contains[self.at_type] + docid = metadata['document'] + return self.graph.get_node(docid) + except KeyError: + return None + return None + + def summary(self): + """The default summary is just the identfier, this should typically be + overriden by sub classes.""" + return { 'id': self.identifier } + + def has_label(self): + """Only TimeFrameNodes can have labels so this returns False.""" + return False + + def pp(self, close=True): + print('-' * 80) + print(self) + print(f' document = {self.document}') + for prop in self.properties: + print(f' {prop} = {self.properties[prop]}') + print(' targets = ') + for target in self.targets: + print(' ', target) + print(' anchors = ') + for anchor in self.anchors: + print(f' {anchor} -> {self.anchors[anchor]}') + if close: + print('-' * 80) + + +class TimeFrameNode(Node): + + def __str__(self): + frame_type = ' ' + self.frame_type() if self.has_label() else '' + return ('' + % (self.identifier, self.start(), self.end(), frame_type)) + + def start(self): + return self.properties.get('start', -1) + + def end(self): + return self.properties.get('end', -1) + + def frame_type(self): + # TODO: rename this, uses old property since replaced by "label"" + # NOTE: this is still aloowing for the old property though + return self.properties.get('label') or self.properties.get('frameType') + + def has_label(self): + return self.frame_type() is not None + + def representatives(self) -> list: + """Return a list of the representative TimePoints.""" + # TODO: why could I not get this from the anchors? + rep_ids = self.properties.get('representatives', []) + reps = [self.graph.get_node(rep_id) for rep_id in rep_ids] + return reps + + def summary(self): + """The summary of a time frame just contains the identifier, start, end + and frame type.""" + return { 'id': self.identifier, + 'start': self.properties['start'], + 'end': self.properties['end'], + 'frameType': self.properties.get('frameType') } + + +class EntityNode(Node): + + def __init__(self, graph, view, annotation): + super().__init__(graph, view, annotation) + self.tokens = [] + self._paths = None + self._anchor = None + + def __str__(self): + try: + start = self.properties['start'] + end = self.properties['end'] + except KeyError: + start, end = self.anchors['text-offsets'] + return ("" + % (self.identifier, start, end, self.properties['text'])) + + def start_in_video(self): + #print('+++', self.document.properties) + try: + return self.document.anchors['time-point'] + except KeyError: + return -1 + #return self.anchor()['video-start'] + + def end_in_video(self): + return self.anchor().get('video-end') + + ''' + Commented this out because the type checking in the code coverage tests requires + the default vaue for the close parameter to be the same as on Node.pp(). 
+ + def pp(self, close=False): + super().pp(close=close) + try: + for i, p in enumerate(self.paths_to_docs()): + print(' %s' % ' '.join([str(n) for n in p[1:]])) + except ValueError: + print(' WARNING: error in path_to_docs in NamedEntityNode.pp()') + print('-' * 80) + ''' + + def summary(self): + """The summary for entities needs to include where in the video or image + the entity occurs, it is not enough to just give the text document.""" + # TODO: in the old days this used an anchor() method which was fragile + # TODO: revamping it now + return { + 'id': self.identifier, + 'group': self.properties['group'], + 'cat': self.properties['category'], + 'document': self.document.identifier, + # Entities in a TextDocument that is a full transcript without any + # alignments do not have a TimePoint + #'time-point': self.document.anchors.get('time-point'), + #'text-offsets': self.anchors.get('text-offsets'), + 'time-point': self.document.anchors.get('time-point', -1), + 'text-offsets': self.anchors.get('text-offsets', (-1 ,-1)), + #'document': self._get_document_plus_span(), + #'video-start': anchor.get('video-start'), + #'video-end': anchor.get('video-end'), + #'coordinates': self._coordinates_as_string(anchor) + } + + def anchor(self) -> dict: + """The anchor is the position in the video that the entity is linked to. + This anchor cannot be found in the document property because that points + to a text document that was somehow derived from the video document. Some + graph traversal is needed to get the anchor, but we know that the anchor + is always a time frame or a bounding box. + """ + # TODO: deal with the case where the primary document is not a video + self.paths = self.paths_to_docs() + bbtf = self.find_boundingbox_or_timeframe() + # for path in paths: + # print('... [') + # for n in path: print(' ', n) + # print('===', bbtf) + if bbtf.at_type.shortname == config.BOUNDING_BOX: + return {'video-start': bbtf.properties['timePoint'], + 'coordinates': bbtf.properties['coordinates']} + elif bbtf.at_type.shortname == config.TIME_FRAME: + return {'video-start': bbtf.properties['start'], + 'video-end': bbtf.properties['end']} + else: + return {} + + def anchor2(self): + """The anchor is the position in the video that the entity is linked to. + This anchor cannot be found in the document property because that points + to a text document that was somehow derived from the video document. Some + graph traversal is needed to get the anchor, but we know that the anchor + is always a time frame or a bounding box. + """ + # TODO: with this version you get an error that the paths variable does + # not exist yet, must get a clearer picture on how to build a graph + # where nodes have paths to anchors + # TODO: deal with the case where the primary document is not a video + if self._anchor is None: + self._paths = self.paths_to_docs() + bbtf = self.find_boundingbox_or_timeframe() + # for path in self._paths: + # print('... 
[') + # for n in path: print(' ', n) + # print('===', bbtf) + if bbtf.at_type.shortname == config.BOUNDING_BOX: + self._anchor = {'video-start': bbtf.properties['timePoint'], + 'coordinates': bbtf.properties['coordinates']} + elif bbtf.at_type.shortname == config.TIME_FRAME: + self._anchor = {'video-start': bbtf.properties['start'], + 'video-end': bbtf.properties['end']} + return self._anchor + + def find_boundingbox_or_timeframe(self): + return self.paths[-1][-2] + + @staticmethod + def _coordinates_as_string(anchor): + if 'coordinates' not in anchor: + return None + return ','.join(["%s:%s" % (pair[0], pair[1]) + for pair in anchor['coordinates']]) + + +class Nodes(object): + + """Factory class for Node creation. Use Node for creation unless a special + class was registered for the kind of annotation we have.""" + + node_classes = { config.NAMED_ENTITY: EntityNode, + config.TIME_FRAME: TimeFrameNode } + + @classmethod + def new(cls, graph, view, annotation): + node_class = cls.node_classes.get(annotation.at_type.shortname, Node) + return node_class(graph, view, annotation) + diff --git a/mmif/utils/summarizer/summary.py b/mmif/utils/summarizer/summary.py index b340f35b..a5c9bb07 100644 --- a/mmif/utils/summarizer/summary.py +++ b/mmif/utils/summarizer/summary.py @@ -81,11 +81,11 @@ from mmif.serialize import Mmif from mmif.vocabulary import DocumentTypes +from mmif.utils.summarizer import config from mmif.utils.summarizer.utils import CharacterList from mmif.utils.summarizer.utils import get_aligned_tokens, timestamp from mmif.utils.summarizer.utils import get_transcript_view, get_last_segmenter_view, get_captions_view from mmif.utils.summarizer.graph import Graph -from mmif.utils.summarizer import config VERSION = '0.2.0' @@ -150,23 +150,19 @@ def validate(self): def video_documents(self): return self.mmif.get_documents_by_type(DocumentTypes.VideoDocument) - def report(self, outfile=None, html=None, full=False, timeframes=False, - transcript=False, captions=False, entities=False): + def report(self, outfile=None): json_obj = { 'mmif_version': self.mmif.metadata.mmif, 'document': self.document.data, 'documents': self.documents.data, 'annotations': self.annotations.data, - 'views': self.views.data} - if transcript or full: - json_obj['transcript'] = self.transcript.data - if captions or full: - json_obj['captions'] = self.captions.as_json() - if timeframes or full: - json_obj['timeframes'] = self.timeframes.as_json() - json_obj['timeframe_stats'] = self.timeframe_stats.data - if entities or full: - json_obj['entities'] = self.entities.as_json() + 'views': self.views.data, + 'transcript': self.transcript.data, + 'captions': self.captions.as_json(), + 'timeframes': self.timeframes.as_json(), + 'timeframe_stats': self.timeframe_stats.data, + 'entities': self.entities.as_json() + } report = json.dumps(json_obj, indent=2) if outfile is None: return report @@ -631,17 +627,17 @@ def __init__(self, summary): f'>>> PROPS {list(doc.properties.keys())}', f'>>> TEXT ' + text.replace("\n", "")[:100], f'>>> ANCHORS {doc.anchors}') - if 'time-offsets' in doc.anchors: + if 'time-offsets' in doc.anchors and 'representatives' in doc.anchors: # For older LLava-style captions # http://apps.clams.ai/llava-captioner/v1.2-6-gc824c97 - p1, p2 = doc.anchors['time-offsets'] - if 'representatives' in doc.anchors: - tp_id = doc.anchors["representatives"][0] - tp = summary.graph.get_node(tp_id) - self.captions.append( - { 'identifier': doc.identifier, - 'time-point': tp.properties['timePoint'], - 'text': text }) + # 
NOTE: probably obsolete, at least the link above is dead + tp_id = doc.anchors["representatives"][0] + tp = summary.graph.get_node(tp_id) + if tp is not None: + self.captions.append( + { 'identifier': doc.identifier, + 'time-point': tp.properties['timePoint'], + 'text': text }) if 'time-point' in doc.anchors: # For newer SmolVLM-style captions # http://apps.clams.ai/smolvlm2-captioner