Skip to content

ACLEW metrics with standard deviation #526

@LawrenceBorst

Description

@LawrenceBorst

Is your feature request related to a problem? Please describe.
Camila asked this week for ACLEW metrics, but including standard deviations for certain columns, like vocal counts.

Her idea was to start with a segments dataframe and derive the metrics from there. Once you have produced the segments dataframe, the code would look like this:

"""
Takes the output from `get_segments_dataframe` and gathers ACLEW metrics over it
"""

from datetime import timedelta
from pathlib import Path
from typing import Callable, List

import click
import numpy as np
import pandas as pd
from ChildProject.annotations import TimeInterval, AnnotationManager
from ChildProject.projects import ChildProject

# Directory that contains this script file.
CURRENT_DIR: Path = Path(__file__).parent


@click.command()
@click.option(
    "--dataset",
    type=str,
    required=True,
    help="Dataset name",
)
@click.option(
    "--input-file",
    type=str,
    required=True,
    help="Input file name",
)
@click.option(
    "--output-file",
    type=str,
    required=True,
    help="Output file name",
)
@click.option(
    "--interval",
    type=int,
    required=True,
    help="Interval in minutes",
)
def aclew_per_segment(dataset: str, input_file: str, output_file: str, interval: int) -> None:
    """Compute per-period vocalization metrics, including standard deviations.

    Reads the segments CSV at INPUT_FILE (columns used: onset_time,
    offset_time, recording_filename, speaker_type, duration), splits the
    covered time of day into periods of INTERVAL minutes and writes one row
    per recording and period to OUTPUT_FILE. Each row holds, per speaker type
    (CHI, OCH, FEM, MAL), the summed duration, the standard deviation of the
    durations and the number of segments.

    When the input holds no segments, or no period contains any, a
    header-only CSV is written.
    """
    dataset_dir = _get_dataset_dir(dataset)
    project = ChildProject(dataset_dir)

    # The manager is only read here; every metric below comes from the segments CSV.
    am = AnnotationManager(project)
    am.read()
    recordings: pd.DataFrame = project.recordings

    segments = pd.read_csv(input_file, parse_dates=["onset_time", "offset_time"])

    speakers = ["CHI", "OCH", "FEM", "MAL"]

    # Fixed column layout (same order as the keys of each result row) so that an
    # empty result still yields a well-formed frame that can be sorted and written.
    columns = ["recording_filename", "period_start", "period_end", "duration_vtc"]
    for speaker in speakers:
        speaker_name = speaker.lower()
        columns += [
            f"tot_voc_dur_{speaker_name}",
            f"std_voc_dur_{speaker_name}",
            f"voc_{speaker_name}",
        ]

    # With no segments the min/max timestamps are NaT and no period can be built.
    periods = []
    if not segments.empty:
        periods = _get_intervals_for_a_day(   # TODO: just get all intervals over the day
            _m_to_h(interval),
            min=segments["onset_time"].min().floor('h'),
            max=segments["offset_time"].max().ceil('h'),
        )

    results = []

    # `period` is deliberately not called `interval`, which is the CLI parameter
    # holding the period length in minutes.
    for period in periods:
        # I initially used am.get_within_time_range() but this clips out segments at the edges,
        # which by inspection turned out to be a real problem for short intervals e.g., 3 minutes
        # NOTE: also, this ChildProject function doesn't seem to work as expected(?)
        segments_in_time_range = _get_within_time_range(segments, period)

        if segments_in_time_range.empty:
            continue

        for recording_filename, group_data in segments_in_time_range.groupby("recording_filename"):
            by_speaker = group_data.groupby("speaker_type")
            speaker_durations = by_speaker["duration"].sum()
            # pandas' default sample std (ddof=1) is NaN for a speaker with a single segment
            speaker_durations_std = by_speaker["duration"].std()
            speaker_counts = by_speaker.size()

            result = {
                "recording_filename": recording_filename,
                "period_start": period.start.strftime("%H:%M:%S"),
                "period_end": period.stop.strftime("%H:%M:%S"),
                "duration_vtc": _get_time_vtc(recordings, recording_filename, period)
            }

            for speaker in speakers:
                speaker_name = speaker.lower()
                result[f"tot_voc_dur_{speaker_name}"] = speaker_durations.get(speaker, np.float64(0.0)).round(1)
                result[f"std_voc_dur_{speaker_name}"] = speaker_durations_std.get(speaker, np.float64(0.0)).round(1)
                result[f"voc_{speaker_name}"] = speaker_counts.get(speaker, np.float64(0.0)).round(1)

            results.append(result)

    results_df = pd.DataFrame(results, columns=columns)
    results_df = results_df.sort_values(["recording_filename", "period_start"])
    results_df.to_csv(output_file, index=False)


