Background Jobs and Workers

Job vs Worker Distinction

Following Chive's pattern, background work is split between two code locations:

  • Jobs (src/jobs/): Scheduled interval-based tasks that run within the API server process. Each job class implements start(), stop(), and run() methods with a setInterval timer.
  • Workers (src/workers/): BullMQ queue consumers that run within the indexer process. Each worker class processes jobs from a specific BullMQ queue.
| Job | File | Schedule |
| --- | --- | --- |
| MaterializedViewRefreshJob | src/jobs/materialized-view-refresh-job.ts | 15 min / 1 hour |
| StalenessDetectionJob | src/jobs/staleness-detection-job.ts | Daily |
| ReconciliationJob | src/jobs/reconciliation-job.ts | 6 hours (ES) / daily (Neo4j) |
| KnowledgeGraphLinkingJob | src/jobs/knowledge-graph-linking-job.ts | On-demand |
| OntologySyncJob | src/jobs/ontology-sync-job.ts | Hourly |
| ImportSchedulerJob | src/jobs/import-scheduler-job.ts | On-demand |
| Worker | File | Queue |
| --- | --- | --- |
| EnrichmentWorker | src/workers/enrichment-worker.ts | layers:enrichment |
| FreshnessWorker | src/workers/freshness-worker.ts | layers:maintenance |
| IndexRetryWorker | src/workers/index-retry-worker.ts | DLQ replay |
```typescript
// src/jobs/staleness-detection-job.ts — interval-based job pattern
class StalenessDetectionJob {
  private timer: NodeJS.Timeout | null = null

  constructor(
    private readonly intervalMs: number,
    private readonly logger: { error: (msg: string, err: unknown) => void },
  ) {}

  async start(): Promise<void> {
    this.timer = setInterval(() => {
      this.run().catch(err => this.logger.error('Staleness detection failed', err))
    }, this.intervalMs)
  }

  async stop(): Promise<void> {
    if (this.timer) { clearInterval(this.timer); this.timer = null }
  }

  protected async run(): Promise<void> { /* compare indexed_at vs firehose cursor */ }
}
```

Job Queue Architecture

All queue-based background work runs through BullMQ queues backed by Redis. Each queue has its own worker pool with configurable concurrency.

Queue Topology

| Queue | Purpose | Default Concurrency |
| --- | --- | --- |
| layers:expression | Index expression records | 10 |
| layers:segmentation | Index segmentation records | 10 |
| layers:annotation | Index annotation layers and cluster sets | 10 |
| layers:ontology | Index ontologies and type definitions | 5 |
| layers:corpus | Index corpora and memberships | 5 |
| layers:resource | Index resource entries, collections, templates, fillings | 5 |
| layers:judgment | Index experiments, judgment sets, agreement reports | 5 |
| layers:alignment | Index alignments | 5 |
| layers:graph | Index graph nodes, edges, edge sets | 10 |
| layers:integration | Index personas, media, eprints, data links, changelogs | 5 |
| layers:enrichment | Post-indexing enrichment tasks | 3 |
| layers:import | Format import jobs (CoNLL, BRAT, ELAN, TEI) | 2 |
| layers:maintenance | Scheduled maintenance and reconciliation | 1 |

Worker Pool Management

Each queue spawns a BullMQ Worker instance with the configured concurrency. Workers are started as separate processes (via the indexer entry point) to isolate them from the API server. This allows independent scaling: API pods and worker pods can have different replica counts.

```typescript
import { Worker } from 'bullmq'

const expressionWorker = new Worker(
  'layers:expression',
  expressionProcessor,
  {
    connection: redisConnection,
    concurrency: config.workers.expression.concurrency,
    limiter: { max: 100, duration: 1000 }, // 100 jobs/sec rate limit
  }
);
```

Firehose Ingestion Jobs

These are the primary jobs created by the firehose ingestion pipeline. Each filtered and validated record is dispatched as a job to its namespace queue. The job payload includes the DID, rkey, record data, and firehose cursor position.
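As a sketch of what dispatch looks like, the payload shape and a queue-routing helper are shown below. The field names and the NSID-to-queue convention are illustrative assumptions, not the indexer's actual definitions.

```typescript
// Per-record job payload dispatched to a namespace queue (illustrative shape).
interface FirehoseJobPayload {
  did: string;     // repo DID of the record author
  rkey: string;    // record key within the collection
  record: unknown; // filtered and validated record data
  cursor: number;  // firehose cursor position at dispatch time
}

// Map a record collection NSID to its namespace queue. The final NSID
// segment is assumed to match the queue suffix, e.g. a collection ending
// in ".expression" routes to "layers:expression".
function queueForCollection(collection: string): string {
  const suffix = collection.split(".").pop() ?? "unknown";
  return `layers:${suffix}`;
}
```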

Enrichment Jobs

Enrichment jobs run after initial indexing to compute derived data that requires additional processing.

Language Detection

For expressions where language is not set by the record author, a language detection job runs ICU-based language identification on the text field and writes the result back to the PG row and ES document.
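A minimal sketch of the guard logic: detection only runs when the author left the language unset. The row shape is assumed, and the detector is a stand-in parameter for the ICU-based identifier.

```typescript
// Hypothetical expression row; language is null when the author left it unset.
interface ExpressionRow {
  text: string;
  language: string | null;
}

// Respect an author-provided language; only fall back to detection.
function resolveLanguage(
  row: ExpressionRow,
  detect: (text: string) => string,
): string {
  return row.language ?? detect(row.text);
}
```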

Knowledge Graph Linking

When an annotation layer contains knowledgeRefs pointing to external knowledge bases (Wikidata, WordNet, FrameNet), an enrichment job resolves the external identifiers and creates or updates Neo4j nodes for the KB entities. This ensures the knowledge graph stays connected even when external references arrive before their target KB nodes are indexed.

Media Metadata Extraction

For media records referencing audio, video, or image blobs, an enrichment job extracts technical metadata (duration, sample rate, resolution, codec) from the blob if not already present in the record. This uses the media file's ATProto blob reference to fetch content from the user's PDS.

Annotation Statistics

After a batch of annotation layers is indexed, a statistics job computes per-expression annotation coverage (how many layers, which kinds/subkinds) and per-corpus aggregate statistics. These are written to the corpus_statistics and annotation_coverage materialized views.
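The per-expression aggregation can be sketched as a count of layers per kind; the layer shape here is an assumption modeled on the description above, not the view's actual schema.

```typescript
// Minimal layer shape for coverage counting (illustrative).
interface AnnotationLayer {
  expressionUri: string;
  kind: string; // e.g. "pos", "ner", "deps"
}

// Count how many layers of each kind cover one expression.
function coverageByKind(layers: AnnotationLayer[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const layer of layers) {
    counts.set(layer.kind, (counts.get(layer.kind) ?? 0) + 1);
  }
  return counts;
}
```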

Format Import Jobs

The format import pipeline converts standard annotation formats into Layers records. Each import job is triggered by an API request (e.g., "import this CoNLL-U file into corpus X") and runs inside the plugin sandbox.

Import Pipeline

  1. Parse: The importer plugin reads the source file and extracts its native data structures.
  2. Map: The plugin converts native structures to Layers record types following the mappings documented in Data Models Integration.
  3. Validate: Generated records are validated against Layers lexicon schemas.
  4. Write: Records are written to the user's PDS via the ATProto com.atproto.repo.createRecord XRPC call. This requires the user's OAuth session.
  5. Index: The firehose picks up the new PDS records and indexes them through the normal pipeline.
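The first four stages can be sketched as one orchestration function over pluggable stage callbacks. The stage interface is an illustration, not the plugin API: a real importer supplies parse/map, and the write stage would issue one com.atproto.repo.createRecord call per record under the user's OAuth session. Indexing then happens via the firehose, outside this function.

```typescript
// Hypothetical stage callbacks for one import run.
interface ImportStages<Native, Rec> {
  parse: (source: string) => Native[];
  map: (native: Native) => Rec;
  validate: (record: Rec) => boolean;
  write: (record: Rec) => Promise<void>;
}

// Parse, map, validate, then write; returns the number of records written.
async function runImport<Native, Rec>(
  source: string,
  stages: ImportStages<Native, Rec>,
): Promise<number> {
  const records = stages.parse(source).map(stages.map);
  const invalid = records.filter((r) => !stages.validate(r));
  if (invalid.length > 0) {
    throw new Error(`${invalid.length} records failed lexicon validation`);
  }
  for (const record of records) {
    await stages.write(record); // one createRecord call per record
  }
  return records.length;
}
```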

Supported Formats

| Format | Importer | Records Produced |
| --- | --- | --- |
| CoNLL-U | conll-importer | expression + segmentation + annotationLayer (POS, lemma, deps) |
| CoNLL-2003 | conll-importer | expression + segmentation + annotationLayer (NER) |
| BRAT (.ann) | brat-importer | expression + segmentation + annotationLayer (entities, relations, events) |
| ELAN (.eaf) | elan-importer | expression + media + segmentation + annotationLayer (per tier) |
| Praat (.TextGrid) | praat-importer | expression + media + segmentation + annotationLayer (intervals, points) |
| TEI XML | tei-importer | expression + corpus + annotationLayer (inline annotations) |

Each importer is documented in the corresponding data model integration page.

Maintenance Jobs

Materialized View Refresh

Refreshes PostgreSQL materialized views (corpus_statistics, annotation_coverage, label_distribution, knowledge_graph_density) on a configurable schedule.

| View | Default Schedule |
| --- | --- |
| corpus_statistics | Every 15 minutes |
| annotation_coverage | Every 15 minutes |
| label_distribution | Every hour |
| knowledge_graph_density | Every hour |

Elasticsearch Reconciliation

A sampling-based reconciliation job compares a random subset of PG records against their ES counterparts. Any mismatches (missing documents, stale data) trigger re-indexing for the affected records.

Default schedule: every 6 hours, sampling 1% of records per type.
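The sampling and comparison steps can be sketched as below. Hashing by record id keeps the sample deterministic across runs; this helper is an illustration, not the job's actual implementation.

```typescript
// Select roughly `rate` (e.g. 0.01 for 1%) of ids with a cheap,
// deterministic (non-cryptographic) hash bucket.
function sampleIds(ids: string[], rate: number): string[] {
  const bucket = (id: string): number => {
    let h = 0;
    for (const ch of id) h = (h * 31 + ch.charCodeAt(0)) >>> 0;
    return (h % 1000) / 1000;
  };
  return ids.filter((id) => bucket(id) < rate);
}

// Records present in PG but missing from the ES index need re-indexing.
function findMismatches(sampled: string[], indexed: Set<string>): string[] {
  return sampled.filter((id) => !indexed.has(id));
}
```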

Neo4j Reconciliation

Similar to ES reconciliation, but checks Neo4j node counts and edge integrity against PG. Missing nodes or edges are re-created from PG data.

Default schedule: daily.

Stale Record Detection

Compares indexed_at timestamps against the firehose cursor position to detect records that may have been updated upstream but not re-indexed. Stale records are re-fetched from the user's PDS and re-indexed.
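The core predicate amounts to a timestamp comparison with a grace window, sketched below; the parameter names are illustrative.

```typescript
// A record is a re-index candidate when its indexed_at precedes the
// firehose cursor time by more than a grace window (to avoid flagging
// records that are merely in-flight).
function isStale(indexedAt: Date, cursorTime: Date, graceMs: number): boolean {
  return cursorTime.getTime() - indexedAt.getTime() > graceMs;
}
```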

Default schedule: daily.

Dead Letter Queue Management

Records that fail processing after exhausting retries (see Firehose Ingestion) land in the DLQ. The DLQ is a PostgreSQL table (firehose_dlq) with structured error metadata.
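A sketch of the entry shape and the replay-all filter follows; the column names are assumptions modeled on the documented admin filters (collection, error stage, time range), not the actual firehose_dlq schema.

```typescript
// Hypothetical DLQ row with structured error metadata.
interface DlqEntry {
  id: number;
  collection: string;
  errorStage: string; // e.g. "validate" | "index"
  failedAt: Date;
}

// Select entries matching an optional filter, as replay-all would.
function selectForReplay(
  entries: DlqEntry[],
  filter: { collection?: string; errorStage?: string; since?: Date },
): DlqEntry[] {
  return entries.filter(
    (e) =>
      (!filter.collection || e.collection === filter.collection) &&
      (!filter.errorStage || e.errorStage === filter.errorStage) &&
      (!filter.since || e.failedAt >= filter.since),
  );
}
```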

DLQ Admin API

| Endpoint | Action |
| --- | --- |
| GET /admin/dlq | List DLQ entries with filtering by collection, error stage, time range |
| POST /admin/dlq/:id/replay | Re-queue a specific DLQ entry for reprocessing |
| POST /admin/dlq/replay-all | Re-queue all DLQ entries matching a filter |
| DELETE /admin/dlq/:id | Discard a DLQ entry |

Monitoring

The layers_dlq_entries_total Prometheus gauge tracks the current DLQ size. An alert fires when the DLQ exceeds 100 entries (configurable), prompting investigation.

See Also