haileyok/at-kafka
A small service for putting AT firehose events onto Kafka
{ "createdAt": "2025-11-26T01:34:07Z", "defaultBranch": "main", "description": "A small service for putting AT firehose events onto Kafka", "fullName": "haileyok/at-kafka", "homepage": null, "language": "Go", "name": "at-kafka", "pushedAt": "2026-01-02T01:27:18Z", "stargazersCount": 4, "topics": [], "updatedAt": "2026-01-02T01:27:22Z", "url": "https://github.com/haileyok/at-kafka"}
at-kafka
A small service that receives events from the AT firehose and produces them to Kafka. It supports standard JSON output as well as Osprey-formatted events.
Additionally, at-kafka supports subscribing to Tap if you are attempting to perform a network backfill.
Docker Compose
The included docker-compose.yml provides a complete local stack. Edit the environment variables in the file to customize:
environment:
  # For relay mode
  ATKAFKA_RELAY_HOST: "wss://bsky.network" # ATProto relay to subscribe to for events

  # For tap mode
  ATKAFKA_TAP_HOST: "ws://localhost:2480" # Tap websocket host to subscribe to for events
  ATKAFKA_DISABLE_ACKS: false # Whether to disable sending of acks to Tap

  # Kafka configuration
  ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" # Kafka bootstrap servers, comma separated
  ATKAFKA_OUTPUT_TOPIC: "atproto-events" # The output topic for events
  ATKAFKA_OSPREY_COMPATIBLE: false # Whether to produce Osprey-compatible events

  # Match only Blacksky PDS users
  ATKAFKA_MATCHED_SERVICES: "blacksky.app" # A comma-separated list of PDSes to emit events for
  # OR ignore anyone on Bluesky PBC PDSes
  ATKAFKA_IGNORED_SERVICES: "*.bsky.network" # OR a comma-separated list of PDSes to _not_ emit events for

  # Match only Teal.fm records
  ATKAFKA_MATCHED_COLLECTIONS: "fm.teal.*" # A comma-separated list of collections to emit events for
  # OR ignore all Bluesky records
  ATKAFKA_IGNORED_COLLECTIONS: "app.bsky.*" # OR a comma-separated list of collections to ignore events for

Then start:
# For normal mode
docker compose up -d

# For tap mode
docker compose -f docker-compose.tap.yml up -d

Event Structure
Standard Mode
Events are structured similarly to the raw AT Protocol firehose, with one key difference: commit events are split into individual operation events.
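The split described above can be sketched in Go as follows. This is a minimal illustration, not at-kafka's actual code: `CommitOp`, `OperationEvent`, and `SplitCommit` are hypothetical names, the input type is a simplified stand-in for a firehose commit, and the `record` payload is omitted. Field names follow the Operation Event example in this README.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// CommitOp is a hypothetical, simplified view of one operation inside a
// firehose commit. Real firehose commits carry more fields than this.
type CommitOp struct {
	Action string // "create", "update", or "delete"
	Path   string // "<collection>/<rkey>"
	CID    string // empty for deletes
}

// Operation mirrors the "operation" object of a standard-mode event
// (without the "record" payload, which this sketch leaves out).
type Operation struct {
	Action     string `json:"action"`
	Collection string `json:"collection"`
	Rkey       string `json:"rkey"`
	URI        string `json:"uri"`
	CID        string `json:"cid,omitempty"`
	Path       string `json:"path"`
}

// OperationEvent mirrors the top-level shape of a standard-mode operation event.
type OperationEvent struct {
	DID       string    `json:"did"`
	Timestamp string    `json:"timestamp"`
	Operation Operation `json:"operation"`
}

// SplitCommit fans one commit out into one event per operation.
// Operations whose path is not "<collection>/<rkey>" are skipped.
func SplitCommit(did, timestamp string, ops []CommitOp) []OperationEvent {
	events := make([]OperationEvent, 0, len(ops))
	for _, op := range ops {
		collection, rkey, ok := strings.Cut(op.Path, "/")
		if !ok {
			continue
		}
		events = append(events, OperationEvent{
			DID:       did,
			Timestamp: timestamp,
			Operation: Operation{
				Action:     op.Action,
				Collection: collection,
				Rkey:       rkey,
				URI:        "at://" + did + "/" + op.Path,
				CID:        op.CID,
				Path:       op.Path,
			},
		})
	}
	return events
}

func main() {
	// Placeholder values for illustration only.
	events := SplitCommit("did:plc:123", "2024-01-01T12:00:00.000Z", []CommitOp{
		{Action: "create", Path: "app.bsky.feed.post/some-rkey", CID: "bafyrei-placeholder"},
		{Action: "delete", Path: "app.bsky.feed.like/other-rkey"},
	})
	for _, ev := range events {
		out, err := json.Marshal(ev)
		if err != nil {
			fmt.Println("marshal error:", err)
			continue
		}
		fmt.Println(string(out))
	}
}
```

A consumer can therefore treat each Kafka message as a single record change, without unpacking a multi-operation commit.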
Operation Event
{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "operation": {
    "action": "create",
    "collection": "app.bsky.feed.post",
    "rkey": "some-rkey",
    "uri": "at://did:plc:123/app.bsky.feed.post/some-rkey",
    "cid": "bafyrei...",
    "path": "app.bsky.feed.post/...",
    "record": {
      "text": "Hello world!",
      "$type": "app.bsky.feed.post",
      "createdAt": "2024-01-01T12:00:00.000Z"
    }
  }
}

Account Event
{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "account": {
    "active": true,
    "seq": 12345,
    "status": "active"
  }
}

Identity Event
{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "identity": {
    "seq": 12345,
    "handle": "user.bsky.social"
  }
}

Osprey-Compatible Mode
When --osprey-compatible is enabled, events are wrapped in the Osprey event format:
{
  "data": {
    "action_name": "operation#create",
    "action_id": 1234567890,
    "data": {
      "did": "did:plc:...",
      "timestamp": "2024-01-01T12:00:00.000Z",
      "operation": { ... }
    },
    "timestamp": "2024-01-01T12:00:00.000Z",
    "secret_data": {},
    "encoding": "UTF8"
  },
  "send_time": "2024-01-01T12:00:00Z"
}

Action names in Osprey mode:
operation#create - Record creation
operation#update - Record update
operation#delete - Record deletion
account - Account status changes
identity - Identity/handle changes
info - Informational messages
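On the consumer side, the envelope and action names above might be handled along these lines. This is a sketch under assumptions: the struct and function names (`OspreyEvent`, `StandardEvent`, `DecodeOsprey`, `ParseActionName`) are hypothetical, the field types are inferred from the JSON example in this README, and only the standard library is used.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// StandardEvent holds the inner event. Only one of Operation, Account,
// or Identity is expected to be present, so they are kept as raw JSON.
type StandardEvent struct {
	DID       string          `json:"did"`
	Timestamp string          `json:"timestamp"`
	Operation json.RawMessage `json:"operation,omitempty"`
	Account   json.RawMessage `json:"account,omitempty"`
	Identity  json.RawMessage `json:"identity,omitempty"`
}

// OspreyEvent models the wrapper shown in the README example.
// Fields not listed here (such as secret_data) are ignored when decoding.
type OspreyEvent struct {
	Data struct {
		ActionName string        `json:"action_name"`
		ActionID   int64         `json:"action_id"`
		Data       StandardEvent `json:"data"`
		Timestamp  string        `json:"timestamp"`
		Encoding   string        `json:"encoding"`
	} `json:"data"`
	SendTime string `json:"send_time"`
}

// DecodeOsprey parses one Osprey-wrapped message value.
func DecodeOsprey(raw []byte) (OspreyEvent, error) {
	var ev OspreyEvent
	if err := json.Unmarshal(raw, &ev); err != nil {
		return OspreyEvent{}, fmt.Errorf("decode osprey event: %w", err)
	}
	return ev, nil
}

// ParseActionName splits "operation#create" into ("operation", "create").
// Names without a "#", such as "account", return an empty action.
func ParseActionName(name string) (kind, action string) {
	kind, action, _ = strings.Cut(name, "#")
	return kind, action
}

func main() {
	// Illustrative payload with placeholder values, not real network data.
	raw := []byte(`{"data":{"action_name":"operation#delete","action_id":1,` +
		`"data":{"did":"did:plc:example","timestamp":"2024-01-01T12:00:00.000Z"},` +
		`"timestamp":"2024-01-01T12:00:00.000Z","secret_data":{},"encoding":"UTF8"},` +
		`"send_time":"2024-01-01T12:00:00Z"}`)

	ev, err := DecodeOsprey(raw)
	if err != nil {
		fmt.Println(err)
		return
	}
	kind, action := ParseActionName(ev.Data.ActionName)
	fmt.Println(kind, action, ev.Data.Data.DID)
}
```

Splitting on `#` lets a consumer route on the event kind first and only inspect the record action for operation events.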
Monitoring
The service exposes Prometheus metrics on the default metrics port.
atkafka_handled_events - Total events that are received on the firehose and handled
atkafka_produced_events - Total messages that are output on the bus
atkafka_plc_requests - Total number of PLC requests that were made, if applicable, and whether they were cached
atkafka_api_requests - Total number of API requests that were made, if applicable, and whether they were cached
atkafka_cache_size - The size of the PLC and API caches
atkafka_acks_sent - Total acks that were sent to Tap, if applicable