import logging
from datetime import timedelta
from copy import deepcopy
from typing import List
from aw_core.models import Event
# Module-level logger; flood() uses it to warn about overlapping events.
logger = logging.getLogger(__name__)
def flood(events: List[Event], pulsetime: float = 5) -> List[Event]:
    """
    Takes a list of events and "floods" any empty space between events by
    extending one of the surrounding events to cover the empty space.

    The input is never modified: the events are deep-copied and sorted by
    timestamp first. Every pair of neighbouring events is then handled
    according to the gap between the end of the first event and the start of
    the second one:

    - zero gap: the pair is left as it is.
    - negative gap (overlap) and equal ``data``: the two events are merged
      into the first one.
    - negative gap of more than 0.1 s and differing ``data``: the pair cannot
      be merged safely and is left as it is (a warning is logged).
    - gap greater than -0.1 s and at most ``pulsetime`` seconds: the longer
      event (the first one on a tie) is stretched to meet the other one, or
      absorbs it completely when both carry equal ``data``. For tiny
      overlaps with differing ``data`` this trims the overlap away.
    - larger gaps: the pair is left as it is.

    Each of the two negative-gap warnings is logged at most once per call.
    Events whose duration is not positive at the end (absorbed events as
    well as zero-duration input events) are dropped from the result.

    For more details on flooding, see this issue:
     - https://github.com/ActivityWatch/activitywatch/issues/124

    :param events: Events to flood. Each event needs ``timestamp``,
        ``duration`` and ``data`` attributes.
    :param pulsetime: Largest gap, in seconds, that will be flooded.
    :returns: A new list of flooded events, sorted by timestamp.
    """
    # Originally written in aw-research: https://github.com/ActivityWatch/aw-analysis/blob/7da1f2cd8552f866f643501de633d74cdecab168/aw_analysis/flood.py

    # NOTE: This algorithm has a lot of smaller details that need to be
    #       carefully considered by anyone wishing to edit it, see:
    #        - https://github.com/ActivityWatch/aw-core/pull/73

    # Work on a sorted deep copy so the caller's events are never touched.
    events = sorted(deepcopy(events), key=lambda e: e.timestamp)

    zero = timedelta(0)
    max_gap = timedelta(seconds=pulsetime)

    # If negative gaps are smaller than this, prune them to become zero
    negative_gap_trim_thres = timedelta(seconds=0.1)

    warned_about_negative_gap_safe = False
    warned_about_negative_gap_unsafe = False

    for e1, e2 in zip(events, events[1:]):
        gap = e2.timestamp - (e1.timestamp + e1.duration)

        if not gap:
            continue

        # Sanity check in case events overlap
        if gap < zero and e1.data == e2.data:
            # Events with negative gap but same data can safely be merged
            start = min(e1.timestamp, e2.timestamp)
            end = max(e1.timestamp + e1.duration, e2.timestamp + e2.duration)
            e1.timestamp, e1.duration = start, (end - start)
            # Park e2 at the end of the merged span with zero duration: it is
            # filtered out below, and the next pair measures its gap from here.
            e2.timestamp, e2.duration = end, zero
            if not warned_about_negative_gap_safe:
                logger.warning(
                    "Gap was of negative duration but could be safely merged ({}s). This message will only show once per batch.".format(
                        gap.total_seconds()
                    )
                )
                warned_about_negative_gap_safe = True
        elif gap < -negative_gap_trim_thres:
            # Events with negative gap but differing data cannot be merged
            # safely, so they are left untouched.
            # NOTE: A gap of exactly -negative_gap_trim_thres matches neither
            #       this branch nor the next one; such a pair is also left
            #       untouched, without a warning.
            if not warned_about_negative_gap_unsafe:
                logger.warning(
                    "Gap was of negative duration and could NOT be safely merged ({}s). This warning will only show once per batch.".format(
                        gap.total_seconds()
                    )
                )
                warned_about_negative_gap_unsafe = True
        elif -negative_gap_trim_thres < gap <= max_gap:
            e2_end = e2.timestamp + e2.duration

            # Prioritize flooding from the longer event
            if e1.duration >= e2.duration:
                if e1.data == e2.data:
                    # Extend e1 to the end of e2
                    # Set duration of e2 to zero (mark to delete)
                    e1.duration = e2_end - e1.timestamp
                    e2.timestamp = e2_end
                    e2.duration = zero
                else:
                    # Extend e1 to the start of e2
                    e1.duration = e2.timestamp - e1.timestamp
            else:
                if e1.data == e2.data:
                    # Extend e2 to the start of e1, discard e1
                    e2.timestamp = e1.timestamp
                    e2.duration = e2_end - e2.timestamp
                    e1.duration = zero
                else:
                    # Extend e2 backwards to end of e1
                    e2.timestamp = e1.timestamp + e1.duration
                    e2.duration = e2_end - e2.timestamp

    # Filter out remaining zero-duration events
    return [e for e in events if e.duration > zero]