Firehose Ingestion Pipeline

The firehose ingestion pipeline runs as a separate process (src/indexer.ts), independent of the API server (src/index.ts). This two-process architecture matches Chive's design: the indexer can crash, restart, or scale independently without affecting query serving.

The pipeline subscribes to the ATProto relay's event stream, filters for Layers-relevant commits, validates records against their Lexicon schemas, and routes them through a dependency-aware job queue into PostgreSQL, Elasticsearch, and Neo4j.

ATProto Firehose Overview

The AT Protocol relay aggregates repository events from every PDS in the network and broadcasts them over a single WebSocket stream. The appview subscribes to this stream using the com.atproto.sync.subscribeRepos XRPC method. Every event on the firehose falls into one of four categories:

  • Commit events: repository mutations (create, update, delete) containing CAR-encoded blocks with the actual record data
  • Handle events: DID-to-handle mapping changes
  • Identity events: DID document updates (e.g., key rotation, PDS migration)
  • Account events: account status changes (activation, deactivation, suspension)

Each event carries a monotonically increasing sequence number (cursor). The appview persists this cursor so it can resume from where it left off after a restart, without reprocessing the entire history.

Commit events are the primary interest of the Layers appview. Each commit contains one or more operations (creates, updates, or deletes) on records within a user's repository. The operations include the collection NSID, the record key (rkey), and (for creates and updates) the record value encoded as a DAG-CBOR block within a CAR file.
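The decoded operations can be pictured with a small sketch. The `CommitOp` shape below is illustrative (field names follow the spirit of com.atproto.sync.subscribeRepos, not its exact wire format), and `partitionOps` is a hypothetical helper showing the create/update-vs-delete split the pipeline cares about:

```typescript
// Illustrative shape of a decoded commit operation; deletes carry no record value.
interface CommitOp {
  action: 'create' | 'update' | 'delete';
  collection: string; // NSID, e.g. 'pub.layers.expression.expression'
  rkey: string;       // record key within the collection
  record?: unknown;   // decoded DAG-CBOR value (absent for deletes)
}

// Split a commit's operations into writes (carrying record data) and deletes.
function partitionOps(ops: CommitOp[]): { writes: CommitOp[]; deletes: CommitOp[] } {
  return {
    writes: ops.filter((op) => op.action !== 'delete'),
    deletes: ops.filter((op) => op.action === 'delete'),
  };
}
```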

Component Architecture

The indexer process decomposes into 8 components in src/services/indexing/, matching Chive's granular pipeline:

| Component | File | Responsibility |
| --- | --- | --- |
| FirehoseConsumer | firehose-consumer.ts | WebSocket subscription, AsyncIterable event stream |
| EventFilter | event-filter.ts | NSID filtering via Set&lt;string&gt; lookup |
| CommitHandler | commit-handler.ts | CAR file parsing, DAG-CBOR record extraction |
| EventProcessor | event-processor.ts | Routes validated events to storage backends |
| CursorManager | cursor-manager.ts | Batch cursor persistence (every N events or T seconds) |
| EventQueue | event-queue.ts | BullMQ queue dispatch with backpressure control |
| DLQHandler | dlq-handler.ts | Dead letter queue with AlertService integration |
| ErrorClassifier | error-classifier.ts | Classifies errors as retryable (transient) vs permanent |

The initialization flow in src/indexer.ts:

// src/indexer.ts — separate process from the API server
const pg = await createPool(config.database)
const redis = new Redis(config.redis)
const es = new Client(config.elasticsearch)
const neo4j = neo4jDriver(config.neo4j)

const cursorManager = new CursorManager(pg, { batchSize: 1000, flushIntervalMs: 5000 })
const errorClassifier = new ErrorClassifier()
const dlqHandler = new DLQHandler(pg, alertService)
const eventProcessor = new EventProcessor({ pg, es, neo4j, dlqHandler, errorClassifier })
const eventQueue = new EventQueue(redis, { maxDepth: 10_000 })
const eventFilter = new EventFilter(LAYERS_NSIDS)
const commitHandler = new CommitHandler()

const consumer = new FirehoseConsumer(config.relayUrl, cursorManager, {
  eventFilter,
  commitHandler,
  eventProcessor,
  eventQueue,
})

await consumer.start()

Subscription Management

Connection

The FirehoseConsumer establishes a WebSocket connection to the relay at wss://bsky.network (configurable via LAYERS_RELAY_URL). The subscription request includes the last persisted cursor as a query parameter:

wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=12345678

If no cursor is persisted (first run or full rebuild), the consumer starts from cursor 0 to replay the entire history.

Cursor Persistence

The CursorManager persists the current cursor to the firehose_cursor table in PostgreSQL. To avoid excessive write amplification, the cursor is flushed in batches (every 1000 events or every 5 seconds, whichever comes first) rather than on every event. On unclean shutdown, at most a few seconds of events are reprocessed. This is safe because all handlers are idempotent.

-- See Database Design for full schema
CREATE TABLE firehose_cursor (
  id INTEGER PRIMARY KEY DEFAULT 1 CHECK (id = 1),
  cursor BIGINT NOT NULL,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Reconnection

Connection drops are expected and handled automatically. The consumer uses a cockatiel circuit breaker with the following parameters:

| Parameter | Value | Rationale |
| --- | --- | --- |
| Initial delay | 500ms | Fast retry for transient network blips |
| Max delay | 30s | Cap to avoid long outages |
| Backoff multiplier | 2x | Exponential growth |
| Jitter | ±25% | Prevent thundering herd across replicas |
| Half-open after | 60s | Probe relay availability |
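The delay schedule implied by these parameters can be computed as below. This is a sketch of the arithmetic only, not of cockatiel's API; the injectable `rand` parameter exists purely to make the jitter testable:

```typescript
// Reconnect delay per the table above: exponential backoff from 500ms,
// doubling per attempt, capped at 30s, with ±25% uniform jitter.
function reconnectDelayMs(attempt: number, rand: () => number = Math.random): number {
  const base = Math.min(500 * 2 ** attempt, 30_000);
  const jitter = (rand() * 2 - 1) * 0.25; // uniform in [-0.25, +0.25]
  return Math.round(base * (1 + jitter));
}
```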

Backpressure

If the BullMQ job queue depth exceeds a configurable threshold (default: 10,000 pending jobs), the consumer pauses the WebSocket connection. The relay buffers events for a limited window; if the pause exceeds the relay's buffer, the consumer reconnects and resumes from its last persisted cursor. This prevents unbounded memory growth during downstream slowdowns.
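The pause/resume decision can be sketched as a small state function. Note the low-water mark for resuming is an assumption added here to avoid flapping around a single threshold; the source specifies only the 10,000-job high-water mark:

```typescript
// Backpressure sketch: pause the socket at the high-water mark, resume once
// the queue drains below a lower mark. lowWaterRatio is an assumed value.
function nextSocketState(
  current: 'reading' | 'paused',
  queueDepth: number,
  maxDepth = 10_000,
  lowWaterRatio = 0.5,
): 'reading' | 'paused' {
  if (current === 'reading' && queueDepth >= maxDepth) return 'paused';
  if (current === 'paused' && queueDepth <= maxDepth * lowWaterRatio) return 'reading';
  return current;
}
```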

Event Filtering

The vast majority of firehose events are irrelevant to the Layers appview (Bluesky posts, likes, follows, etc.). The filter stage inspects each commit's operations and extracts only those touching one of the 26 pub.layers.* collection NSIDs:

| # | Collection NSID |
| --- | --- |
| 1 | pub.layers.expression.expression |
| 2 | pub.layers.segmentation.segmentation |
| 3 | pub.layers.annotation.annotationLayer |
| 4 | pub.layers.annotation.clusterSet |
| 5 | pub.layers.ontology.ontology |
| 6 | pub.layers.ontology.typeDef |
| 7 | pub.layers.corpus.corpus |
| 8 | pub.layers.corpus.membership |
| 9 | pub.layers.resource.entry |
| 10 | pub.layers.resource.collection |
| 11 | pub.layers.resource.collectionMembership |
| 12 | pub.layers.resource.template |
| 13 | pub.layers.resource.filling |
| 14 | pub.layers.resource.templateComposition |
| 15 | pub.layers.judgment.experimentDef |
| 16 | pub.layers.judgment.judgmentSet |
| 17 | pub.layers.judgment.agreementReport |
| 18 | pub.layers.alignment.alignment |
| 19 | pub.layers.graph.graphNode |
| 20 | pub.layers.graph.graphEdge |
| 21 | pub.layers.graph.graphEdgeSet |
| 22 | pub.layers.persona.persona |
| 23 | pub.layers.media.media |
| 24 | pub.layers.eprint.eprint |
| 25 | pub.layers.eprint.dataLink |
| 26 | pub.layers.changelog.entry |

The filter is implemented as a Set<string> lookup on the operation's collection field. Non-matching operations are discarded with zero allocation overhead. Matching operations are decoded from DAG-CBOR and passed to the validation stage.
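The filter reduces to a constant-time membership test. A sketch (with the NSID set abbreviated; the full 26-entry set lives in the real `LAYERS_NSIDS` constant):

```typescript
// Filter stage sketch: Set lookup over the Layers collection NSIDs.
const LAYERS_NSIDS = new Set<string>([
  'pub.layers.expression.expression',
  'pub.layers.segmentation.segmentation',
  'pub.layers.annotation.annotationLayer',
  // ...remaining pub.layers.* collections omitted for brevity
]);

// Non-matching operations are dropped before any decoding work happens.
function isLayersOp(op: { collection: string }): boolean {
  return LAYERS_NSIDS.has(op.collection);
}
```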

Record Validation

Every extracted record passes through two validation layers before entering the job queue.

Lexicon Schema Validation

The record is validated against its Lexicon JSON schema using @atproto/lexicon. This ensures structural correctness: required fields are present, types match, string formats (AT-URI, DID, datetime) are well-formed, and union discriminators resolve to known variants. Records that fail Lexicon validation are rejected immediately because they represent protocol-level corruption or a version mismatch.

Business Rule Validation (Zod)

Records that pass Lexicon validation are then checked against stricter Zod schemas that encode Layers-specific business rules:

  • Annotation spans must have start < end
  • Segmentation token offsets must be monotonically increasing and within the expression's text length
  • Ontology typeDef records must reference an existing ontology AT-URI
  • Graph edges must reference valid graphNode AT-URIs
  • Enum values must belong to their declared flexible enum domains
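Two of these rules, sketched as standalone predicates. The real pipeline expresses them as Zod refinements on the record schemas; these hypothetical helpers just show the checks themselves:

```typescript
// Annotation spans must have start < end.
function validSpan(span: { start: number; end: number }): boolean {
  return span.start < span.end;
}

// Segmentation token offsets must be strictly increasing and
// within the expression's text length.
function validOffsets(offsets: number[], textLength: number): boolean {
  return offsets.every(
    (o, i) => o >= 0 && o <= textLength && (i === 0 || o > offsets[i - 1]),
  );
}
```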

Dead Letter Queue

Records that fail either validation stage are routed to the Dead Letter Queue (DLQ) with structured error metadata:

interface DLQEntry {
  id: string;
  collection: string;
  rkey: string;
  did: string;
  error: {
    stage: 'lexicon' | 'zod';
    message: string;
    path?: string[]; // JSON path to the failing field
    expected?: string;
    received?: string;
  };
  rawRecord: unknown; // Original record for debugging
  firehoseCursor: number;
  timestamp: Date;
}

DLQ entries are stored in PostgreSQL and exposed through an admin API for inspection and replay. The DLQHandler integrates with the AlertService to send notifications when DLQ entries exceed configurable thresholds.

Error Classification

The ErrorClassifier categorizes errors to determine retry behavior:

| Category | Examples | Action |
| --- | --- | --- |
| Retryable | Network timeout, DB connection refused, ES bulk rejection | Re-queue with exponential backoff |
| Permanent | Validation failure, malformed record, unknown collection NSID | Route to DLQ immediately |
| Dependency | Missing referenced expression, missing ontology | Re-queue with dependency delay |
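A classifier in this shape can be sketched as below. The match patterns are illustrative stand-ins, not the real `ErrorClassifier` rules, which would key off typed error classes rather than message text:

```typescript
type ErrorCategory = 'retryable' | 'permanent' | 'dependency';

// Sketch: map an error to a retry category. Pattern matching on messages is
// a simplification; production code would inspect error types/codes instead.
function classifyError(err: Error): ErrorCategory {
  const msg = err.message.toLowerCase();
  if (/timeout|econnrefused|bulk rejection/.test(msg)) return 'retryable';
  if (/missing referenced|not yet indexed/.test(msg)) return 'dependency';
  return 'permanent'; // validation failures, malformed records, unknown NSIDs
}
```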

See Background Jobs for DLQ monitoring and reprocessing.

Job Queue Architecture

Validated records are dispatched to BullMQ queues organized by namespace. Each queue can be scaled independently by adjusting its worker concurrency.

Queue Topology

| Queue | Records | Priority | Notes |
| --- | --- | --- | --- |
| layers:expression | expression | HIGH | Must process before annotation/segmentation |
| layers:segmentation | segmentation | HIGH | Must process before annotation |
| layers:annotation | annotationLayer, clusterSet | HIGH | Core pipeline; depends on expression + segmentation |
| layers:ontology | ontology, typeDef | MEDIUM | Independent |
| layers:corpus | corpus, membership | MEDIUM | Independent |
| layers:resource | entry, collection, collectionMembership, template, filling, templateComposition | MEDIUM | Independent |
| layers:judgment | experimentDef, judgmentSet, agreementReport | MEDIUM | Independent |
| layers:alignment | alignment | MEDIUM | Independent |
| layers:graph | graphNode, graphEdge, graphEdgeSet | MEDIUM | Independent |
| layers:integration | persona, media, eprint, dataLink | LOW | Supporting records |

Dependency-Aware Processing

The core pipeline records (expression, segmentation, and annotation) have strict processing order dependencies:

An annotationLayer references both an expression (via sourceUrl) and a segmentation (via segSourceUrl). If an annotation arrives on the firehose before its expression, the handler cannot resolve the cross-reference or validate span offsets.

The dependency resolution strategy:

  1. Check existence: The annotation handler queries PostgreSQL for the referenced expression and segmentation.
  2. If present: Proceed with normal indexing.
  3. If missing: Re-queue the job with an exponential delay (1s, 4s, 16s) using BullMQ's built-in delay mechanism.
  4. After 3 retries: Move to the DLQ. The missing dependency likely indicates a data integrity issue (deleted expression, malformed AT-URI) rather than a timing issue.

This pattern applies to any record type that references another Layers record. The layers:expression and layers:segmentation queues are processed at HIGH priority to minimize the window during which dependent records are waiting.
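The retry schedule from the steps above (1s, 4s, 16s, then DLQ) is a geometric series with ratio 4, which can be sketched as:

```typescript
// Dependency retry schedule: 1s, 4s, 16s; null signals "move to the DLQ".
function dependencyRetryDelayMs(attempt: number): number | null {
  if (attempt >= 3) return null; // retries exhausted — route to the DLQ
  return 1000 * 4 ** attempt;
}
```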

Indexing Pipeline

The full path from firehose event to indexed record:

Per-Record-Type Handlers

Each record type has a dedicated handler that knows how to extract fields for PostgreSQL columns, construct Elasticsearch documents, and create Neo4j nodes and edges. Handlers implement a common interface:

interface RecordHandler<T> {
  /** The collection NSID this handler processes */
  collection: string;

  /** Extract PG row data from the record */
  toPgRow(did: string, rkey: string, record: T): PgRowData;

  /** Construct ES document (return null if this type is not indexed in ES) */
  toEsDocument(did: string, rkey: string, record: T): EsDocument | null;

  /** Create Neo4j operations (return empty array if not indexed in Neo4j) */
  toNeo4jOps(did: string, rkey: string, record: T): Neo4jOperation[];

  /** Extract cross-references from this record */
  extractRefs(did: string, rkey: string, record: T): CrossReference[];
}

Example: Expression Handler

const expressionHandler: RecordHandler<Expression> = {
  collection: 'pub.layers.expression.expression',

  toPgRow(did, rkey, record) {
    return {
      table: 'expressions',
      data: {
        uri: `at://${did}/pub.layers.expression.expression/${rkey}`,
        did,
        rkey,
        text: record.text,
        lang: record.lang,
        media_type: record.mediaType ?? 'text/plain',
        source_url: record.sourceUrl ?? null,
        context_url: record.contextUrl ?? null,
        created_at: record.createdAt,
        indexed_at: new Date(),
      },
    };
  },

  toEsDocument(did, rkey, record) {
    return {
      index: 'layers_expressions',
      id: `at://${did}/pub.layers.expression.expression/${rkey}`,
      body: {
        uri: `at://${did}/pub.layers.expression.expression/${rkey}`,
        did,
        text: record.text,
        lang: record.lang,
        media_type: record.mediaType ?? 'text/plain',
        created_at: record.createdAt,
      },
    };
  },

  toNeo4jOps(did, rkey, record) {
    const uri = `at://${did}/pub.layers.expression.expression/${rkey}`;
    const ops: Neo4jOperation[] = [
      {
        type: 'MERGE_NODE',
        label: 'Expression',
        properties: { uri, did, lang: record.lang },
      },
    ];

    // If the expression references a source (e.g., a media record), create an edge
    if (record.sourceUrl) {
      ops.push({
        type: 'MERGE_EDGE',
        from: uri,
        to: record.sourceUrl,
        label: 'HAS_SOURCE',
      });
    }

    return ops;
  },

  extractRefs(did, rkey, record) {
    const uri = `at://${did}/pub.layers.expression.expression/${rkey}`;
    const refs: CrossReference[] = [];

    if (record.sourceUrl) {
      refs.push({ fromUri: uri, toUri: record.sourceUrl, refType: 'sourceUrl' });
    }
    if (record.contextUrl) {
      refs.push({ fromUri: uri, toUri: record.contextUrl, refType: 'contextUrl' });
    }

    return refs;
  },
};

All three writes (PG, ES, Neo4j) execute within a single handler invocation. PG is the source of truth and is written first inside a transaction. ES and Neo4j writes are best-effort; if either fails, the job is retried. See Database Design for the full schema definitions and Background Jobs for retry and consistency reconciliation.

Cross-Reference Resolution

Layers records form a dense web of AT-URI references. The ingestion pipeline extracts every reference and writes it to the cross_references table, enabling efficient "find everything referencing this record" queries.

Forward References

When a record is indexed, all AT-URI reference fields are extracted by the handler's extractRefs method and inserted into cross_references:

INSERT INTO cross_references (from_uri, to_uri, ref_type, created_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (from_uri, to_uri, ref_type) DO NOTHING;

This happens during the initial indexing pass. No additional processing is needed.

Back-References

When a new expression arrives, the pipeline checks whether any already-indexed records hold forward references to it that could not be fully resolved at index time. For example:

  1. An annotationLayer arrives referencing expression at://did:plc:abc/pub.layers.expression.expression/xyz.
  2. The expression does not yet exist in PG. The annotation job is re-queued (see Dependency-Aware Processing).
  3. Later, the expression arrives and is indexed.
  4. After indexing the expression, the handler queries cross_references for any to_uri matching the new expression's AT-URI and re-enqueues the corresponding records for processing.

This ensures that late-arriving dependencies are eventually resolved without manual intervention.
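Step 4 amounts to an inverted-index lookup over `cross_references`. A sketch, with the SQL query replaced by an in-memory row array and a hypothetical `referrersToReenqueue` helper:

```typescript
// Row shape mirroring the cross_references table.
interface CrossRef {
  fromUri: string;
  toUri: string;
  refType: string;
}

// Given a newly indexed record's AT-URI, return the distinct referrers
// whose jobs should be re-enqueued for processing.
function referrersToReenqueue(newUri: string, refs: CrossRef[]): string[] {
  return [...new Set(refs.filter((r) => r.toUri === newUri).map((r) => r.fromUri))];
}
```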

Deletion and Tombstone Handling

When a delete operation arrives on the firehose, the pipeline removes the record from all three storage backends.

Deletion Sequence

  1. PostgreSQL: Delete the row from the record's table. This cascades to any PG-level foreign keys (e.g., normalized annotation rows for an annotationLayer).
  2. Elasticsearch: Delete the document by its AT-URI ID.
  3. Neo4j: Delete the node and all edges originating from it.
  4. Cross-references: Delete all rows in cross_references where from_uri matches the deleted record.

Dangling Reference Policy

Cross-references that point to the deleted record are intentionally not removed. Records that referenced the now-deleted record retain their dangling to_uri references. This is consistent with ATProto's eventual consistency model: a user may delete a record that others reference, and those references become stale rather than cascading deletions across repository boundaries.

Query endpoints handle dangling references gracefully by returning null for unresolvable AT-URIs and flagging them in the response.
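A sketch of that resolution behavior, with the index lookup abstracted as a callback (the function name and return shape are illustrative, not the actual endpoint contract):

```typescript
// Resolve a batch of AT-URIs; unresolvable ones come back as null with a flag.
function resolveRefs(
  uris: string[],
  lookup: (uri: string) => object | undefined,
): { uri: string; record: object | null; dangling: boolean }[] {
  return uris.map((uri) => {
    const record = lookup(uri) ?? null;
    return { uri, record, dangling: record === null };
  });
}
```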

Firehose Lag Monitoring

Firehose lag (the delay between when an event is produced by the relay and when the appview processes it) is the primary health indicator for the ingestion pipeline.

Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| layers_firehose_cursor_lag_seconds | Gauge | | Wall-clock time minus the timestamp of the last processed event |
| layers_firehose_events_processed_total | Counter | collection | Total events processed, broken down by collection NSID |
| layers_firehose_events_filtered_total | Counter | | Total events discarded by the filter stage |
| layers_firehose_validation_failures_total | Counter | stage, collection | Validation failures by stage (lexicon/zod) and collection |
| layers_firehose_queue_depth | Gauge | queue | Current pending job count per BullMQ queue |

Alert Thresholds

| Condition | Severity | Action |
| --- | --- | --- |
| cursor_lag_seconds > 60 | Warning | Page on-call; check queue depth and worker health |
| cursor_lag_seconds > 300 | Critical | Immediate investigation; likely downstream outage or backpressure |
| queue_depth > 10000 (any queue) | Warning | Scale workers or investigate slow handler |
| validation_failures_total spike | Warning | Check for Lexicon version mismatch or malformed PDS |

Dashboard

The Grafana dashboard includes the following panels for firehose health:

  • Cursor lag: Real-time gauge and time-series graph of layers_firehose_cursor_lag_seconds
  • Throughput: Events per second, stacked by collection
  • Queue depth: Per-queue pending job counts
  • DLQ inflow: Rate of records entering the dead letter queue
  • Validation failure rate: Failures per second by stage and collection

See Observability for the full dashboard and alerting configuration.

Future Considerations

WebTransport

WebTransport is being monitored as a potential replacement for WebSocket-based firehose subscription. WebTransport provides multiplexed streams over HTTP/3, unreliable datagrams for non-critical events, and better congestion control. It is not yet adopted by ATProto relays but could improve ingestion performance for high-volume streams.

Structured Concurrency

Node.js 22+ improvements to AbortController enable cleaner structured concurrency patterns for the firehose pipeline. Cancellation signals can propagate through the entire handler chain (FirehoseConsumer → EventFilter → CommitHandler → EventProcessor), ensuring graceful shutdown without orphaned async operations.

See Also

  • Database Design for PostgreSQL schema, Elasticsearch mappings, and Neo4j graph model
  • Background Jobs for worker configuration, concurrency tuning, and DLQ management
  • Indexing Strategy for per-record-type indexing decisions and annotation normalization
  • Technology Stack for version pins on BullMQ, Redis, cockatiel, and other dependencies