
Database Design

The Layers appview indexes all 26 pub.layers.* record types across four storage backends. Each backend serves a distinct purpose: PostgreSQL is the authoritative source of truth, Elasticsearch and Neo4j are derived indexes optimized for specific query patterns, and Redis handles ephemeral state. Every piece of data in Elasticsearch and Neo4j can be reconstructed from PostgreSQL, and every row in PostgreSQL can be reconstructed from the ATProto firehose.

Database Roles

| Database | Role | Data Lifetime | Rebuildable From |
| --- | --- | --- | --- |
| PostgreSQL 16+ | Source of truth. Stores every indexed record as structured columns plus full JSONB. Handles relational queries, cross-reference lookups, and transactional writes. | Persistent | ATProto firehose (cursor 0) |
| Elasticsearch 8+ | Full-text search, faceted filtering, and aggregation. Powers the search API for expressions, annotations, ontologies, graph nodes, and other searchable record types. | Persistent (derived) | PostgreSQL |
| Neo4j 5+ | Knowledge graph and cross-reference traversal. Models the dense reference network between expressions, annotations, graph nodes, ontologies, and alignments as a native graph for efficient path queries. | Persistent (derived) | PostgreSQL |
| Redis 7+ | Cache, session management, rate limiting, and job queue backing store (BullMQ). All data is ephemeral and can be lost without affecting correctness. | Ephemeral | N/A (regenerated on demand) |

Why four databases? Layers' record types are densely cross-referenced and support diverse query patterns that no single database handles well:

  • Relational integrity (foreign keys, transactions, JSONB queries) requires PostgreSQL.
  • Full-text search with faceting (language-aware stemming, nested annotation search, aggregation buckets) requires Elasticsearch.
  • Graph traversal (multi-hop cross-reference walks, shortest path between annotations, subgraph extraction) requires Neo4j.
  • Sub-millisecond ephemeral state (session tokens, rate-limit counters, cached records, job queues) requires Redis.

Running all four adds operational complexity, but the alternative (forcing PostgreSQL to handle graph traversal or Elasticsearch to handle transactional writes) produces worse performance and more brittle code.

Storage Adapter Pattern

Each database is accessed through an adapter implementing the IStorageBackend interface, following Chive's src/storage/ pattern:

// src/storage/postgresql/adapter.ts
@singleton()
class PostgreSQLAdapter implements IStorageBackend {
  constructor(@inject('PgPool') private pool: Pool) {}

  async storeRecord(table: string, data: PgRowData): Promise<Result<void>> { /* ... */ }
  async getByUri(table: string, uri: string): Promise<Result<Record | null>> { /* ... */ }
  async deleteByUri(table: string, uri: string): Promise<Result<void>> { /* ... */ }
}

// src/storage/elasticsearch/adapter.ts
@singleton()
class ElasticsearchAdapter implements ISearchBackend {
  constructor(@inject('EsClient') private client: Client) {}

  async indexDocument(index: string, doc: EsDocument): Promise<Result<void>> { /* ... */ }
  async search(request: SearchRequest): Promise<Result<SearchResponse>> { /* ... */ }
}

// src/storage/neo4j/adapter.ts
@singleton()
class Neo4jAdapter implements IGraphBackend {
  constructor(@inject('Neo4jDriver') private driver: Driver) {}

  async mergeNode(label: string, props: NodeProperties): Promise<Result<void>> { /* ... */ }
  async mergeEdge(from: string, to: string, type: string): Promise<Result<void>> { /* ... */ }
}

Supporting utilities in each adapter directory:

| File | Purpose |
| --- | --- |
| src/storage/postgresql/query-builder.ts | Composable parameterized SQL builder |
| src/storage/postgresql/batch-operations.ts | Bulk insert/upsert for firehose catchup |
| src/storage/postgresql/migrations/ | node-pg-migrate migration files |
| src/storage/elasticsearch/document-mapper.ts | PG row → ES document transformation |
| src/storage/elasticsearch/templates/ | Index template JSON files |
| src/storage/elasticsearch/ilm/ | Index Lifecycle Management policies |
| src/storage/elasticsearch/index-manager.ts | Index creation, mapping updates, ILM application |
| src/storage/neo4j/schema/ | constraints.cypher, indexes.cypher schema files |
| src/storage/neo4j/setup-manager.ts | Schema initialization on startup |
| src/storage/redis/structures.ts | Type-safe Redis key patterns |

Connection pooling follows Chive's createPool/closePool pattern for clean lifecycle management. All adapter methods are wrapped in cockatiel resilience policies (circuit breaker + retry).
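The cockatiel policies themselves are configured inside the adapters; as a dependency-free illustration of the retry-plus-breaker behavior they provide, a minimal sketch (the class and function names here are invented for the sketch, not the real code):

```typescript
// Sketch of retry + circuit-breaker resilience around an adapter call.
// SimpleBreaker and withResilience are illustrative stand-ins for the
// cockatiel policies the adapters actually use.

type AsyncFn<T> = () => Promise<T>;

class SimpleBreaker {
  private failures = 0;
  private openUntil = 0;

  constructor(
    private readonly failureThreshold = 5,
    private readonly halfOpenAfterMs = 10_000,
  ) {}

  async execute<T>(fn: AsyncFn<T>): Promise<T> {
    if (Date.now() < this.openUntil) {
      throw new Error("circuit open"); // fail fast while the backend is down
    }
    try {
      const result = await fn();
      this.failures = 0; // a success closes the circuit
      return result;
    } catch (err) {
      if (++this.failures >= this.failureThreshold) {
        this.openUntil = Date.now() + this.halfOpenAfterMs;
      }
      throw err;
    }
  }
}

// Retry with exponential backoff, delegating each attempt to the breaker.
async function withResilience<T>(
  breaker: SimpleBreaker,
  fn: AsyncFn<T>,
  maxAttempts = 3,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await breaker.execute(fn);
    } catch (err) {
      lastError = err;
      await new Promise((r) => setTimeout(r, 2 ** attempt * 100));
    }
  }
  throw lastError;
}
```

The important property is that transient failures are retried while sustained failures trip the breaker, so a down backend does not stall every request.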

PostgreSQL Schema

PostgreSQL is the authoritative store. Every record ingested from the firehose is written here first. Tables follow consistent conventions:

Conventions

  • Table names are derived from record type names, lowercased and pluralized (e.g., pub.layers.expression.expression becomes expressions).
  • Primary key is always uri (the AT-URI of the record, e.g., at://did:plc:abc123/pub.layers.expression.expression/rkey).
  • Standard columns appear on every table: uri, did (record owner DID), rkey (record key), indexed_at (timestamp of indexing), and record (full record as JSONB).
  • Extracted columns pull frequently queried fields out of the JSONB into dedicated typed columns for efficient indexing. The record column always contains the complete record for schema-evolution resilience.
  • Cross-references are extracted into both dedicated columns on the source table (for direct lookups) and into the shared cross_references table (for reverse lookups and graph construction).
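A sketch of how the naming and key conventions translate to code. The helper names (`tableNameFor`, `parseAtUri`) and the irregular-plural handling are illustrative, not the actual implementation, and a few tables (e.g. `media_records`, `changelogs`) are named explicitly rather than derived mechanically:

```typescript
// Illustrative helpers for the conventions above: table names from record
// types, and the standard uri/did/rkey columns from an AT-URI.

const IRREGULAR_PLURALS: Record<string, string> = {
  corpus: "corpora", // Latin plural used by the corpora table
};

function tableNameFor(recordType: string): string {
  const lastSegment = recordType.split(".").pop() ?? recordType;
  // camelCase -> snake_case (annotationLayer -> annotation_layer)
  const snake = lastSegment.replace(/([a-z0-9])([A-Z])/g, "$1_$2").toLowerCase();
  return IRREGULAR_PLURALS[snake] ?? `${snake}s`;
}

// AT-URI shape: at://<did>/<collection>/<rkey>
function parseAtUri(uri: string): { did: string; collection: string; rkey: string } {
  const m = /^at:\/\/([^/]+)\/([^/]+)\/([^/]+)$/.exec(uri);
  if (!m) throw new Error(`invalid AT-URI: ${uri}`);
  return { did: m[1], collection: m[2], rkey: m[3] };
}
```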

Core Pipeline Tables

These tables correspond to the core pipeline lexicons that build incrementally: expression, segmentation, annotation.

expressions

Stores pub.layers.expression.expression records.

CREATE TABLE expressions (
  uri TEXT PRIMARY KEY,
  did TEXT NOT NULL,
  rkey TEXT NOT NULL,
  text TEXT,
  kind TEXT,
  language TEXT,
  source_url TEXT,
  source_ref TEXT,  -- AT-URI of source expression
  eprint_ref TEXT,  -- AT-URI of linked eprint
  parent_ref TEXT,  -- AT-URI of parent expression
  indexed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  record JSONB NOT NULL,

  CONSTRAINT expressions_did_rkey_unique UNIQUE (did, rkey)
);

CREATE INDEX idx_expressions_did ON expressions (did);
CREATE INDEX idx_expressions_kind_language ON expressions (kind, language);
CREATE INDEX idx_expressions_source_url ON expressions (source_url)
  WHERE source_url IS NOT NULL;
CREATE INDEX idx_expressions_parent_ref ON expressions (parent_ref)
  WHERE parent_ref IS NOT NULL;
CREATE INDEX idx_expressions_eprint_ref ON expressions (eprint_ref)
  WHERE eprint_ref IS NOT NULL;
CREATE INDEX idx_expressions_record ON expressions USING GIN (record);
CREATE INDEX idx_expressions_indexed_at ON expressions (indexed_at);

segmentations

Stores pub.layers.segmentation.segmentation records.

CREATE TABLE segmentations (
  uri TEXT PRIMARY KEY,
  did TEXT NOT NULL,
  rkey TEXT NOT NULL,
  expression_ref TEXT NOT NULL,  -- AT-URI of target expression
  strategy TEXT,
  token_count INTEGER,
  indexed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  record JSONB NOT NULL,

  CONSTRAINT segmentations_did_rkey_unique UNIQUE (did, rkey)
);

CREATE INDEX idx_segmentations_did ON segmentations (did);
CREATE INDEX idx_segmentations_expression_ref ON segmentations (expression_ref);
CREATE INDEX idx_segmentations_strategy ON segmentations (strategy);
CREATE INDEX idx_segmentations_record ON segmentations USING GIN (record);

annotation_layers

Stores pub.layers.annotation.annotationLayer records. The embedded annotations array is normalized into a separate annotations table.

CREATE TABLE annotation_layers (
  uri TEXT PRIMARY KEY,
  did TEXT NOT NULL,
  rkey TEXT NOT NULL,
  expression_ref TEXT NOT NULL,  -- AT-URI of annotated expression
  segmentation_ref TEXT,         -- AT-URI of segmentation used
  kind TEXT NOT NULL,
  subkind TEXT,
  formalism TEXT,
  ontology_ref TEXT,             -- AT-URI of governing ontology
  persona_ref TEXT,              -- AT-URI of annotator persona
  annotation_count INTEGER NOT NULL DEFAULT 0,
  indexed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  record JSONB NOT NULL,

  CONSTRAINT annotation_layers_did_rkey_unique UNIQUE (did, rkey)
);

CREATE INDEX idx_annotation_layers_did ON annotation_layers (did);
CREATE INDEX idx_annotation_layers_expression_ref ON annotation_layers (expression_ref);
CREATE INDEX idx_annotation_layers_kind ON annotation_layers (kind);
CREATE INDEX idx_annotation_layers_kind_subkind ON annotation_layers (kind, subkind);
CREATE INDEX idx_annotation_layers_kind_subkind_formalism
  ON annotation_layers (kind, subkind, formalism);
CREATE INDEX idx_annotation_layers_ontology_ref ON annotation_layers (ontology_ref)
  WHERE ontology_ref IS NOT NULL;
CREATE INDEX idx_annotation_layers_record ON annotation_layers USING GIN (record);

annotations

Normalized from the embedded annotations array within each annotationLayer record. Each row represents one annotation. This table has no uri of its own; it is keyed by the parent layer's URI plus the array index.

CREATE TABLE annotations (
  layer_uri TEXT NOT NULL REFERENCES annotation_layers(uri) ON DELETE CASCADE,
  index INTEGER NOT NULL,
  label TEXT,
  value TEXT,
  anchor_type TEXT,  -- textSpan, tokenRef, temporalSpan, etc.
  start_offset INTEGER,
  end_offset INTEGER,
  token_index INTEGER,
  confidence REAL,
  record JSONB NOT NULL,

  PRIMARY KEY (layer_uri, index)
);

CREATE INDEX idx_annotations_label ON annotations (label);
CREATE INDEX idx_annotations_anchor_type ON annotations (anchor_type);
CREATE INDEX idx_annotations_layer_uri ON annotations (layer_uri);
CREATE INDEX idx_annotations_record ON annotations USING GIN (record);
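The normalization step can be sketched as a pure function that turns a layer's embedded array into rows keyed by `(layer_uri, index)`. The embedded annotation shape and the helper name are assumptions based on the columns above:

```typescript
// Sketch: flatten the embedded annotations array into annotations-table rows.
// Field names mirror the columns; the EmbeddedAnnotation shape is assumed.

interface EmbeddedAnnotation {
  label?: string;
  value?: string;
  anchor?: { type: string; startOffset?: number; endOffset?: number; tokenIndex?: number };
  confidence?: number;
}

interface AnnotationRow {
  layer_uri: string;
  index: number;
  label: string | null;
  value: string | null;
  anchor_type: string | null;
  start_offset: number | null;
  end_offset: number | null;
  token_index: number | null;
  confidence: number | null;
  record: string; // full annotation serialized for the JSONB column
}

function normalizeAnnotations(
  layerUri: string,
  annotations: EmbeddedAnnotation[],
): AnnotationRow[] {
  return annotations.map((a, i) => ({
    layer_uri: layerUri,
    index: i, // array position becomes part of the primary key
    label: a.label ?? null,
    value: a.value ?? null,
    anchor_type: a.anchor?.type ?? null,
    start_offset: a.anchor?.startOffset ?? null,
    end_offset: a.anchor?.endOffset ?? null,
    token_index: a.anchor?.tokenIndex ?? null,
    confidence: a.confidence ?? null,
    record: JSON.stringify(a),
  }));
}
```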

cluster_sets

Stores pub.layers.annotation.clusterSet records.

CREATE TABLE cluster_sets (
  uri TEXT PRIMARY KEY,
  did TEXT NOT NULL,
  rkey TEXT NOT NULL,
  expression_ref TEXT,
  kind TEXT,
  indexed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  record JSONB NOT NULL,

  CONSTRAINT cluster_sets_did_rkey_unique UNIQUE (did, rkey)
);

CREATE INDEX idx_cluster_sets_did ON cluster_sets (did);
CREATE INDEX idx_cluster_sets_expression_ref ON cluster_sets (expression_ref)
  WHERE expression_ref IS NOT NULL;
CREATE INDEX idx_cluster_sets_kind ON cluster_sets (kind);
CREATE INDEX idx_cluster_sets_record ON cluster_sets USING GIN (record);

Parallel Track Tables

These tables store records from the parallel support lexicons. They follow the same column conventions (uri, did, rkey, indexed_at, record) with type-specific extracted columns.

| Table | Record Type | Key Extracted Columns |
| --- | --- | --- |
| ontologies | ontology.ontology | name, domain, version |
| type_defs | ontology.typeDef | ontology_ref, label, relation_type |
| corpora | corpus.corpus | name, language, license |
| corpus_memberships | corpus.membership | corpus_ref, expression_ref |
| resource_entries | resource.entry | lemma, form, language, collection_ref |
| resource_collections | resource.collection | name, collection_type |
| collection_memberships | resource.collectionMembership | collection_ref, entry_ref |
| templates | resource.template | name, slot_count |
| fillings | resource.filling | template_ref, expression_ref |
| template_compositions | resource.templateComposition | name, template_refs (JSONB) |
| experiment_defs | judgment.experimentDef | measure, task_type, design_type |
| judgment_sets | judgment.judgmentSet | experiment_ref, annotator_did |
| agreement_reports | judgment.agreementReport | experiment_ref, metric, score |
| alignments | alignment.alignment | source_ref, target_ref, alignment_type |

Integration Tables

| Table | Record Type | Key Extracted Columns |
| --- | --- | --- |
| graph_nodes | graph.graphNode | kind, name, description, ontology_ref |
| graph_edges | graph.graphEdge | source_ref, target_ref, edge_type, edge_set_ref |
| graph_edge_sets | graph.graphEdgeSet | name, edge_type, edge_count |
| personas | persona.persona | name, domain, kind |
| media_records | media.media | modality, mime_type, duration, expression_ref |
| eprints | eprint.eprint | identifier, title, platform, doi |
| data_links | eprint.dataLink | eprint_ref, corpus_ref, link_type |
| changelogs | changelog.entry | subject_uri, subject_collection, version, summary, sections (JSONB) |

Cross-Reference Table

A single denormalized table captures every cross-reference between records. This table is the source for building Neo4j edges and for reverse-lookup queries ("which records reference this expression?").

CREATE TABLE cross_references (
  source_uri TEXT NOT NULL,
  target_uri TEXT NOT NULL,
  ref_type TEXT NOT NULL,  -- sourceRef, sourceUrl, eprintRef, parentRef,
                           -- expressionRef, segmentationRef, ontologyRef,
                           -- personaRef, corpusRef, templateRef, etc.
  indexed_at TIMESTAMPTZ NOT NULL DEFAULT now(),

  PRIMARY KEY (source_uri, target_uri, ref_type)
);

CREATE INDEX idx_cross_references_target ON cross_references (target_uri);
CREATE INDEX idx_cross_references_ref_type ON cross_references (ref_type);
CREATE INDEX idx_cross_references_source ON cross_references (source_uri);

Every time a record is ingested, the indexer extracts all AT-URI references from the record and inserts rows into cross_references. This enables queries like:

-- Find all annotation layers that reference a given expression
SELECT al.*
FROM annotation_layers al
JOIN cross_references cr ON cr.source_uri = al.uri
WHERE cr.target_uri = 'at://did:plc:abc123/pub.layers.expression.expression/doc1'
  AND cr.ref_type = 'expressionRef';

-- Find all records of any type that reference a given ontology
SELECT cr.source_uri, cr.ref_type
FROM cross_references cr
WHERE cr.target_uri = 'at://did:plc:abc123/pub.layers.ontology.ontology/ud-pos'
ORDER BY cr.indexed_at DESC;
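The extraction itself can be sketched as a recursive walk over the record JSON, collecting every string field that holds an AT-URI. The function name and the derivation of ref_type from the field name are illustrative:

```typescript
// Sketch: walk a record's JSON tree and collect cross-references.
// Every string starting with "at://" becomes a row; ref_type is taken
// from the nearest enclosing field name (e.g. expressionRef).

interface CrossRef {
  sourceUri: string;
  targetUri: string;
  refType: string;
}

function extractCrossRefs(sourceUri: string, record: unknown, field = ""): CrossRef[] {
  if (typeof record === "string") {
    return record.startsWith("at://")
      ? [{ sourceUri, targetUri: record, refType: field }]
      : [];
  }
  if (Array.isArray(record)) {
    // array items inherit the field name (e.g. knowledgeRefs)
    return record.flatMap((item) => extractCrossRefs(sourceUri, item, field));
  }
  if (record !== null && typeof record === "object") {
    return Object.entries(record).flatMap(([key, value]) =>
      extractCrossRefs(sourceUri, value, key),
    );
  }
  return [];
}
```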

Elasticsearch Mappings

Elasticsearch indexes are derived from PostgreSQL. Only record types that benefit from full-text search or faceted aggregation are indexed. See the Record Type Coverage Matrix for the full list.

Custom Analyzers

Layers configures custom analyzers for linguistic data:

{
  "settings": {
    "analysis": {
      "analyzer": {
        "layers_text": {
          "type": "custom",
          "tokenizer": "icu_tokenizer",
          "filter": ["icu_normalizer", "icu_folding", "lowercase"]
        },
        "layers_linguistic": {
          "type": "custom",
          "tokenizer": "icu_tokenizer",
          "filter": [
            "icu_normalizer",
            "icu_folding",
            "lowercase",
            "english_stemmer"
          ]
        }
      },
      "filter": {
        "english_stemmer": {
          "type": "stemmer",
          "language": "english"
        }
      }
    }
  }
}

The layers_text analyzer provides Unicode-normalized, case-folded tokenization suitable for multilingual linguistic data. The layers_linguistic analyzer adds English stemming for metadata search. Language-specific stemmers are configured per index when the corpus language is known.

Index: expressions

{
  "mappings": {
    "properties": {
      "uri": { "type": "keyword" },
      "did": { "type": "keyword" },
      "text": {
        "type": "text",
        "analyzer": "layers_text",
        "fields": { "raw": { "type": "keyword", "ignore_above": 32766 } }
      },
      "kind": { "type": "keyword" },
      "language": { "type": "keyword" },
      "source_url": { "type": "keyword" },
      "source_ref": { "type": "keyword" },
      "eprint_ref": { "type": "keyword" },
      "parent_ref": { "type": "keyword" },
      "indexed_at": { "type": "date" }
    }
  }
}

Supports queries like: full-text search over expression text, faceted filtering by kind and language, and lookup by source URL or eprint reference.

Index: annotation_layers

{
  "mappings": {
    "properties": {
      "uri": { "type": "keyword" },
      "did": { "type": "keyword" },
      "expression_ref": { "type": "keyword" },
      "kind": { "type": "keyword" },
      "subkind": { "type": "keyword" },
      "formalism": { "type": "keyword" },
      "ontology_ref": { "type": "keyword" },
      "persona_ref": { "type": "keyword" },
      "annotation_count": { "type": "integer" },
      "annotations": {
        "type": "nested",
        "properties": {
          "label": { "type": "keyword" },
          "value": { "type": "text", "analyzer": "layers_text" },
          "anchor_type": { "type": "keyword" },
          "confidence": { "type": "float" }
        }
      },
      "indexed_at": { "type": "date" }
    }
  }
}

The annotations field uses Elasticsearch's nested type so that queries can filter on label-value pairs without cross-matching (e.g., find layers where at least one annotation has label "NNP" and confidence > 0.9, without accidentally matching label "NNP" from one annotation against confidence 0.9 from another).
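A sketch of the corresponding query body, using the example from the text (layers where at least one annotation has label "NNP" and confidence > 0.9 on the same nested document):

```typescript
// Sketch of an Elasticsearch nested query against the annotation_layers
// index. Because "annotations" is a nested field, both conditions must
// match within a single annotation, not across different ones.
const nestedQuery = {
  query: {
    nested: {
      path: "annotations",
      query: {
        bool: {
          must: [
            { term: { "annotations.label": "NNP" } },
            { range: { "annotations.confidence": { gt: 0.9 } } },
          ],
        },
      },
    },
  },
};
```

With a plain `object` mapping instead of `nested`, the same bool query would match a layer containing one annotation labeled "NNP" and a different annotation with confidence 0.91.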

Index: ontologies

{
  "mappings": {
    "properties": {
      "uri": { "type": "keyword" },
      "did": { "type": "keyword" },
      "name": {
        "type": "text",
        "analyzer": "layers_linguistic",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "domain": { "type": "keyword" },
      "version": { "type": "keyword" },
      "indexed_at": { "type": "date" }
    }
  }
}

Index: graph_nodes

{
  "mappings": {
    "properties": {
      "uri": { "type": "keyword" },
      "did": { "type": "keyword" },
      "kind": { "type": "keyword" },
      "name": {
        "type": "text",
        "analyzer": "layers_linguistic",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "description": { "type": "text", "analyzer": "layers_linguistic" },
      "ontology_ref": { "type": "keyword" },
      "indexed_at": { "type": "date" }
    }
  }
}

Additional Indexes

The following indexes use simpler mappings with the same conventions (keyword for identifiers and enum fields, text with layers_text or layers_linguistic for human-readable fields):

| Index | Key Text Fields | Key Keyword Fields |
| --- | --- | --- |
| type_defs | label, description | ontology_ref, relation_type |
| corpora | name, description | language, license |
| resource_entries | lemma, form | language, collection_ref |
| resource_collections | name | collection_type |
| experiment_defs | name, description | measure, task_type, design_type |
| personas | name, description | domain, kind |
| media_records | description | modality, mime_type |
| eprints | title, abstract | identifier, platform, doi |

Neo4j Graph Model

Neo4j stores the cross-reference graph derived from PostgreSQL. Every record that participates in cross-references becomes a node; every reference becomes a relationship. This enables multi-hop traversal queries that would require expensive recursive CTEs in PostgreSQL.

Node Labels

Each indexed record type maps to a Neo4j node label. All nodes carry at minimum the uri and did properties.

| Node Label | Source Table | Key Properties |
| --- | --- | --- |
| Expression | expressions | uri, did, kind, language, text (truncated) |
| AnnotationLayer | annotation_layers | uri, did, kind, subkind, formalism |
| Annotation | annotations | layer_uri, index, label, value, confidence |
| ClusterSet | cluster_sets | uri, did, kind |
| Ontology | ontologies | uri, did, name, domain |
| TypeDef | type_defs | uri, did, label, relation_type |
| Corpus | corpora | uri, did, name, language |
| GraphNode | graph_nodes | uri, did, kind, name |
| GraphEdge | graph_edges | uri, did, edge_type |
| Persona | personas | uri, did, name, kind |
| Media | media_records | uri, did, modality |
| Eprint | eprints | uri, did, identifier, title |
| Alignment | alignments | uri, did, alignment_type |

Relationship Types

| Relationship | Source Node | Target Node | Derived From |
| --- | --- | --- | --- |
| PARENT_OF | Expression | Expression | expressions.parent_ref |
| SEGMENTED_BY | Expression | Segmentation | segmentations.expression_ref |
| ANNOTATES | AnnotationLayer | Expression | annotation_layers.expression_ref |
| USES_ONTOLOGY | AnnotationLayer | Ontology | annotation_layers.ontology_ref |
| MEMBER_OF | Expression | Corpus | corpus_memberships |
| REFERENCES | any node | any node | cross_references table (generic) |
| GRAPH_EDGE | GraphNode | GraphNode | graph_edges (typed via edge_type property) |
| KNOWLEDGE_REF | Annotation | GraphNode | knowledgeRefs in annotation JSONB |
| ALIGNS | Alignment | Expression / AnnotationLayer | alignments.source_ref, alignments.target_ref |
| LINKS_EPRINT | DataLink | Eprint / Corpus | data_links.eprint_ref, data_links.corpus_ref |

Cypher Examples

Creating an expression node and its relationships:

// Create an expression node
MERGE (e:Expression {uri: $uri})
SET e.did = $did,
    e.kind = $kind,
    e.language = $language,
    e.text = left($text, 500),
    e.indexedAt = datetime()

// Link to parent expression
WITH e
MATCH (parent:Expression {uri: $parentRef})
MERGE (parent)-[:PARENT_OF]->(e)

// Link to corpus membership
WITH e
MATCH (c:Corpus {uri: $corpusRef})
MERGE (e)-[:MEMBER_OF]->(c)

Multi-hop traversal to find all annotations on an expression and its children:

// Find all annotation layers on an expression tree (up to 5 levels deep)
MATCH (root:Expression {uri: $rootUri})
MATCH (root)-[:PARENT_OF*0..5]->(child:Expression)
MATCH (layer:AnnotationLayer)-[:ANNOTATES]->(child)
RETURN child.uri AS expressionUri,
       layer.uri AS layerUri,
       layer.kind AS kind,
       layer.subkind AS subkind
ORDER BY child.uri, layer.kind

Shortest path between two graph nodes:

MATCH path = shortestPath(
  (a:GraphNode {uri: $sourceUri})-[:GRAPH_EDGE*..10]-(b:GraphNode {uri: $targetUri})
)
RETURN [n IN nodes(path) | n.uri] AS nodeUris,
       [r IN relationships(path) | r.edge_type] AS edgeTypes

Indexes and Constraints

// Uniqueness constraint on uri for all node labels
CREATE CONSTRAINT expression_uri IF NOT EXISTS
  FOR (n:Expression) REQUIRE n.uri IS UNIQUE;
CREATE CONSTRAINT annotation_layer_uri IF NOT EXISTS
  FOR (n:AnnotationLayer) REQUIRE n.uri IS UNIQUE;
CREATE CONSTRAINT graph_node_uri IF NOT EXISTS
  FOR (n:GraphNode) REQUIRE n.uri IS UNIQUE;
// (repeated for all node labels)

// Full-text index for graph node search
CREATE FULLTEXT INDEX graph_node_search IF NOT EXISTS
  FOR (n:GraphNode) ON EACH [n.name, n.description];

Redis Data Model

Redis stores ephemeral data only. Nothing in Redis is required for correctness; if Redis is flushed, the system recovers gracefully through cache misses and session re-authentication.

Key Patterns

| Pattern | Type | TTL | Purpose |
| --- | --- | --- | --- |
| session:{did}:{token} | Hash | 24h | User session data (DID, scope, issued-at) |
| record:{uri} | String (JSON) | 5m | Cached record fetched from PG |
| ratelimit:{did}:{endpoint} | Sorted set | 60s | Sliding-window rate limiter (timestamps as scores) |
| resolve:{did} | String | 1h | Cached DID-to-PDS resolution |
| cursor:firehose | String | none | Last processed firehose cursor for resumption |
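The type-safe key patterns in src/storage/redis/structures.ts can be sketched as plain template functions; the `keys` object and the function names are illustrative:

```typescript
// Sketch of type-safe Redis key builders, one per key pattern above.
// Centralizing construction keeps key formats consistent across callers.
const keys = {
  session: (did: string, token: string) => `session:${did}:${token}`,
  record: (uri: string) => `record:${uri}`,
  rateLimit: (did: string, endpoint: string) => `ratelimit:${did}:${endpoint}`,
  resolve: (did: string) => `resolve:${did}`,
  firehoseCursor: () => "cursor:firehose",
} as const;
```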

BullMQ Queue Keys

Job queues are managed by BullMQ and use Redis as the backing store. BullMQ manages its own key namespace:

| Pattern | Purpose |
| --- | --- |
| bull:{queueName}:wait | Jobs waiting to be processed |
| bull:{queueName}:active | Jobs currently being processed |
| bull:{queueName}:completed | Completed jobs (with configurable retention) |
| bull:{queueName}:failed | Failed jobs awaiting retry or manual intervention |
| bull:{queueName}:delayed | Jobs scheduled for future processing |
| bull:{queueName}:stalled | Jobs detected as stalled by the stall checker |

Queue names include firehose-ingest, es-sync, neo4j-sync, enrichment, and format-import.

Rate Limiting Example

# Sliding window: allow 100 requests per 60 seconds per DID per endpoint
ZADD ratelimit:{did}:{endpoint} {timestamp} {requestId}
ZREMRANGEBYSCORE ratelimit:{did}:{endpoint} 0 {timestamp - 60}
ZCARD ratelimit:{did}:{endpoint}
# If the count exceeds 100, reject with 429
EXPIRE ratelimit:{did}:{endpoint} 60
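The same sliding-window logic, sketched in-memory for clarity. Each Redis command is noted next to the equivalent array operation; the class name and the check-before-add ordering are illustrative:

```typescript
// In-memory sketch of the sliding-window limiter above. A real deployment
// uses the Redis sorted-set commands so the window is shared across
// appview instances.
class SlidingWindowLimiter {
  private windows = new Map<string, number[]>();

  constructor(
    private readonly limit = 100,
    private readonly windowMs = 60_000,
  ) {}

  allow(key: string, now = Date.now()): boolean {
    // ZREMRANGEBYSCORE: drop timestamps outside the window
    const timestamps = (this.windows.get(key) ?? []).filter(
      (t) => t > now - this.windowMs,
    );
    // ZCARD: count remaining requests in the window
    if (timestamps.length >= this.limit) {
      this.windows.set(key, timestamps);
      return false; // reject with 429
    }
    timestamps.push(now); // ZADD: record this request
    this.windows.set(key, timestamps);
    return true;
  }
}
```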

Data Consistency

Write Ordering

All writes follow a strict ordering to maintain consistency:

  1. PostgreSQL first. The firehose consumer writes the record to PostgreSQL within a transaction. If the PG write fails, the record is not indexed anywhere.
  2. Elasticsearch and Neo4j second. After the PG transaction commits, separate BullMQ jobs are enqueued to sync the record to ES and Neo4j. These jobs are idempotent and retryable.
  3. Redis as needed. Cache entries are invalidated (or lazily expire via TTL) when records are updated or deleted.
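The ordering can be made explicit with a sketch of the ingest path against injected backends. The interfaces here are illustrative stand-ins for the real adapters and queue, not the actual signatures:

```typescript
// Sketch of the write ordering: PG commit first, sync jobs second,
// cache invalidation last. If the PG transaction throws, nothing else runs.

interface TxStore {
  transaction(fn: () => Promise<void>): Promise<void>;
}
interface JobQueue {
  add(name: string, data: unknown): Promise<void>;
}
interface Cache {
  del(key: string): Promise<void>;
}

async function ingestRecord(
  pg: TxStore,
  queue: JobQueue,
  cache: Cache,
  uri: string,
  record: unknown,
): Promise<void> {
  // 1. PostgreSQL first: structured columns, JSONB, cross_references rows.
  await pg.transaction(async () => {
    /* write record + extracted cross-references */
  });
  // 2. Derived indexes second: idempotent, retryable BullMQ jobs.
  await queue.add("es-sync", { uri });
  await queue.add("neo4j-sync", { uri });
  // 3. Redis as needed: drop any cached copy of the record.
  await cache.del(`record:${uri}`);
}
```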

Eventual Consistency

Elasticsearch and Neo4j may lag behind PostgreSQL by seconds under normal load, or longer during backpressure or recovery. The API layer handles this by:

  • Serving authoritative reads from PostgreSQL when consistency is required (e.g., immediately after a write).
  • Serving search and graph queries from ES/Neo4j with the understanding that very recent changes may not yet be reflected.
  • Including an indexed_at timestamp on all API responses so clients can assess freshness.

Reconciliation

Background maintenance jobs periodically verify that ES and Neo4j are consistent with PostgreSQL:

| Job | Frequency | Behavior |
| --- | --- | --- |
| es-reconcile | Hourly | Samples records from PG, checks presence and freshness in ES, re-syncs any stale or missing records |
| neo4j-reconcile | Hourly | Samples nodes and edges from PG, checks presence in Neo4j, re-syncs any stale or missing data |
| full-reindex | On-demand | Walks the entire PG database and rebuilds ES or Neo4j from scratch |
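The sampling reconcile jobs can be sketched as a simple loop: pull a sample of URIs from PG, check each in the derived index, and re-enqueue anything missing. The interfaces, sample size, and return value are illustrative:

```typescript
// Sketch of a sampling reconcile pass (es-reconcile / neo4j-reconcile).
// Returns the number of records re-enqueued for sync.

interface SampleSource {
  sampleUris(n: number): Promise<string[]>;
}
interface DerivedIndex {
  has(uri: string): Promise<boolean>;
}
interface SyncQueue {
  add(name: string, data: unknown): Promise<void>;
}

async function reconcile(
  pg: SampleSource,
  derived: DerivedIndex,
  queue: SyncQueue,
  sampleSize = 1000,
): Promise<number> {
  const uris = await pg.sampleUris(sampleSize);
  let resynced = 0;
  for (const uri of uris) {
    if (!(await derived.has(uri))) {
      // missing (or, in the real job, stale) -> enqueue an idempotent sync
      await queue.add("es-sync", { uri });
      resynced++;
    }
  }
  return resynced;
}
```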

Cursor-Based Rebuild

The entire database can be rebuilt from the ATProto firehose by resetting the cursor to 0:

  1. Truncate all PG tables.
  2. Reset cursor:firehose in Redis to 0.
  3. Restart the firehose consumer. It will replay every event and re-index all records.
  4. ES and Neo4j are rebuilt as a side effect of the PG writes triggering sync jobs.

This process is also used for disaster recovery and for spinning up new appview instances.

Migrations

Database migrations are managed with node-pg-migrate (version 8+). Migrations apply only to PostgreSQL; Elasticsearch and Neo4j indexes are managed programmatically by the application.

Conventions

  • Timestamp-based versioning: migration files are named {timestamp}_{description}.ts (e.g., 1709234567890_create-expressions-table.ts).
  • Reversible migrations: every up function has a corresponding down function.
  • No data migrations in schema files: data transformations are handled by separate scripts.

Commands

| Command | Description |
| --- | --- |
| pnpm db:migrate:up | Run all pending migrations |
| pnpm db:migrate:down | Revert the most recent migration |
| pnpm db:migrate:create <name> | Create a new migration file with timestamp prefix |

Example Migration

import type { MigrationBuilder } from "node-pg-migrate";

export async function up(pgm: MigrationBuilder): Promise<void> {
  pgm.createTable("expressions", {
    uri: { type: "text", primaryKey: true },
    did: { type: "text", notNull: true },
    rkey: { type: "text", notNull: true },
    text: { type: "text" },
    kind: { type: "text" },
    language: { type: "text" },
    source_url: { type: "text" },
    source_ref: { type: "text" },
    eprint_ref: { type: "text" },
    parent_ref: { type: "text" },
    indexed_at: { type: "timestamptz", notNull: true, default: pgm.func("now()") },
    record: { type: "jsonb", notNull: true },
  });

  pgm.addConstraint("expressions", "expressions_did_rkey_unique", {
    unique: ["did", "rkey"],
  });

  pgm.createIndex("expressions", "did");
  pgm.createIndex("expressions", ["kind", "language"]);
  pgm.createIndex("expressions", "source_url", { where: "source_url IS NOT NULL" });
  pgm.createIndex("expressions", "parent_ref", { where: "parent_ref IS NOT NULL" });
  pgm.createIndex("expressions", "eprint_ref", { where: "eprint_ref IS NOT NULL" });
  pgm.createIndex("expressions", "record", { method: "gin" });
  pgm.createIndex("expressions", "indexed_at");
}

export async function down(pgm: MigrationBuilder): Promise<void> {
  pgm.dropTable("expressions");
}

See Also