def _get_time_vtc(recordings: pd.DataFrame, recording_filename: str, interval: TimeInterval) -> float:
    """Return the length, in milliseconds, of the part of `interval` covered by a recording.

    The recording is the first row of `recordings` whose `recording_filename`
    matches; it spans from its `start_time` to `start_time` plus `duration`
    milliseconds. An IndexError is raised when no row matches.

    NOTE(review): the result is not clamped, so it is negative when the two
    windows do not overlap. `interval` and the parsed `start_time` presumably
    need to fall on the same calendar date for the clipping to be meaningful;
    confirm against the callers.
    """
    matching_rows = recordings.loc[recordings["recording_filename"] == recording_filename]
    recording: pd.Series = matching_rows.iloc[0]

    recording_start = pd.to_datetime(recording["start_time"])
    recording_stop = recording_start + pd.Timedelta(milliseconds=recording["duration"])
    recording_interval = TimeInterval(
        start=recording_start.to_pydatetime(),
        stop=recording_stop.to_pydatetime(),
    )

    # Intersection of the requested window with the recording's own window.
    overlap = min(interval.stop, recording_interval.stop) - max(
        interval.start, recording_interval.start
    )

    return overlap.total_seconds() * 1000



def _get_intervals_for_a_day(
    interval_len_h: float, min: pd.Timestamp, max: pd.Timestamp
) -> List[TimeInterval]:
    """Split a time-of-day span into consecutive intervals of equal length.

    Only the hour, minute and second of `min` and `max` are used; both are
    re-anchored on 1900-01-01. Boundaries run from `min` up to one second past
    `max` in steps of `interval_len_h` hours, and each pair of neighbouring
    boundaries becomes one interval. A trailing remainder shorter than a full
    step is dropped.

    If the time of day of `max` is earlier than that of `min` (for example a
    `max` that was rounded up to midnight), the boundary range is empty and an
    empty list is returned.

    Args:
        interval_len_h: Length of each interval in hours (may be fractional).
        min: Timestamp whose time of day starts the span.
        max: Timestamp whose time of day ends the span.

    Returns:
        The intervals in chronological order.
    """
    day_start = pd.Timestamp(1900, 1, 1, min.hour, min.minute, min.second)
    # Add the extra second as a Timedelta: passing `max.second + 1` to the
    # constructor fails with second == 60 whenever `max.second` is 59.
    day_end = pd.Timestamp(1900, 1, 1, max.hour, max.minute, max.second) + pd.Timedelta(seconds=1)

    boundaries: pd.DatetimeIndex = pd.date_range(
        start=day_start,
        end=day_end,
        freq=timedelta(days=0, hours=interval_len_h),
    )

    return [
        TimeInterval(
            start=lower.to_pydatetime(),
            stop=upper.to_pydatetime(),
        )
        for lower, upper in zip(boundaries[:-1], boundaries[1:])
    ]


