API Reference

Here’s an API reference for some of the most central components in aw_core, aw_client and aw_server. These are the most important packages in ActivityWatch. A lot of it currently lacks proper docstrings, but it’s a start.

aw_core

aw_core.models

class aw_core.models.Event(id: Union[int, str, None] = None, timestamp: Union[datetime.datetime, str] = None, duration: Union[datetime.timedelta, int, float] = 0, data: Dict[str, Any] = {})[source]

Used to represents an event.

data
duration
id
timestamp
to_json_dict() → dict[source]

Useful when sending data over the wire. Any mongodb interop should not use do this as it accepts datetimes.

to_json_str() → str[source]

aw_core.log

aw_core.log.get_latest_log_file(name, testing=False) → Optional[str][source]

Returns the filename of the last logfile with name. Useful when you want to read the logfile of another ActivityWatch service.

aw_core.log.get_log_file_path() → Optional[str][source]

DEPRECATED: Use get_latest_log_file instead.

aw_core.log.setup_logging(name: str, testing=False, verbose=False, log_stderr=True, log_file=False, log_file_json=False)[source]

aw_core.dirs

aw_core.dirs.ensure_path_exists(path: str) → None[source]
aw_core.dirs.get_cache_dir(module_name: Optional[str]) → str[source]
aw_core.dirs.get_config_dir(module_name: Optional[str]) → str[source]
aw_core.dirs.get_data_dir(module_name: Optional[str]) → str[source]
aw_core.dirs.get_log_dir(module_name: Optional[str]) → str[source]

aw_client

The aw_client package contains a programmer-friendly wrapper around the servers REST API.

class aw_client.ActivityWatchClient(client_name: str = 'unknown', testing=False, host=None, port=None, protocol='http')[source]
connect()[source]
create_bucket(bucket_id: str, event_type: str, queued=False)[source]
delete_bucket(bucket_id: str)[source]
disconnect()[source]
export_all() → dict[source]
export_bucket(bucket_id) → dict[source]
get_buckets()[source]
get_eventcount(bucket_id: str, limit: int = 100, start: datetime.datetime = None, end: datetime.datetime = None) → int[source]
get_events(bucket_id: str, limit: int = 100, start: datetime.datetime = None, end: datetime.datetime = None) → List[aw_core.models.Event][source]
get_info()[source]

Returns a dict currently containing the keys ‘hostname’ and ‘testing’.

heartbeat(bucket_id: str, event: aw_core.models.Event, pulsetime: float, queued: bool = False, commit_interval: Optional[float] = None) → Optional[aw_core.models.Event][source]
Args:
bucket_id: The bucket_id of the bucket to send the heartbeat to event: The actual heartbeat event pulsetime: The maximum amount of time in seconds since the last heartbeat to be merged with the previous heartbeat in aw-server queued: Use the aw-client queue feature to queue events if client loses connection with the server commit_interval: Override default pre-merge commit interval
NOTE: This endpoint can use the failed requests retry queue.
This makes the request itself non-blocking and therefore the function will in that case always returns None.
import_bucket(bucket: dict) → None[source]
insert_event(bucket_id: str, event: aw_core.models.Event) → aw_core.models.Event[source]
insert_events(bucket_id: str, events: List[aw_core.models.Event]) → None[source]
query(query: str, start: datetime.datetime, end: datetime.datetime, name: str = None, cache: bool = False) → Union[int, dict][source]
send_event(bucket_id: str, event: aw_core.models.Event)[source]
send_events(bucket_id: str, events: List[aw_core.models.Event])[source]
setup_bucket(bucket_id: str, event_type: str)[source]

aw_server

aw_server.api

The ServerAPI class contains the basic API methods, these methods are primarily called from RPC layers such as the one found in aw_server.rest.

class aw_server.api.ServerAPI(db, testing)[source]
create_bucket(bucket_id: str, event_type: str, client: str, hostname: str, created: Optional[datetime.datetime] = None) → bool[source]

Create bucket. Returns True if successful, otherwise false if a bucket with the given ID already existed.

create_events(bucket_id: str, events: List[aw_core.models.Event]) → Optional[aw_core.models.Event][source]

Create events for a bucket. Can handle both single events and multiple ones.

Returns the inserted event when a single event was inserted, otherwise None.

delete_bucket(bucket_id: str) → None[source]

Delete a bucket

delete_event(bucket_id: str, event_id) → bool[source]

Delete a single event from a bucket

export_all() → Dict[str, Any][source]

Exports all buckets and their events to a format consistent across versions

export_bucket(bucket_id: str) → Dict[str, Any][source]

Export a bucket to a dataformat consistent across versions, including all events in it.

get_bucket_metadata(bucket_id: str) → Dict[str, Any][source]

Get metadata about bucket.

get_buckets() → Dict[str, Dict[KT, VT]][source]

Get dict {bucket_name: Bucket} of all buckets

get_eventcount(bucket_id: str, start: datetime.datetime = None, end: datetime.datetime = None) → int[source]

Get eventcount from a bucket

get_events(bucket_id: str, limit: int = -1, start: datetime.datetime = None, end: datetime.datetime = None) → List[aw_core.models.Event][source]

Get events from a bucket

get_info() → Dict[str, Dict[KT, VT]][source]

Get server info

get_log()[source]

Get the server log in json format

heartbeat(bucket_id: str, heartbeat: aw_core.models.Event, pulsetime: float) → aw_core.models.Event[source]

Heartbeats are useful when implementing watchers that simply keep track of a state, how long it’s in that state and when it changes. A single heartbeat always has a duration of zero.

If the heartbeat was identical to the last (apart from timestamp), then the last event has its duration updated. If the heartbeat differed, then a new event is created.

Such as:
  • Active application and window title - Example: aw-watcher-window
  • Currently open document/browser tab/playing song - Example: wakatime - Example: aw-watcher-web - Example: aw-watcher-spotify
  • Is the user active/inactive? Send an event on some interval indicating if the user is active or not. - Example: aw-watcher-afk

Inspired by: https://wakatime.com/developers#heartbeats

import_all(buckets: Dict[str, Any])[source]
import_bucket(bucket_data: Any)[source]
query2(name, query, timeperiods, cache)[source]