Zum Inhalt springen

Asynchrone Jobs

Status: Entwurf · Spec-Kandidat: ja

Zweck

Spezifiziert das Laufzeitverhalten des generischen Worker-Pools, der AA_JOB-Zeilen abarbeitet. Erste Anwendung: Email-Versand (job_kind='email_send'). Das Pattern ist für weitere Hintergrundarbeiten gedacht (Bulk-Import OP-41, Reindex, Outbox-Dispatch, Cleanup, Virus-Scan, Index-Pflege OP-17).

AA_JOB ist nicht der Outbox-Eventbus. Outbox dient externen Konsumenten (siehe Events / Webhooks). AA_JOB ist interne, asynchrone Pflichtarbeit der Plattform.

Job-Kinds (V1)

job_kindAuslöserWorker-AufgabeSpec
email_sendService-Layer schreibt Job in derselben Tx wie auslösende fachliche AktionProvider-Versand, Eintrag in AA_EMAIL_LOG, Suppression-CheckEmail
bulk_importREST-Upload eines Import-JobsStaging-Validierung, Commit von AA_IMPORT_ROW-Batch in DDM_ENTITYOP-41
reindexSteward-Aktion oder WartungVolltext-/Search-Vector-Rebuild für Tenant + TypSuche und Filterung
outbox_dispatchPending-Events in AA_OUTBOX_EVENTDispatch an Webhook-Konsumenten, Retry/DLQEvents / Webhooks
cleanup_jobsCron-getaktet (alle 5 Min)Stale-Claim-Recovery, Outbox-Compact, Done-Job-Retentiondieser Abschnitt
virus_scanPre-Signed-URL-Commit auf DDM_ATTACHMENTAsynchroner Scan, Ergebnis-Patch auf virus_scan_statusAnhänge-Verhalten
index_syncToggle von DDM_ENTITY_TYPE_ATTRIBUTE.searchable / .sortable (OP-17)CREATE INDEX CONCURRENTLY bzw. DROP INDEX für Expression-Index; Drift-Check vor Build; setzt index_status auf buildingactive/failedIndizes – OP-17
gin_backfillSetzen von DDM_ENTITY_TYPE.is_external_source_target=true (OP-17)CREATE INDEX CONCURRENTLY ix_entity_attributes_gin_<type_key> als partielles GIN auf (attributes) jsonb_path_opsIndizes – OP-17

Backoff- und Stale-TTL-Defaults werden pro job_kind konfiguriert: index_sync und gin_backfill laufen mit langer Stale-TTL (≥ 1 h) wegen langer CONCURRENTLY-Builds, geringer Priorität (priority=200), max_attempts=3 (kein endloses Retry-Spinning bei kaputter Spalten-Coercion).

Worker-Pool

Mehrere parallele Worker arbeiten gegen AA_JOB. Pool-Größe ist konfigurierbar (Default 4 pro Service-Instanz). Jeder Worker hält eine eindeutige worker_id (<host>/<pid>/<uuid>), die er beim Claim in AA_JOB.claimed_by schreibt. Worker können auf Subset von job_kind und/oder tenant_id eingeschränkt werden — dedizierte Worker für High-Volume-Tenants oder kostenintensive Job-Arten.

Claim via SKIP LOCKED

Pull-Modell, single-statement Claim:

WITH next AS (
SELECT id
FROM mdm.AA_JOB
WHERE status = 'pending'
AND available_at <= now()
AND job_kind = ANY($kinds)
AND tenant_id = ANY($tenants) -- optional, default: alle
ORDER BY priority, available_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE mdm.AA_JOB j
SET status = 'running',
claimed_by = $worker_id,
claimed_at = now(),
attempts = attempts + 1
FROM next
WHERE j.id = next.id
RETURNING j.*;

FOR UPDATE SKIP LOCKED garantiert, dass zwei parallele Worker nie denselben Job picken — der zweite sieht den gelockten Datensatz nicht und greift zum nächsten. Keine externen Locks, kein Broker.

Lebenszyklus

Job-Lebenszyklus: pending → running → done / dlq

  • Success: status='done', completed_at=now(), last_error=NULL. Kind-spezifische Folge-Schreibvorgänge (z. B. AA_EMAIL_LOG-Zeile, Audit action='email_sent') laufen in derselben Transaktion wie das Status-Update.
  • Failure (transient): status='pending', available_at = now() + backoff(attempts), last_error=<truncated>, claimed_by=NULL, claimed_at=NULL.
  • DLQ (attempts >= max_attempts): status='dlq', completed_at=now(). Audit action='job_dlq' mit payload.last_error. Manueller Replay/Discard durch Admin (siehe Stale-Claim-Recovery).
  • failed-Zustand wird nur als manuelles Stop-Flag genutzt (z. B. „dauerhaft kaputt, nicht mehr probieren”) — Worker bewegt Jobs nicht von selbst nach failed. Default-Pfad ist pendingdlq.

Backoff

Exponentiell mit Jitter:

delay(attempts) = min(base * 2^(attempts-1), max_delay) + random(0..jitter)

V1-Defaults: base=30s, max_delay=24h, jitter=10%. Schritte ungefähr: 30s · 1m · 2m · 4m · 8m · 16m · 32m · 1h · 2h … bis Cap. Konfigurierbar pro job_kind — kostengünstige Jobs kürzer, teure (Provider-Calls) länger.

Rate-Limiting (V1, gesetzt)

Pro Tenant + pro job_kind ein Token-Bucket im Worker-Prozess:

  • Konfiguration in AA_TENANT.metadata.rate_limit.<job_kind> = { rps, burst }. Default-Set wird beim Tenant-Onboarding gesetzt.
  • Globaler Cap pro Provider (z. B. SES-Account-Limit) als Plattform-Konfig (außerhalb DB).
  • Wenn Token nicht verfügbar: Worker requeued den Job mit available_at = now() + retry_after und gibt den Slot frei (kein Spinning).
  • Audit: Rate-Limit-Hits werden nicht je Treffer auditiert (zu laut), sondern als aggregierte Metrik in Beobachtbarkeit.

Dieser leichtgewichtige In-Process-Bucket reicht für V1. Bei Mehrinstanz-Setup mit harten Provider-Caps ist später ein zentraler Token-Store (Redis) nötig — nicht V1.

Stale-Claim-Recovery

Worker können crashen, bevor sie einen Job zu Ende führen. Ein Recovery-Pfad läuft regelmäßig (eigener job_kind='cleanup_jobs'-Job alle 5 Minuten):

UPDATE mdm.AA_JOB
SET status = 'pending',
claimed_by = NULL,
claimed_at = NULL,
last_error = COALESCE(last_error, '') || ' [stale-claim recovered]'
WHERE status = 'running'
AND claimed_at < now() - INTERVAL '15 minutes';

Stale-TTL ist konfigurierbar pro job_kind (Email kurz, Bulk-Import lang). Recovery zählt nicht als zusätzlicher Versuch — attempts bleibt unverändert.

DLQ-Verwaltung

  • View mdm.v_job_dlq: alle Jobs in dlq-Status mit last_error, payload, tenant_id.
  • Replay-Endpoint (Admin only, manage_metadata-Permission): setzt status='pending', attempts=0, available_at=now(). Audit action='job_replay'.
  • Discard-Endpoint (Admin only): hartes Delete plus Audit action='job_discard' mit payload-Snapshot.

Idempotenz

  • Enqueue-Idempotenz: Optional idempotency_key; bei Konflikt UPDATE-or-IGNORE im Service.
  • Ausführungs-Idempotenz: Worker-Code muss Doppel-Ausführung tolerieren (z. B. Provider hat geantwortet, aber DB-Commit ging verloren). Job-Kind-spezifisch — Email nutzt provider_message_id als Dedup-Schlüssel im Email-Log.

Audit-Aktionen für Jobs

mdm.audit_action wird um 'job_enqueued', 'job_done', 'job_dlq', 'job_replay', 'job_discard' erweitert. Job-Lebenszyklus-Aktionen schreiben actor_principal_id = einleitende Identität (Service-Account des Workers für maschinelle Übergänge, eingeloggter User für Admin-Aktionen).

Pro-Versand-Aktionen wie email_sent sind kind-spezifisch und werden zusätzlich geschrieben.

Beobachtbarkeit

Per Worker und Job-Kind exportierte Metriken:

  • mdm_jobs_pending{tenant,kind}
  • mdm_jobs_running{tenant,kind}
  • mdm_jobs_dlq{tenant,kind}
  • mdm_jobs_claim_latency_seconds{kind} (Histogram: now() − created_at beim Claim)
  • mdm_jobs_processing_seconds{kind} (Histogram)
  • mdm_jobs_attempts_total{kind,outcome} (Counter)

Detail siehe Beobachtbarkeit.

Verwandte Dokumente