def _get_within_time_range(
    segments: pd.DataFrame, interval: TimeInterval
) -> pd.DataFrame:
    """Select the segments that touch `interval` and clip their bounds to it.

    Written as a replacement for the ChildProject routine, which did not seem
    to behave correctly and which clips segments at the edges in a way that is
    unwanted for this script. Profiling also showed the ChildProject routine to
    be the main performance bottleneck; this version is significantly faster.

    Matching is done on time of day only: a segment is kept when its offset is
    at or after the interval start and its onset is at or before the interval
    stop. The `onset_time` and `offset_time` of the kept rows are then moved
    onto the interval bounds where they stick out; no other column is changed.
    """

    def _time_of_day(timestamp):
        return timestamp.to_pydatetime().time()

    window_start = interval.start.time()
    window_stop = interval.stop.time()

    ends_after_start = segments["offset_time"].map(_time_of_day) >= window_start
    starts_before_stop = segments["onset_time"].map(_time_of_day) <= window_stop
    overlapping = segments[ends_after_start & starts_before_stop]

    clip_onset = get_row_callback_min(interval)
    clip_offset = get_row_callback_max(interval)

    return overlapping.apply(clip_onset, axis=1).apply(clip_offset, axis=1)


def get_row_callback_min(
    time_interval: TimeInterval,
) -> Callable[[pd.Series], pd.Series]:
    """Build a row callback that clips `onset_time` to the start of `time_interval`.

    The returned callback compares times of day only. When the row's onset is
    at or before the interval start, `onset_time` is replaced by a timestamp
    on the row's own date carrying the interval start's hour, minute and
    second. The row is modified in place and returned.
    """
    # Looked up once here instead of once per row.
    interval_start = time_interval.start

    def row_callback(row: pd.Series) -> pd.Series:
        onset_time: pd.Timestamp = row["onset_time"]

        if onset_time.to_pydatetime().time() <= interval_start.time():
            row["onset_time"] = pd.Timestamp(
                year=onset_time.year,
                month=onset_time.month,
                day=onset_time.day,
                hour=interval_start.hour,
                minute=interval_start.minute,
                second=interval_start.second,
            )

        return row

    return row_callback


def get_row_callback_max(
    time_interval: TimeInterval,
) -> Callable[[pd.Series], pd.Series]:
    """Build a row callback that clips `offset_time` to the end of `time_interval`.

    The returned callback compares times of day only. When the row's offset is
    at or after the interval stop, `offset_time` is replaced by a timestamp on
    the row's own date carrying the interval stop's hour, minute and second.
    The row is modified in place and returned.
    """
    # Looked up once here instead of once per row.
    interval_stop = time_interval.stop

    def row_callback(row: pd.Series) -> pd.Series:
        offset_time: pd.Timestamp = row["offset_time"]

        if offset_time.to_pydatetime().time() >= interval_stop.time():
            row["offset_time"] = pd.Timestamp(
                year=offset_time.year,
                month=offset_time.month,
                day=offset_time.day,
                hour=interval_stop.hour,
                minute=interval_stop.minute,
                second=interval_stop.second,
            )

        return row

    return row_callback


def _m_to_h(minutes: int) -> float:
    """Convert a number of minutes into (possibly fractional) hours."""
    minutes_per_hour = 60
    return minutes / minutes_per_hour


def _get_dataset_dir(dataset: str) -> Path:
    """Return the resolved path of `dataset` under the `datasets` folder next to this script's parent."""
    datasets_root = CURRENT_DIR / ".." / "datasets"
    dataset_dir = datasets_root / dataset
    return dataset_dir.resolve()


# Run the click command when this file is executed as a script.
if __name__ == "__main__":
    aclew_per_segment()

(It could maybe be a bit shorter, since it rolls its own `get_within_time_range()` function: `am.get_within_time_range()` seemed to have some quirks and doesn't return VTC data such as speaker type.)

But as you can see it's a lot of code for doing something that would be much simpler if we just change the AclewMetrics class. I'd rather add this as a feature than create a script or use a fork/local branch of ChildProject.

Describe the solution you'd like
I think it should be an optional parameter to the metrics command, like --include-std

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions