API Reference

Here’s an API reference for some of the most central components in aw_core, aw_client and aw_server. These are the most important packages in ActivityWatch. A lot of it currently lacks proper docstrings, but it’s a start.

aw_core

aw_core.models

class aw_core.models.Event(id: Union[int, str, NoneType] = None, timestamp: Union[datetime.datetime, str] = None, duration: Union[datetime.timedelta, int, float] = 0, data: Dict[str, Any] = {})[source]

Used to represents an event.

data
duration
id
timestamp
to_json_dict() → dict[source]

Useful when sending data over the wire. Any mongodb interop should not use do this as it accepts datetimes.

to_json_str() → str[source]

aw_core.log

aw_core.log.get_latest_log_file(name, testing=False) → Union[str, NoneType][source]

Returns the filename of the last logfile with name. Useful when you want to read the logfile of another ActivityWatch service.

aw_core.log.get_log_file_path() → Union[str, NoneType][source]

DEPRECATED: Use get_latest_log_file instead.

aw_core.log.setup_logging(name: str, testing=False, verbose=False, log_stderr=True, log_file=False, log_file_json=False)[source]

aw_core.dirs

aw_core.dirs.ensure_path_exists(path: str) → None[source]
aw_core.dirs.get_config_dir(module_name: Union[str, NoneType]) → str[source]
aw_core.dirs.get_data_dir(module_name: Union[str, NoneType]) → str[source]
aw_core.dirs.get_log_dir(module_name: Union[str, NoneType]) → str[source]

aw_client

The aw_client package contains a programmer-friendly wrapper around the servers REST API.

class aw_client.ActivityWatchClient(client_name: str = 'unknown', testing=False, host=None)[source]

A handy wrapper around the aw-server REST API. The recommended way of interacting with the server.

Can be used with a with-statement as an alternative to manually calling connect and disconnect in a try-finally clause.

Example:
from aw_client import ActivityWatchClient

# We'll run with testing=True so we don't mess up any production instance.
# Make sure you've started aw-server with the `--testing` flag as well.
client = ActivityWatchClient("test-client", testing=True)

# Make the bucket_id unique for both the client and host
# The convention is to use client-name_hostname as bucket name,
# but if you have multiple buckets in one client you can add a
# suffix such as client-name-event-type or similar
bucket_id = "{}_{}".format("test-client-bucket", client.hostname)
# A short and descriptive event type name
# Will be used by visualizers (such as aw-webui) to detect what type and format the events are in
# Can for example be "currentwindow", "afkstatus", "ping" or "currentsong"
event_type = "dummydata"

# First we need a bucket to send events/heartbeats to.
# If the bucket already exists aw-server will simply return 304 NOT MODIFIED,
# so run this every time the clients starts up to verify that the bucket exists.
# If the client was unable to connect to aw-server or something failed
# during the creation of the bucket, an exception will be raised.
client.create_bucket(bucket_id, event_type="test")

# Asynchronous loop example
with client:
    # This context manager starts the queue dispatcher thread and stops it when done, always use it when setting queued=True.
    # Alternatively you can use client.connect() and client.disconnect() instead if you prefer that

    # Create a sample event to send as heartbeat
    heartbeat_data = {"label": "heartbeat"}
    now = datetime.now(timezone.utc)
    heartbeat_event = Event(timestamp=now, data=heartbeat_data)

    # Now we can send some events via heartbeats
    # This will send one heartbeat every second 5 times
    sleeptime = 1
    for i in range(5):
        # The duration between the heartbeats will be less than pulsetime, so they will get merged.
        # TODO: Make a section with an illustration on how heartbeats work and insert a link here
        print("Sending heartbeat {}".format(i))
        client.heartbeat(bucket_id, heartbeat_event, pulsetime=sleeptime+1, queued=True)

        # Sleep a second until next heartbeat
        sleep(sleeptime)

        # Update timestamp for next heartbeat
        heartbeat_event.timestamp = datetime.now(timezone.utc)

    # Give the dispatcher thread some time to complete sending the last events.
    # If we don't do this the events might possibly queue up and be sent the
    # next time the client starts instead.
    sleep(1)

# Synchronous example, insert an event
event_data = {"label": "non-heartbeat event"}
now = datetime.now(timezone.utc)
event = Event(timestamp=now, data=event_data)
inserted_event = client.insert_event(bucket_id, event)

# The event returned from insert_event has been assigned an id by aw-server
assert inserted_event.id is not None

# Fetch last 10 events from bucket
# Should be two events in order of newest to oldest
# - "shutdown" event with a duration of 0
# - "heartbeat" event with a duration of 5*sleeptime
events = client.get_events(bucket_id=bucket_id, limit=10)
print(events)

