
haileyok/at-kafka

A small service for putting AT firehose events onto Kafka

haileyok/at-kafka.json
{
  "createdAt": "2025-11-26T01:34:07Z",
  "defaultBranch": "main",
  "description": "A small service for putting AT firehose events onto Kafka",
  "fullName": "haileyok/at-kafka",
  "homepage": null,
  "language": "Go",
  "name": "at-kafka",
  "pushedAt": "2026-01-02T01:27:18Z",
  "stargazersCount": 4,
  "topics": [],
  "updatedAt": "2026-01-02T01:27:22Z",
  "url": "https://github.com/haileyok/at-kafka"
}

A small service that receives events from the AT firehose and produces them to Kafka. It supports standard JSON output as well as Osprey-formatted events.

Additionally, at-kafka supports subscribing to Tap if you are attempting to perform a network backfill.

The included docker-compose.yml provides a complete local stack. Edit the environment variables in the file to customize:

environment:
  # For relay mode
  ATKAFKA_RELAY_HOST: "wss://bsky.network" # ATProto relay to subscribe to for events
  # For tap mode
  ATKAFKA_TAP_HOST: "ws://localhost:2480" # Tap websocket host to subscribe to for events
  ATKAFKA_DISABLE_ACKS: false # Whether to disable sending of acks to Tap
  # Kafka configuration
  ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" # Kafka bootstrap servers, comma separated
  ATKAFKA_OUTPUT_TOPIC: "atproto-events" # The output topic for events
  ATKAFKA_OSPREY_COMPATIBLE: false # Whether to produce Osprey-compatible events
  # Match only Blacksky PDS users
  ATKAFKA_MATCHED_SERVICES: "blacksky.app" # A comma-separated list of PDSes to emit events for
  # OR ignore anyone on Bluesky PBC PDSes
  ATKAFKA_IGNORED_SERVICES: "*.bsky.network" # OR a comma-separated list of PDSes to _not_ emit events for
  # Match only Teal.fm records
  ATKAFKA_MATCHED_COLLECTIONS: "fm.teal.*" # A comma-separated list of collections to emit events for
  # OR ignore all Bluesky records
  ATKAFKA_IGNORED_COLLECTIONS: "app.bsky.*" # OR a comma-separated list of collections to ignore events for
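
To make the filter values above more concrete, here is a minimal sketch of how a comma-separated pattern list such as "fm.teal.*" or "*.bsky.network" could be evaluated. It is not taken from the at-kafka source: the function names matchPattern and matchAny are hypothetical, and it assumes "*" only ever appears as a leading or trailing wildcard. The collection and host names in main are illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

// matchPattern reports whether value matches a single pattern.
// Assumption (not confirmed by the README): "*" is only used as a
// leading or trailing wildcard, as in "*.bsky.network" or "app.bsky.*".
func matchPattern(pattern, value string) bool {
	switch {
	case pattern == "*":
		return true
	case strings.HasPrefix(pattern, "*"):
		return strings.HasSuffix(value, pattern[1:])
	case strings.HasSuffix(pattern, "*"):
		return strings.HasPrefix(value, pattern[:len(pattern)-1])
	default:
		return pattern == value
	}
}

// matchAny checks value against a comma-separated list of patterns,
// ignoring empty entries and surrounding whitespace.
func matchAny(list, value string) bool {
	for _, p := range strings.Split(list, ",") {
		p = strings.TrimSpace(p)
		if p != "" && matchPattern(p, value) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(matchAny("fm.teal.*", "fm.teal.alpha.feed.play"))
	fmt.Println(matchAny("app.bsky.*", "fm.teal.alpha.feed.play"))
	fmt.Println(matchAny("*.bsky.network", "example.host.bsky.network"))
}
```

Under these assumptions, a matched list acts as an allowlist and an ignored list as a denylist; the real service may resolve patterns differently.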

Then start:

# For normal mode
docker compose up -d
# For tap mode
docker compose -f docker-compose.tap.yml up -d

Events are structured similarly to the raw AT Protocol firehose, with one key difference: commit events are split into individual operation events.

{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "operation": {
    "action": "create",
    "collection": "app.bsky.feed.post",
    "rkey": "some-rkey",
    "uri": "at://did:plc:123/app.bsky.feed.post/some-rkey",
    "cid": "bafyrei...",
    "path": "app.bsky.feed.post/...",
    "record": {
      "text": "Hello world!",
      "$type": "app.bsky.feed.post",
      "createdAt": "2024-01-01T12:00:00.000Z"
    }
  }
}

{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "account": {
    "active": true,
    "seq": 12345,
    "status": "active"
  }
}

{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "identity": {
    "seq": 12345,
    "handle": "user.bsky.social"
  }
}
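
The splitting of a commit into one event per operation can be sketched roughly as follows. This is an illustration only, not the service's actual code: the Op input type and the splitCommit function are hypothetical, the record body is left out for brevity, and the JSON field names are copied from the operation example above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Op is a hypothetical, simplified view of one operation inside a
// firehose commit. Path has the form "<collection>/<rkey>".
type Op struct {
	Action string
	Path   string
	Cid    string
}

// Operation mirrors the "operation" object in the example event
// (the "record" field is omitted in this sketch).
type Operation struct {
	Action     string `json:"action"`
	Collection string `json:"collection"`
	Rkey       string `json:"rkey"`
	URI        string `json:"uri"`
	Cid        string `json:"cid,omitempty"`
	Path       string `json:"path"`
}

// Event is the per-operation envelope produced to Kafka.
type Event struct {
	Did       string     `json:"did"`
	Timestamp string     `json:"timestamp"`
	Operation *Operation `json:"operation,omitempty"`
}

// splitCommit turns one commit with N operations into N events.
// Operations whose path is not "<collection>/<rkey>" are skipped.
func splitCommit(did, timestamp string, ops []Op) []Event {
	events := make([]Event, 0, len(ops))
	for _, op := range ops {
		collection, rkey, ok := strings.Cut(op.Path, "/")
		if !ok {
			continue
		}
		events = append(events, Event{
			Did:       did,
			Timestamp: timestamp,
			Operation: &Operation{
				Action:     op.Action,
				Collection: collection,
				Rkey:       rkey,
				URI:        "at://" + did + "/" + op.Path,
				Cid:        op.Cid,
				Path:       op.Path,
			},
		})
	}
	return events
}

func main() {
	events := splitCommit("did:plc:123", "2024-01-01T12:00:00.000Z", []Op{
		{Action: "create", Path: "app.bsky.feed.post/some-rkey", Cid: "bafyrei-example"},
		{Action: "delete", Path: "app.bsky.feed.like/other-rkey"},
	})
	for _, ev := range events {
		out, err := json.Marshal(ev)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(out))
	}
}
```

Each event carries the same did and timestamp as its parent commit, so a consumer can process operations independently without reassembling the commit.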

When --osprey-compatible is enabled, events are wrapped in the Osprey event format:

{
  "data": {
    "action_name": "operation#create",
    "action_id": 1234567890,
    "data": {
      "did": "did:plc:...",
      "timestamp": "2024-01-01T12:00:00.000Z",
      "operation": { ... }
    },
    "timestamp": "2024-01-01T12:00:00.000Z",
    "secret_data": {},
    "encoding": "UTF8"
  },
  "send_time": "2024-01-01T12:00:00Z"
}

Action names in Osprey mode:

  • operation#create - Record creation
  • operation#update - Record update
  • operation#delete - Record deletion
  • account - Account status changes
  • identity - Identity/handle changes
  • info - Informational messages
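
As a rough illustration of the wrapping and naming rules above, the sketch below builds an Osprey-style envelope around an already-serialized event. The type and function names (ospreyEvent, actionName, wrap, demoEvent) are hypothetical, and how at-kafka actually generates action_id is not stated here, so the ID is simply passed in by the caller.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ospreyData mirrors the inner "data" object of the example envelope.
type ospreyData struct {
	ActionName string          `json:"action_name"`
	ActionID   int64           `json:"action_id"`
	Data       json.RawMessage `json:"data"`
	Timestamp  string          `json:"timestamp"`
	SecretData map[string]any  `json:"secret_data"`
	Encoding   string          `json:"encoding"`
}

// ospreyEvent is the full envelope produced in Osprey mode.
type ospreyEvent struct {
	Data     ospreyData `json:"data"`
	SendTime string     `json:"send_time"`
}

// actionName derives the action name from the event kind. Operation
// events get an "operation#<action>" name; other kinds use the kind
// itself ("account", "identity", "info").
func actionName(kind, action string) string {
	if kind == "operation" {
		return "operation#" + action
	}
	return kind
}

// wrap places an already-encoded event inside the envelope.
// actionID is supplied by the caller (an assumption of this sketch).
func wrap(kind, action string, actionID int64, inner json.RawMessage,
	eventTime string, sendTime time.Time) ospreyEvent {
	return ospreyEvent{
		Data: ospreyData{
			ActionName: actionName(kind, action),
			ActionID:   actionID,
			Data:       inner,
			Timestamp:  eventTime,
			SecretData: map[string]any{}, // marshals as {} rather than null
			Encoding:   "UTF8",
		},
		SendTime: sendTime.UTC().Format(time.RFC3339),
	}
}

// demoEvent builds a fixed example envelope for demonstration.
func demoEvent() ospreyEvent {
	inner := json.RawMessage(`{"did":"did:plc:123","timestamp":"2024-01-01T12:00:00.000Z"}`)
	return wrap("operation", "create", 1234567890, inner,
		"2024-01-01T12:00:00.000Z",
		time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
}

func main() {
	out, err := json.MarshalIndent(demoEvent(), "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Keeping the inner event as json.RawMessage means the standard event is serialized once and embedded unchanged, so both output modes share the same payload.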

The service exposes the following Prometheus metrics on the default metrics port:

  • atkafka_handled_events - Total events that are received on the firehose and handled
  • atkafka_produced_events - Total messages that are output on the bus
  • atkafka_plc_requests - Total number of PLC requests that were made, if applicable, and whether they were cached
  • atkafka_api_requests - Total number of API requests that were made, if applicable, and whether they were cached
  • atkafka_cache_size - The size of the PLC and API caches
  • atkafka_acks_sent - Total acks that were sent to Tap, if applicable