Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/mollifier.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Mollifier — Redis-backed burst buffer in front of `engine.trigger` with a fair drainer, full read/write parity for buffered runs across the API + dashboard + realtime stream, alertable `mollifier.stale_entries.current` gauge for drainer health, and `runFailed` alerts on drainer-terminal `SYSTEM_FAILURE` rows.
2 changes: 2 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { renderToPipeableStream } from "react-dom/server";
import { PassThrough } from "stream";
import * as Worker from "~/services/worker.server";
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server";
import { bootstrap } from "./bootstrap";
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
import {
Expand Down Expand Up @@ -228,6 +229,7 @@ Worker.init().catch((error) => {
});

initMollifierDrainerWorker();
initMollifierStaleSweepWorker();

bootstrap().catch((error) => {
logError(error);
Expand Down
37 changes: 30 additions & 7 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1063,13 +1063,16 @@ const EnvironmentSchema = z
// Separate switch for the drainer (consumer side) so it can be split
// off onto a dedicated worker service. Unset → inherits
// TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
// flip two switches. In multi-replica deployments, set this to "0"
// explicitly on every replica except the one dedicated drainer
// service — otherwise every replica's polling loop races for the
// same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill
// switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a
// buffer when the system is off.
// flip two switches. Multi-replica drainers are correct — `popAndMarkDraining`
// is an atomic ZPOPMIN + status flip in one Lua call, so only one replica
// can win any given entry — but inefficient: polling load (SMEMBERS +
// per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY`
// is per-process so engine load also multiplies. Splitting the drainer
// onto a dedicated worker keeps that traffic off the request-serving
// replicas. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill switch;
// setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a buffer
// when the system is off.
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
TRIGGER_MOLLIFIER_REDIS_HOST: z
Expand Down Expand Up @@ -1098,6 +1101,26 @@ const EnvironmentSchema = z
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
// Periodic sweep that scans buffer queue ZSETs for entries whose
// dwell exceeds the stale threshold. Independent of the drainer —
// its job is exactly to make a stuck/offline drainer visible to
// ops. Defaults: enabled when the mollifier is enabled, run every
// 5 minutes, alert on anything that's been dwelling for 5+ minutes
// (matches the sweep interval — "anything still here when we
// check" is the simplest threshold that converges).
TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z
.string()
.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce
.number()
.int()
.positive()
.default(5 * 60_000),
TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce
.number()
.int()
.positive()
.default(5 * 60_000),

BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
Expand Down
27 changes: 27 additions & 0 deletions apps/webapp/app/runEngine/services/triggerFailedTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { PrismaClientOrTransaction } from "@trigger.dev/database";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { getEventRepository } from "~/v3/eventRepository/index.server";
import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server";
import { DefaultQueueManager } from "../concerns/queues.server";
import type { TriggerTaskRequest } from "../types";

Expand Down Expand Up @@ -176,6 +177,14 @@ export class TriggerFailedTaskService {
event.setAttribute("runId", failedRunFriendlyId);
event.failWithError(taskRunError);

// `emitRunFailedEvent: false` because this call site owns the
// trace-event lifecycle via the outer `traceEvent({
// incomplete: false, isError: true })`. Letting the engine
// emit `runFailed` here would race the
// `completeFailedRunEvent` listener against the outer trace
// event's own completion write for the same (traceId, spanId).
// We re-trigger the alerts side directly after the trace
// event closes, below.
return await this.engine.createFailedTaskRun({
friendlyId: failedRunFriendlyId,
environment: {
Expand All @@ -200,12 +209,30 @@ export class TriggerFailedTaskService {
spanId: event.spanId,
traceContext: traceContext as Record<string, unknown>,
taskEventStore: store,
emitRunFailedEvent: false,
...(queueName !== undefined && { queue: queueName }),
...(lockedQueueId !== undefined && { lockedQueueId }),
});
}
);

// Alerts side of `runFailed` — the engine emit was suppressed
// above so the trace-event completion isn't double-written; we
// still need the alert pipeline to fire so customers' ERROR
// channels see the failure. Best-effort: a failed enqueue logs
// but doesn't block returning the friendlyId, mirroring the
// engine handler's behaviour at runEngineHandlers.server.ts:81.
try {
await PerformTaskRunAlertsService.enqueue(failedRun.id);
} catch (alertsError) {
logger.warn("TriggerFailedTaskService: alert enqueue failed", {
taskId: request.taskId,
friendlyId: failedRun.friendlyId,
error:
alertsError instanceof Error ? alertsError.message : String(alertsError),
});
}

return failedRun.friendlyId;
} catch (createError) {
const createErrorMsg =
Expand Down
48 changes: 13 additions & 35 deletions apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import { createHash } from "node:crypto";
import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker";
import { MollifierDrainer } from "@trigger.dev/redis-worker";
import { prisma } from "~/db.server";
import { env } from "~/env.server";
import { engine as runEngine } from "~/v3/runEngine.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { getMollifierBuffer } from "./mollifierBuffer.server";
import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server";
import {
createDrainerHandler,
isRetryablePgError,
} from "./mollifierDrainerHandler.server";
import type { MollifierSnapshot } from "./mollifierSnapshot.server";

// Distinct error class for the deterministic "fail loud at boot" throws
// below. The bootstrap in `mollifierDrainerWorker.server.ts` catches
Expand All @@ -25,7 +30,7 @@ export class MollifierConfigurationError extends Error {
}
}

function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> {
function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
const buffer = getMollifierBuffer();
if (!buffer) {
// Unreachable in normal config: getMollifierDrainer() gates on the
Expand Down Expand Up @@ -68,40 +73,13 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
});

// Phase 1 handler: no-op ack. The trigger has ALREADY been written to
// Postgres via engine.trigger (dual-write at the call site). Popping +
// acking here proves the dequeue mechanism works end-to-end without
// duplicating the work. Phase 2 will replace this with an engine.trigger
// replay that performs the actual Postgres write.
const drainer = new MollifierDrainer<BufferedTriggerPayload>({
const drainer = new MollifierDrainer<MollifierSnapshot>({
buffer,
handler: async (input) => {
// Hash the (re-serialised, canonical) payload on the drain side rather
// than on the trigger hot path. Burst-time CPU stays with engine.trigger;
// the drainer is the natural place for the audit-equivalence checksum.
// Re-serialisation is identity for the BufferedTriggerPayload shape
// (only strings/numbers/plain objects), so this hash matches what the
// call site wrote into Redis.
const reserialised = serialiseSnapshot(input.payload);
const payloadHash = createHash("sha256").update(reserialised).digest("hex");
logger.info("mollifier.drained", {
runId: input.runId,
envId: input.envId,
orgId: input.orgId,
taskId: input.payload.taskId,
attempts: input.attempts,
ageMs: Date.now() - input.createdAt.getTime(),
payloadBytes: reserialised.length,
payloadHash,
});
},
handler: createDrainerHandler({ engine: runEngine, prisma }),
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
// A no-op handler shouldn't throw, but if something does (e.g. an
// unexpected deserialise failure), don't loop — let it FAIL terminally
// so the entry is observable in metrics.
isRetryable: () => false,
isRetryable: isRetryablePgError,
});

return drainer;
Expand All @@ -114,7 +92,7 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
// handler registration, leaving a narrow window where a SIGTERM landing
// between `start()` and `process.once("SIGTERM", ...)` would skip the
// graceful stop. The split is intentional.
export function getMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> | null {
export function getMollifierDrainer(): MollifierDrainer<MollifierSnapshot> | null {
if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null;
return singleton("mollifierDrainer", initializeMollifierDrainer);
}
170 changes: 170 additions & 0 deletions apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { context, trace, TraceFlags } from "@opentelemetry/api";
import type { RunEngine } from "@internal/run-engine";
import type { PrismaClientOrTransaction } from "@trigger.dev/database";
import type { MollifierDrainerHandler } from "@trigger.dev/redis-worker";
import { startSpan } from "~/v3/tracing.server";
import type { MollifierSnapshot } from "./mollifierSnapshot.server";

const tracer = trace.getTracer("mollifier-drainer");

export function isRetryablePgError(err: unknown): boolean {
if (!(err instanceof Error)) return false;
const msg = err.message ?? "";
// Prisma surfaces P1001 ("Can't reach database server") via two
// different error classes — `PrismaClientKnownRequestError` exposes
// it as `err.code`, `PrismaClientInitializationError` exposes it as
// `err.errorCode`. Check both so reconnection-time errors retry
// regardless of which class fires.
const code = (err as { code?: string }).code;
const errorCode = (err as { errorCode?: string }).errorCode;
if (code === "P2024") return true;
if (code === "P1001" || errorCode === "P1001") return true;
if (msg.includes("Can't reach database server")) return true;
if (msg.includes("Connection lost")) return true;
if (msg.includes("ECONNRESET")) return true;
return false;
}
Comment thread
d-cs marked this conversation as resolved.

export function createDrainerHandler(deps: {
engine: RunEngine;
prisma: PrismaClientOrTransaction;
}): MollifierDrainerHandler<MollifierSnapshot> {
return async (input) => {
const dwellMs = Date.now() - input.createdAt.getTime();

// Re-attach to the trace started by the caller's mollifier.queued span
// (its traceId + spanId were captured into the snapshot at buffer time).
// Without this the drainer would emit mollifier.drained in a brand-new
// trace and the engine.trigger instrumentation would inherit an empty
// active context — leaving the run-detail page with only the root span.
const snapshotTraceId =
typeof input.payload.traceId === "string" ? input.payload.traceId : undefined;
const snapshotSpanId =
typeof input.payload.spanId === "string" ? input.payload.spanId : undefined;

const parentContext =
snapshotTraceId && snapshotSpanId
? trace.setSpanContext(context.active(), {
traceId: snapshotTraceId,
spanId: snapshotSpanId,
traceFlags: TraceFlags.SAMPLED,
isRemote: true,
})
: context.active();

// Cancel-wins-over-trigger (Q4 bifurcation). If a cancel API call
// landed on this entry while it was QUEUED, the snapshot carries
// `cancelledAt` + `cancelReason`. Skip the normal materialise path
// and write a CANCELED PG row directly. The existing runCancelled
// handler writes the TaskEvent.
const cancelledAtStr =
typeof input.payload.cancelledAt === "string" ? input.payload.cancelledAt : undefined;
if (cancelledAtStr) {
const cancelReason =
typeof input.payload.cancelReason === "string"
? input.payload.cancelReason
: "Canceled by user";
await context.with(parentContext, async () => {
await startSpan(tracer, "mollifier.drained.cancelled", async (span) => {
span.setAttribute("mollifier.drained", true);
span.setAttribute("mollifier.dwell_ms", dwellMs);
span.setAttribute("mollifier.attempts", input.attempts);
span.setAttribute("mollifier.run_friendly_id", input.runId);
span.setAttribute("mollifier.cancel_bifurcation", true);
span.setAttribute("taskRunId", input.runId);
await deps.engine.createCancelledRun(
{
snapshot: input.payload as any,
cancelledAt: new Date(cancelledAtStr),
cancelReason,
},
deps.prisma,
);
});
});
return;
}

await context.with(parentContext, async () => {
await startSpan(tracer, "mollifier.drained", async (span) => {
span.setAttribute("mollifier.drained", true);
span.setAttribute("mollifier.dwell_ms", dwellMs);
span.setAttribute("mollifier.attempts", input.attempts);
span.setAttribute("mollifier.run_friendly_id", input.runId);
span.setAttribute("taskRunId", input.runId);

try {
await deps.engine.trigger(input.payload as any, deps.prisma);
} catch (err) {
// The retryable-PG class re-throws so the drainer's outer
// worker loop can `buffer.requeue` (handled in
// `MollifierDrainer.drainOne`). For non-retryable failures we
// write a terminal SYSTEM_FAILURE row to PG via the engine's
// existing `createFailedTaskRun` (used by batch-trigger for
// the same purpose) so the customer sees the run in their
// dashboard / SDK instead of silently losing it when the
// buffer entry TTLs out. If THAT insert also fails (PG truly
// unreachable), rethrow so the drainer's outer catch falls
// through to its existing `buffer.fail` terminal-marker path.
if (isRetryablePgError(err)) {
throw err;
}
const reason = err instanceof Error ? err.message : String(err);
span.setAttribute("mollifier.terminal_failure_reason", reason);
const snapshot = input.payload as Record<string, unknown>;
const env = snapshot.environment as
| {
id: string;
type: any;
project: { id: string };
organization: { id: string };
}
| undefined;
if (!env) {
// Snapshot too malformed to even construct a TaskRun row.
// Drainer's outer catch will buffer.fail this entry.
throw err;
}
try {
await deps.engine.createFailedTaskRun({
friendlyId: input.runId,
environment: env,
taskIdentifier: String(snapshot.taskIdentifier ?? ""),
payload: typeof snapshot.payload === "string" ? snapshot.payload : undefined,
payloadType:
typeof snapshot.payloadType === "string" ? snapshot.payloadType : undefined,
error: {
type: "STRING_ERROR",
raw: `Mollifier drainer terminal failure: ${reason}`,
},
parentTaskRunId:
typeof snapshot.parentTaskRunId === "string"
? snapshot.parentTaskRunId
: undefined,
rootTaskRunId:
typeof snapshot.rootTaskRunId === "string"
? snapshot.rootTaskRunId
: undefined,
depth: typeof snapshot.depth === "number" ? snapshot.depth : 0,
resumeParentOnCompletion: snapshot.resumeParentOnCompletion === true,
traceId: typeof snapshot.traceId === "string" ? snapshot.traceId : undefined,
spanId: typeof snapshot.spanId === "string" ? snapshot.spanId : undefined,
taskEventStore:
typeof snapshot.taskEventStore === "string"
? snapshot.taskEventStore
: undefined,
queue: typeof snapshot.queue === "string" ? snapshot.queue : undefined,
lockedQueueId:
typeof snapshot.lockedQueueId === "string" ? snapshot.lockedQueueId : undefined,
});
} catch (writeErr) {
// Class A — PG itself is failing. Rethrow the original
// error so the drainer falls back to buffer.fail. Include
// the write error in the log line at the drainer layer.
throw err;
}
}
});
});
};
}
Loading