# Now lets clean up after us.
# You probably don't want this in your watchers though!
client.delete_bucket(bucket_id)

# If something doesn't work, run aw-server with --verbose to see why some request doesn't go through
# Good luck with writing your own watchers :-)
connect()[source]
create_bucket(bucket_id: str, event_type: str, queued=False)[source]
delete_bucket(bucket_id: str)[source]
disconnect()[source]
get_buckets()[source]
get_eventcount(bucket_id: str, limit: int = 100, start: datetime.datetime = None, end: datetime.datetime = None) → int[source]
get_events(bucket_id: str, limit: int = 100, start: datetime.datetime = None, end: datetime.datetime = None) → List[aw_core.models.Event][source]
get_info()[source]

Returns a dict currently containing the keys ‘hostname’ and ‘testing’.

heartbeat(bucket_id: str, event: aw_core.models.Event, pulsetime: float, queued: bool = False, commit_interval: Union[float, NoneType] = None) → Union[aw_core.models.Event, NoneType][source]
Args:
bucket_id: The bucket_id of the bucket to send the heartbeat to event: The actual heartbeat event pulsetime: The maximum amount of time in seconds since the last heartbeat to be merged with the previous heartbeat in aw-server queued: Use the aw-client queue feature to queue events if client loses connection with the server commit_interval: Override default pre-merge commit interval
NOTE: This endpoint can use the failed requests retry queue.
This makes the request itself non-blocking and therefore the function will in that case always returns None.
insert_event(bucket_id: str, event: aw_core.models.Event) → aw_core.models.Event[source]
insert_events(bucket_id: str, events: List[aw_core.models.Event]) → None[source]
query(query: str, start: datetime.datetime, end: datetime.datetime, name: str = None, cache: bool = False) → Union[int, dict][source]
send_event(bucket_id: str, event: aw_core.models.Event)[source]
send_events(bucket_id: str, events: List[aw_core.models.Event])[source]
setup_bucket(bucket_id: str, event_type: str)[source]

aw_server

aw_server.api

The ServerAPI class contains the basic API methods, these methods are primarily called from RPC layers such as the one found in aw_server.rest.

class aw_server.api.ServerAPI(db, testing)[source]
create_bucket(bucket_id: str, event_type: str, client: str, hostname: str) → bool[source]

Create bucket. Returns True if successful, otherwise false if a bucket with the given ID already existed.

create_events(bucket_id: str, events: List[aw_core.models.Event]) → Union[aw_core.models.Event, NoneType][source]

Create events for a bucket. Can handle both single events and multiple ones.

Returns the inserted event when a single event was inserted, otherwise None.

delete_bucket(bucket_id: str) → None[source]

Delete a bucket

delete_event(bucket_id: str, event_id) → bool[source]

Delete a single event from a bucket

export_all() → Dict[str, dict][source]

Exports all buckets and their events to a format consistent across versions

export_bucket(bucket_id: str) → Dict[str, Any][source]

Export a bucket to a dataformat consistent across versions, including all events in it.

get_bucket_metadata(bucket_id: str) → Dict[str, Any][source]

Get metadata about bucket.

get_buckets() → Dict[str, Dict][source]

Get dict {bucket_name: Bucket} of all buckets

get_eventcount(bucket_id: str, start: datetime.datetime = None, end: datetime.datetime = None) → int[source]

Get eventcount from a bucket

get_events(bucket_id: str, limit: int = -1, start: datetime.datetime = None, end: datetime.datetime = None) → List[aw_core.models.Event][source]

Get events from a bucket

get_info() → Dict[str, Dict][source]

Get server info

get_log()[source]

Get the server log in json format

heartbeat(bucket_id: str, heartbeat: aw_core.models.Event, pulsetime: float) → aw_core.models.Event[source]

Heartbeats are useful when implementing watchers that simply keep track of a state, how long it’s in that state and when it changes. A single heartbeat always has a duration of zero.

If the heartbeat was identical to the last (apart from timestamp), then the last event has its duration updated. If the heartbeat differed, then a new event is created.

Such as:
  • Active application and window title - Example: aw-watcher-window
  • Currently open document/browser tab/playing song - Example: wakatime - Example: aw-watcher-web - Example: aw-watcher-spotify
  • Is the user active/inactive? Send an event on some interval indicating if the user is active or not. - Example: aw-watcher-afk

Inspired by: https://wakatime.com/developers#heartbeats

query2(name, query, timeperiods, cache)[source]