From adbb9eaee0d077a4bb168466a1256411caa11813 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 11:06:11 +0100 Subject: [PATCH 1/3] feat(redis-worker,webapp): mollifier buffer extensions + snapshot type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the buffer-side data layer used by phase-3 work: - buffer.ts gains entry inspection (getEntry), idempotency lookup (lookupIdempotency), in-place snapshot mutation (mutateSnapshot), and dwell tracking — all atomic via Lua. - snapshot.server.ts: shared MollifierSnapshot type + (de)serialise. - Drops the entry-TTL config — the drainer is the recovery mechanism. Adds methods to the buffer interface; nothing consumes them yet. Subsequent PRs in the stack wire trigger-time mollify, read-fallback, and mutation paths against this surface. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/mollifier-buffer-extensions.md | 6 + apps/webapp/app/env.server.ts | 1 - .../v3/mollifier/mollifierBuffer.server.ts | 1 - .../v3/mollifier/mollifierSnapshot.server.ts | 16 + .../redis-worker/src/mollifier/buffer.test.ts | 1221 +++++++++++++++-- packages/redis-worker/src/mollifier/buffer.ts | 646 ++++++++- packages/redis-worker/src/mollifier/index.ts | 11 +- .../redis-worker/src/mollifier/schemas.ts | 22 + 8 files changed, 1786 insertions(+), 138 deletions(-) create mode 100644 .changeset/mollifier-buffer-extensions.md create mode 100644 apps/webapp/app/v3/mollifier/mollifierSnapshot.server.ts diff --git a/.changeset/mollifier-buffer-extensions.md b/.changeset/mollifier-buffer-extensions.md new file mode 100644 index 00000000000..b1f38f51ecc --- /dev/null +++ b/.changeset/mollifier-buffer-extensions.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/redis-worker": minor +"@trigger.dev/core": patch +--- + +Mollifier buffer feature set built on top of the initial primitives: idempotency-lookup with SETNX dedup, atomic snapshot-mutation API (`mutateSnapshot` with tag/metadata/delay/cancel 
patches), metadata CAS for lossless concurrent updates, watermark-paginated listing, claim primitives for pre-gate idempotency, ZSET-backed per-env queue, 30s post-ack grace TTL, and removal of the accept-time entry TTL (the drainer is now the only removal mechanism). `@trigger.dev/core` gains an optional `notice` field on the trigger response so the SDK can surface mollifier-queued guidance to customers. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 97b8333e279..9e4743d740f 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1095,7 +1095,6 @@ const EnvironmentSchema = z TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100), TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500), TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50), - TRIGGER_MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600), TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3), TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000), TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500), diff --git a/apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts b/apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts index 9c8917623e4..09b52aa9da3 100644 --- a/apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts @@ -22,7 +22,6 @@ function initializeMollifierBuffer(): MollifierBuffer { enableAutoPipelining: true, ...(env.TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED === "true" ? 
{} : { tls: {} }), }, - entryTtlSeconds: env.TRIGGER_MOLLIFIER_ENTRY_TTL_S, }); } diff --git a/apps/webapp/app/v3/mollifier/mollifierSnapshot.server.ts b/apps/webapp/app/v3/mollifier/mollifierSnapshot.server.ts new file mode 100644 index 00000000000..a0732a3542e --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierSnapshot.server.ts @@ -0,0 +1,16 @@ +import { serialiseSnapshot, deserialiseSnapshot } from "@trigger.dev/redis-worker"; + +// MollifierSnapshot is the JSON-serialisable shape of the input that would be +// passed to engine.trigger(). The drainer deserialises and replays it. +// Kept as Record<string, unknown> at this layer — the engine.trigger call site +// casts it to the engine's typed input. This keeps the mollifier subdirectory +// from depending on @internal/run-engine internals. +export type MollifierSnapshot = Record<string, unknown>; + +export function serialiseMollifierSnapshot(input: MollifierSnapshot): string { + return serialiseSnapshot(input); +} + +export function deserialiseMollifierSnapshot(serialised: string): MollifierSnapshot { + return deserialiseSnapshot(serialised); +} diff --git a/packages/redis-worker/src/mollifier/buffer.test.ts b/packages/redis-worker/src/mollifier/buffer.test.ts index c8f7b95c97a..a4c1be35eb3 100644 --- a/packages/redis-worker/src/mollifier/buffer.test.ts +++ b/packages/redis-worker/src/mollifier/buffer.test.ts @@ -20,12 +20,14 @@ describe("schemas", () => { status: "QUEUED", attempts: "0", createdAt: "2026-05-11T10:00:00.000Z", + createdAtMicros: "1747044000000000", }; const parsed = BufferEntrySchema.parse(raw); expect(parsed.runId).toBe("run_abc"); expect(parsed.status).toBe("QUEUED"); expect(parsed.attempts).toBe(0); expect(parsed.createdAt).toBeInstanceOf(Date); + expect(parsed.createdAtMicros).toBe(1747044000000000); }); it("BufferEntrySchema parses a FAILED entry with lastError", () => { @@ -37,6 +39,7 @@ describe("schemas", () => { status: "FAILED", attempts: "3", createdAt: "2026-05-11T10:00:00.000Z", + createdAtMicros: 
"1747044000000000", lastError: JSON.stringify({ code: "P2024", message: "connection lost" }), }; const parsed = BufferEntrySchema.parse(raw); @@ -52,7 +55,6 @@ describe("MollifierBuffer construction", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -68,7 +70,6 @@ describe("MollifierBuffer.accept", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -105,7 +106,6 @@ describe("MollifierBuffer.pop", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -132,7 +132,6 @@ describe("MollifierBuffer.pop", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -151,7 +150,6 @@ describe("MollifierBuffer.pop", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -169,24 +167,56 @@ describe("MollifierBuffer.pop", () => { }); describe("MollifierBuffer.ack", () => { - redisTest("ack deletes the entry", { timeout: 20_000 }, async ({ redisContainer }) => { + redisTest( + "ack marks entry materialised and applies the grace TTL — entry persists as a read-fallback safety net", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "run_x", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + await buffer.ack("run_x"); + + const after = await buffer.getEntry("run_x"); + expect(after).not.toBeNull(); + 
expect(after!.materialised).toBe(true); + + // ack grace TTL is the only context where an entry hash gets + // an EXPIRE — accept no longer sets one. Should be at most 30s. + const ttl = await buffer.getEntryTtlSeconds("run_x"); + expect(ttl).toBeGreaterThan(0); + expect(ttl).toBeLessThanOrEqual(30); + } finally { + await buffer.close(); + } + }, + ); + + redisTest("ack on missing entry is a no-op", { timeout: 20_000 }, async ({ redisContainer }) => { const buffer = new MollifierBuffer({ redisOptions: { host: redisContainer.getHost(), port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); try { - await buffer.accept({ runId: "run_x", envId: "env_a", orgId: "org_1", payload: "{}" }); - await buffer.pop("env_a"); - await buffer.ack("run_x"); - - const after = await buffer.getEntry("run_x"); - expect(after).toBeNull(); + await buffer.ack("run_ghost"); + const stored = await buffer.getEntry("run_ghost"); + expect(stored).toBeNull(); + // Critical: no partial hash created. + const raw = await buffer["redis"].hgetall("mollifier:entries:run_ghost"); + expect(Object.keys(raw)).toHaveLength(0); } finally { await buffer.close(); } @@ -204,13 +234,12 @@ describe("MollifierBuffer.pop orphan handling", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); try { // Simulate a TTL-expired orphan: queue ref exists, entry hash does not. - await buffer["redis"].lpush("mollifier:queue:env_a", "run_orphan"); + await buffer["redis"].zadd("mollifier:queue:env_a", 1, "run_orphan"); const popped = await buffer.pop("env_a"); expect(popped).toBeNull(); @@ -220,7 +249,7 @@ describe("MollifierBuffer.pop orphan handling", () => { expect(Object.keys(raw)).toHaveLength(0); // Queue is drained — the loop pops orphans until empty. 
- const qLen = await buffer["redis"].llen("mollifier:queue:env_a"); + const qLen = await buffer["redis"].zcard("mollifier:queue:env_a"); expect(qLen).toBe(0); } finally { await buffer.close(); @@ -238,17 +267,16 @@ describe("MollifierBuffer.pop orphan handling", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); try { - // Layout (oldest-first, since RPOP takes from tail): orphan, valid, orphan. - // LPUSH puts items at the head, so to get RPOP order [orphan_a, valid, orphan_b] - // we LPUSH in reverse: orphan_b first, then valid, then orphan_a. - await buffer["redis"].lpush("mollifier:queue:env_a", "orphan_b"); + // Layout by score (lowest-first, since ZPOPMIN takes the min): + // orphan_a (score 1) → valid (score = its createdAtMicros, large) → orphan_b (score 1e18). + // First pop skips orphan_a, returns valid; orphan_b remains. + await buffer["redis"].zadd("mollifier:queue:env_a", 1, "orphan_a"); await buffer.accept({ runId: "valid", envId: "env_a", orgId: "org_1", payload: "{}" }); - await buffer["redis"].lpush("mollifier:queue:env_a", "orphan_a"); + await buffer["redis"].zadd("mollifier:queue:env_a", 1e18, "orphan_b"); const popped = await buffer.pop("env_a"); expect(popped).not.toBeNull(); @@ -256,7 +284,7 @@ describe("MollifierBuffer.pop orphan handling", () => { expect(popped!.status).toBe("DRAINING"); // The trailing orphan_b is still in the queue (single pop call). - const remaining = await buffer["redis"].llen("mollifier:queue:env_a"); + const remaining = await buffer["redis"].zcard("mollifier:queue:env_a"); expect(remaining).toBe(1); // A second pop drains the trailing orphan_b. 
The queue is now @@ -283,7 +311,6 @@ describe("MollifierBuffer.requeue", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -305,30 +332,43 @@ describe("MollifierBuffer.requeue", () => { }); describe("MollifierBuffer.fail", () => { - redisTest("fail transitions to FAILED and stores lastError", { timeout: 20_000 }, async ({ redisContainer }) => { - const buffer = new MollifierBuffer({ - redisOptions: { - host: redisContainer.getHost(), - port: redisContainer.getPort(), - password: redisContainer.getPassword(), - }, - entryTtlSeconds: 600, - logger: new Logger("test", "log"), - }); + redisTest( + "fail returns true and tears the entry down (drainer-terminal cleanup)", + { timeout: 20_000 }, + async ({ redisContainer }) => { + // Post-TTL-drop design: the drainer's createFailedTaskRun has + // already written a SYSTEM_FAILURE PG row by the time we call + // fail(), so the entry hash is no longer load-bearing. fail + // returns true and removes the entry; without this teardown + // failed entries would accrete forever now that there's no + // accept-time TTL. The Lua also DELs the idempotency lookup so + // future retries with the same key go through to PG instead of + // hitting an orphan dedup record. 
+ const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); - try { - await buffer.accept({ runId: "run_f", envId: "env_a", orgId: "org_1", payload: "{}" }); - await buffer.pop("env_a"); - const failed = await buffer.fail("run_f", { code: "VALIDATION", message: "boom" }); - expect(failed).toBe(true); + try { + await buffer.accept({ runId: "run_f", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + const failed = await buffer.fail("run_f", { code: "VALIDATION", message: "boom" }); + expect(failed).toBe(true); - const entry = await buffer.getEntry("run_f"); - expect(entry!.status).toBe("FAILED"); - expect(entry!.lastError).toEqual({ code: "VALIDATION", message: "boom" }); - } finally { - await buffer.close(); - } - }); + // Entry hash is gone post-fail. + const entry = await buffer.getEntry("run_f"); + expect(entry).toBeNull(); + const raw = await buffer["redis"].hgetall("mollifier:entries:run_f"); + expect(Object.keys(raw)).toHaveLength(0); + } finally { + await buffer.close(); + } + }, + ); redisTest( "fail on missing entry is a no-op (returns false; no partial hash created)", @@ -340,7 +380,6 @@ describe("MollifierBuffer.fail", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -361,27 +400,35 @@ describe("MollifierBuffer.fail", () => { }); describe("MollifierBuffer TTL", () => { - redisTest("entry has TTL applied on accept", { timeout: 20_000 }, async ({ redisContainer }) => { - const buffer = new MollifierBuffer({ - redisOptions: { - host: redisContainer.getHost(), - port: redisContainer.getPort(), - password: redisContainer.getPassword(), - }, - entryTtlSeconds: 600, - logger: new Logger("test", "log"), - }); + redisTest( + "entry has NO TTL applied on accept — drainer is the only 
cleanup path", + { timeout: 20_000 }, + async ({ redisContainer }) => { + // Regression guard for the design change: buffer entries must + // persist until the drainer ACKs or FAILs them. An accept-time + // EXPIRE would re-introduce the silent-loss-when-drainer-offline + // failure mode that the stale-entry alerting pipeline depends on + // *not* happening. + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); - try { - await buffer.accept({ runId: "run_t", envId: "env_a", orgId: "org_1", payload: "{}" }); + try { + await buffer.accept({ runId: "run_t", envId: "env_a", orgId: "org_1", payload: "{}" }); - const ttl = await buffer.getEntryTtlSeconds("run_t"); - expect(ttl).toBeGreaterThan(0); - expect(ttl).toBeLessThanOrEqual(600); - } finally { - await buffer.close(); - } - }); + // Redis returns -1 when the key exists but has no TTL set. 
+ const ttl = await buffer.getEntryTtlSeconds("run_t"); + expect(ttl).toBe(-1); + } finally { + await buffer.close(); + } + }, + ); }); describe("MollifierBuffer payload encoding", () => { @@ -395,7 +442,6 @@ describe("MollifierBuffer payload encoding", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -437,7 +483,6 @@ describe("MollifierBuffer.requeue on missing entry", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -458,22 +503,27 @@ describe("MollifierBuffer.requeue on missing entry", () => { describe("MollifierBuffer.requeue ordering", () => { redisTest( - "requeued entry is popped AFTER other queued entries on the same env (FIFO retry)", + "requeued entry retains its original createdAt and pops next (oldest-first by createdAt)", { timeout: 20_000 }, async ({ redisContainer }) => { + // Score == createdAtMicros; requeue does not bump the score. The + // oldest entry continues to pop first across retries. `maxAttempts` + // in the drainer bounds the retry loop for a persistently failing + // entry (after which it goes to the `fail` path, not requeue). 
const buffer = new MollifierBuffer({ redisOptions: { host: redisContainer.getHost(), port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); try { await buffer.accept({ runId: "a", envId: "env_a", orgId: "org_1", payload: "{}" }); + await new Promise((r) => setTimeout(r, 2)); await buffer.accept({ runId: "b", envId: "env_a", orgId: "org_1", payload: "{}" }); + await new Promise((r) => setTimeout(r, 2)); await buffer.accept({ runId: "c", envId: "env_a", orgId: "org_1", payload: "{}" }); const first = await buffer.pop("env_a"); @@ -481,12 +531,13 @@ describe("MollifierBuffer.requeue ordering", () => { await buffer.requeue("a"); + // a still has the smallest createdAtMicros → pops next. const next = await buffer.pop("env_a"); - expect(next!.runId).toBe("b"); + expect(next!.runId).toBe("a"); const after = await buffer.pop("env_a"); - expect(after!.runId).toBe("c"); + expect(after!.runId).toBe("b"); const last = await buffer.pop("env_a"); - expect(last!.runId).toBe("a"); + expect(last!.runId).toBe("c"); } finally { await buffer.close(); } @@ -508,7 +559,6 @@ describe("MollifierBuffer.evaluateTrip", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -530,7 +580,6 @@ describe("MollifierBuffer.evaluateTrip", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -557,7 +606,6 @@ describe("MollifierBuffer.evaluateTrip", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -585,7 +633,6 @@ describe("MollifierBuffer.evaluateTrip", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -610,7 +657,6 @@ 
describe("MollifierBuffer.evaluateTrip", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -638,7 +684,6 @@ describe("MollifierBuffer.evaluateTrip", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -671,7 +716,6 @@ describe("MollifierBuffer.evaluateTrip", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -707,22 +751,21 @@ describe("MollifierBuffer entry lifecycle invariants", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); try { await buffer.accept({ runId: "run_ttl", envId: "env_a", orgId: "org_1", payload: "{}" }); const beforeTtl = await buffer.getEntryTtlSeconds("run_ttl"); - expect(beforeTtl).toBeGreaterThan(0); + expect(beforeTtl).toBe(-1); await buffer.pop("env_a"); const afterTtl = await buffer.getEntryTtlSeconds("run_ttl"); - // TTL must still be present (>0). Redis returns -1 if the key has no - // TTL — that's the leak shape we're guarding against. - expect(afterTtl).toBeGreaterThan(0); - expect(afterTtl).toBeLessThanOrEqual(beforeTtl); + // No TTL applied at any point during accept/pop — the entry + // persists until the drainer ACKs or FAILs. Returning -1 from + // Redis here is the expected steady state, not a leak. 
+ expect(afterTtl).toBe(-1); } finally { await buffer.close(); } @@ -739,7 +782,6 @@ describe("MollifierBuffer entry lifecycle invariants", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -795,7 +837,6 @@ describe("MollifierBuffer.accept idempotency", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -813,8 +854,8 @@ describe("MollifierBuffer.accept idempotency", () => { payload: serialiseSnapshot({ first: false }), }); - expect(first).toBe(true); - expect(second).toBe(false); + expect(first).toEqual({ kind: "accepted" }); + expect(second).toEqual({ kind: "duplicate_run_id" }); // First payload preserved; second was a no-op. const stored = await buffer.getEntry("run_dup"); @@ -844,7 +885,6 @@ describe("MollifierBuffer.accept idempotency", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -855,7 +895,7 @@ describe("MollifierBuffer.accept idempotency", () => { expect(stored!.status).toBe("DRAINING"); const dup = await buffer.accept({ runId: "run_dr", envId: "env_a", orgId: "org_1", payload: "{}" }); - expect(dup).toBe(false); + expect(dup).toEqual({ kind: "duplicate_run_id" }); const afterDup = await buffer.getEntry("run_dr"); expect(afterDup!.status).toBe("DRAINING"); // unchanged @@ -866,16 +906,21 @@ describe("MollifierBuffer.accept idempotency", () => { ); redisTest( - "accept refused while existing entry is FAILED", + "runId slot is reclaimable after fail tears the entry down", { timeout: 20_000 }, async ({ redisContainer }) => { + // Post-TTL-drop design: fail() deletes the entry hash because + // the SYSTEM_FAILURE PG row is the canonical record of the + // failure. 
The runId slot is therefore free for a fresh accept + // afterwards — runIds are server-generated CUIDs and don't + // collide in practice, but the contract pinning here documents + // that a re-acceptance does NOT see a phantom "FAILED" entry. const buffer = new MollifierBuffer({ redisOptions: { host: redisContainer.getHost(), port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -883,15 +928,20 @@ describe("MollifierBuffer.accept idempotency", () => { await buffer.accept({ runId: "run_fl", envId: "env_a", orgId: "org_1", payload: "{}" }); await buffer.pop("env_a"); await buffer.fail("run_fl", { code: "VALIDATION", message: "boom" }); - const stored = await buffer.getEntry("run_fl"); - expect(stored!.status).toBe("FAILED"); - const dup = await buffer.accept({ runId: "run_fl", envId: "env_a", orgId: "org_1", payload: "{}" }); - expect(dup).toBe(false); + // Entry hash gone after fail (see "fail returns true and tears + // the entry down" — this test pins the accept-side effect). 
+ expect(await buffer.getEntry("run_fl")).toBeNull(); - const afterDup = await buffer.getEntry("run_fl"); - expect(afterDup!.status).toBe("FAILED"); // unchanged - expect(afterDup!.lastError).toEqual({ code: "VALIDATION", message: "boom" }); + const fresh = await buffer.accept({ + runId: "run_fl", + envId: "env_a", + orgId: "org_1", + payload: '{"fresh":true}', + }); + expect(fresh).toEqual({ kind: "accepted" }); + const after = await buffer.getEntry("run_fl"); + expect(after?.status).toBe("QUEUED"); } finally { await buffer.close(); } @@ -899,16 +949,21 @@ describe("MollifierBuffer.accept idempotency", () => { ); redisTest( - "re-accept after ack works (terminal entry can be re-accepted)", + "accept refused while a previously-acked (materialised) entry is still inside its grace TTL", { timeout: 20_000 }, async ({ redisContainer }) => { + // After ack, the entry hash persists for the grace window as a + // read-fallback safety net (Q1 D2). RunIds are server-generated and + // never collide in practice, but defense-in-depth: accept refuses + // while *any* entry exists for the runId, including materialised + // ones. The entry hash's TTL is now ~30s instead of the original + // entryTtlSeconds. const buffer = new MollifierBuffer({ redisOptions: { host: redisContainer.getHost(), port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -922,7 +977,6 @@ describe("MollifierBuffer.accept idempotency", () => { await buffer.pop("env_a"); await buffer.ack("run_x"); - // Entry is gone — re-accept should succeed. 
const reAccept = await buffer.accept({ runId: "run_x", envId: "env_a", @@ -930,8 +984,11 @@ describe("MollifierBuffer.accept idempotency", () => { payload: "{}", }); - expect(first).toBe(true); - expect(reAccept).toBe(true); + expect(first).toEqual({ kind: "accepted" }); + expect(reAccept).toEqual({ kind: "duplicate_run_id" }); + + const stored = await buffer.getEntry("run_x"); + expect(stored!.materialised).toBe(true); } finally { await buffer.close(); } @@ -950,7 +1007,6 @@ describe("MollifierBuffer envs set lifecycle", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -976,7 +1032,6 @@ describe("MollifierBuffer envs set lifecycle", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -1006,7 +1061,6 @@ describe("MollifierBuffer envs set lifecycle", () => { port: redisContainer.getPort(), password: redisContainer.getPassword(), }, - entryTtlSeconds: 600, logger: new Logger("test", "log"), }); @@ -1025,3 +1079,952 @@ describe("MollifierBuffer envs set lifecycle", () => { }, ); }); + +describe("MollifierBuffer idempotency lookup", () => { + redisTest( + "accept with idempotencyKey + taskIdentifier writes the lookup with no TTL", + { timeout: 20_000 }, + async ({ redisContainer }) => { + // Post-TTL-drop design: the idempotency lookup has no TTL, so it + // can never expire ahead of the entry hash (which used to cause + // a dedup-drift bug — once the lookup expired but the entry + // didn't, a retry with the same key would create a *new* + // buffered run for the same key). The drainer's ack and fail + // both DEL the lookup as part of teardown. 
+ const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + const result = await buffer.accept({ + runId: "ri1", + envId: "env_i", + orgId: "org_1", + payload: "{}", + idempotencyKey: "ikey-1", + taskIdentifier: "my-task", + }); + expect(result).toEqual({ kind: "accepted" }); + + const lookupKey = "mollifier:idempotency:env_i:my-task:ikey-1"; + const stored = await buffer["redis"].get(lookupKey); + expect(stored).toBe("ri1"); + // -1 = key exists with no TTL set. + expect(await buffer["redis"].ttl(lookupKey)).toBe(-1); + + const entry = await buffer.getEntry("ri1"); + expect(entry!.idempotencyLookupKey).toBe(lookupKey); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "second accept with same (env, task, idempotencyKey) returns duplicate_idempotency with the winner's runId", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + const first = await buffer.accept({ + runId: "ri-a", + envId: "env_i", + orgId: "org_1", + payload: "{}", + idempotencyKey: "ikey-2", + taskIdentifier: "my-task", + }); + const second = await buffer.accept({ + runId: "ri-b", + envId: "env_i", + orgId: "org_1", + payload: "{}", + idempotencyKey: "ikey-2", + taskIdentifier: "my-task", + }); + + expect(first).toEqual({ kind: "accepted" }); + expect(second).toEqual({ + kind: "duplicate_idempotency", + existingRunId: "ri-a", + }); + + // The loser's runId entry was never created. 
+ const loserEntry = await buffer.getEntry("ri-b"); + expect(loserEntry).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "lookupIdempotency hits when the run is buffered", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "rl1", + envId: "env_i", + orgId: "org_1", + payload: "{}", + idempotencyKey: "k1", + taskIdentifier: "t", + }); + const found = await buffer.lookupIdempotency({ + envId: "env_i", + taskIdentifier: "t", + idempotencyKey: "k1", + }); + expect(found).toBe("rl1"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "lookupIdempotency returns null when no lookup is bound", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + const found = await buffer.lookupIdempotency({ + envId: "env_i", + taskIdentifier: "t", + idempotencyKey: "absent", + }); + expect(found).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "lookupIdempotency self-heals when the lookup points at an expired entry", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + // Plant a stale lookup pointing at a non-existent entry. 
+ const lookupKey = "mollifier:idempotency:env_i:t:stale"; + await buffer["redis"].set(lookupKey, "rl-stale", "EX", 600); + expect(await buffer["redis"].get(lookupKey)).toBe("rl-stale"); + + const found = await buffer.lookupIdempotency({ + envId: "env_i", + taskIdentifier: "t", + idempotencyKey: "stale", + }); + expect(found).toBeNull(); + // Self-healed. + expect(await buffer["redis"].get(lookupKey)).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "ack DELs the idempotency lookup along with marking materialised", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "ra1", + envId: "env_i", + orgId: "org_1", + payload: "{}", + idempotencyKey: "ka", + taskIdentifier: "t", + }); + await buffer.pop("env_i"); + await buffer.ack("ra1"); + + const lookupKey = "mollifier:idempotency:env_i:t:ka"; + expect(await buffer["redis"].get(lookupKey)).toBeNull(); + const entry = await buffer.getEntry("ra1"); + expect(entry!.materialised).toBe(true); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "resetIdempotency clears snapshot fields + lookup; returns the runId", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "rr1", + envId: "env_i", + orgId: "org_1", + payload: serialiseSnapshot({ + idempotencyKey: "kr", + idempotencyKeyExpiresAt: "2026-12-01T00:00:00Z", + other: "field", + }), + idempotencyKey: "kr", + taskIdentifier: "t", + }); + + const result = await buffer.resetIdempotency({ + envId: "env_i", + 
taskIdentifier: "t", + idempotencyKey: "kr", + }); + expect(result.clearedRunId).toBe("rr1"); + + // Lookup is gone. + const lookupKey = "mollifier:idempotency:env_i:t:kr"; + expect(await buffer["redis"].get(lookupKey)).toBeNull(); + + // Snapshot's idempotency fields are nulled, other fields kept. + const entry = await buffer.getEntry("rr1"); + const payload = JSON.parse(entry!.payload) as { + idempotencyKey: unknown; + idempotencyKeyExpiresAt: unknown; + other: string; + }; + expect(payload.idempotencyKey).toBeNull(); + expect(payload.idempotencyKeyExpiresAt).toBeNull(); + expect(payload.other).toBe("field"); + expect(entry!.idempotencyLookupKey).toBe(""); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "resetIdempotency returns null when nothing is bound", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + const result = await buffer.resetIdempotency({ + envId: "env_i", + taskIdentifier: "t", + idempotencyKey: "absent", + }); + expect(result.clearedRunId).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); +}); + +describe("MollifierBuffer.casSetMetadata", () => { + redisTest( + "applies when expectedVersion matches; increments version; updates payload", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "cas1", + envId: "env_c", + orgId: "org_1", + payload: serialiseSnapshot({ metadata: '{"v":1}', metadataType: "application/json" }), + }); + const result = await buffer.casSetMetadata({ + runId: "cas1", + expectedVersion: 0, + newMetadata: 
'{"v":2}', + newMetadataType: "application/json", + }); + expect(result).toEqual({ kind: "applied", newVersion: 1 }); + + const entry = await buffer.getEntry("cas1"); + expect(entry!.metadataVersion).toBe(1); + const payload = JSON.parse(entry!.payload) as { metadata: string }; + expect(payload.metadata).toBe('{"v":2}'); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "returns version_conflict when expectedVersion is stale", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "cas2", + envId: "env_c", + orgId: "org_1", + payload: serialiseSnapshot({}), + }); + await buffer.casSetMetadata({ + runId: "cas2", + expectedVersion: 0, + newMetadata: '{"a":1}', + newMetadataType: "application/json", + }); + + // Second write with stale expectedVersion = 0 must conflict. 
+ const result = await buffer.casSetMetadata({ + runId: "cas2", + expectedVersion: 0, + newMetadata: '{"a":2}', + newMetadataType: "application/json", + }); + expect(result).toEqual({ kind: "version_conflict", currentVersion: 1 }); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "returns not_found / busy on missing or terminal entries", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + const nf = await buffer.casSetMetadata({ + runId: "absent", + expectedVersion: 0, + newMetadata: "{}", + newMetadataType: "application/json", + }); + expect(nf).toEqual({ kind: "not_found" }); + + await buffer.accept({ + runId: "cas3", + envId: "env_c", + orgId: "org_1", + payload: serialiseSnapshot({}), + }); + await buffer.pop("env_c"); + const busy = await buffer.casSetMetadata({ + runId: "cas3", + expectedVersion: 0, + newMetadata: "{}", + newMetadataType: "application/json", + }); + expect(busy).toEqual({ kind: "busy" }); + } finally { + await buffer.close(); + } + }, + ); +}); + +describe("MollifierBuffer.mutateSnapshot", () => { + redisTest( + "returns not_found when no entry exists for the runId", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + const result = await buffer.mutateSnapshot("nope", { + type: "append_tags", + tags: ["x"], + }); + expect(result).toBe("not_found"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "append_tags on QUEUED entry appends and dedupes", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + 
redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "r1", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ tags: ["existing"] }), + }); + const first = await buffer.mutateSnapshot("r1", { + type: "append_tags", + tags: ["existing", "new"], + }); + expect(first).toBe("applied_to_snapshot"); + + const entry = await buffer.getEntry("r1"); + const payload = JSON.parse(entry!.payload) as { tags: string[] }; + expect(payload.tags).toEqual(["existing", "new"]); + + // Second mutation appends without duplicating + const second = await buffer.mutateSnapshot("r1", { + type: "append_tags", + tags: ["new", "third"], + }); + expect(second).toBe("applied_to_snapshot"); + const e2 = await buffer.getEntry("r1"); + const p2 = JSON.parse(e2!.payload) as { tags: string[] }; + expect(p2.tags).toEqual(["existing", "new", "third"]); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "append_tags creates payload.tags when absent", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "r2", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ taskId: "t" }), + }); + const result = await buffer.mutateSnapshot("r2", { + type: "append_tags", + tags: ["a", "b"], + }); + expect(result).toBe("applied_to_snapshot"); + const entry = await buffer.getEntry("r2"); + const payload = JSON.parse(entry!.payload) as { tags: string[] }; + expect(payload.tags).toEqual(["a", "b"]); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "set_metadata replaces metadata + metadataType (last-write-wins)", + { timeout: 20_000 }, + 
async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "r3", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ metadata: '{"v":1}', metadataType: "application/json" }), + }); + const result = await buffer.mutateSnapshot("r3", { + type: "set_metadata", + metadata: '{"v":2}', + metadataType: "application/json", + }); + expect(result).toBe("applied_to_snapshot"); + const entry = await buffer.getEntry("r3"); + const payload = JSON.parse(entry!.payload) as { + metadata: string; + metadataType: string; + }; + expect(payload.metadata).toBe('{"v":2}'); + expect(payload.metadataType).toBe("application/json"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "set_delay sets payload.delayUntil", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "r4", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ taskId: "t" }), + }); + const result = await buffer.mutateSnapshot("r4", { + type: "set_delay", + delayUntil: "2026-06-01T00:00:00.000Z", + }); + expect(result).toBe("applied_to_snapshot"); + const entry = await buffer.getEntry("r4"); + const payload = JSON.parse(entry!.payload) as { delayUntil: string }; + expect(payload.delayUntil).toBe("2026-06-01T00:00:00.000Z"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "mark_cancelled stamps cancelledAt + cancelReason", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: 
redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "r5", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ taskId: "t" }), + }); + const result = await buffer.mutateSnapshot("r5", { + type: "mark_cancelled", + cancelledAt: "2026-05-19T12:00:00.000Z", + cancelReason: "user-initiated", + }); + expect(result).toBe("applied_to_snapshot"); + const entry = await buffer.getEntry("r5"); + const payload = JSON.parse(entry!.payload) as { + cancelledAt: string; + cancelReason: string; + }; + expect(payload.cancelledAt).toBe("2026-05-19T12:00:00.000Z"); + expect(payload.cancelReason).toBe("user-initiated"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "returns busy when entry is DRAINING", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "rd", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ tags: [] }), + }); + await buffer.pop("env_m"); + const result = await buffer.mutateSnapshot("rd", { + type: "append_tags", + tags: ["x"], + }); + expect(result).toBe("busy"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "returns not_found when entry was FAILED (drainer-terminal teardown)", + { timeout: 20_000 }, + async ({ redisContainer }) => { + // Post-TTL-drop design: fail() DELs the entry hash because the + // drainer has already written the canonical SYSTEM_FAILURE PG + // row, and without an accept-time TTL we'd otherwise accrete + // failed entries in Redis forever. Late mutations against a + // failed run therefore see `not_found`, matching the same shape + // they'd get for any other already-cleaned-up runId. 
+ const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "rf", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ tags: [] }), + }); + await buffer.pop("env_m"); + await buffer.fail("rf", { code: "X", message: "boom" }); + const result = await buffer.mutateSnapshot("rf", { + type: "append_tags", + tags: ["x"], + }); + expect(result).toBe("not_found"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "returns busy when entry is materialised (post-ack grace window)", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "rm", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ tags: [] }), + }); + await buffer.pop("env_m"); + await buffer.ack("rm"); + const result = await buffer.mutateSnapshot("rm", { + type: "append_tags", + tags: ["x"], + }); + expect(result).toBe("busy"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "Lua atomicity serialises concurrent mutations per-runId", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "rcc", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ tags: [] }), + }); + + const tagsToAdd = Array.from({ length: 50 }, (_, i) => `t${i}`); + await Promise.all( + tagsToAdd.map((t) => buffer.mutateSnapshot("rcc", { type: "append_tags", tags: 
[t] })), + ); + + const entry = await buffer.getEntry("rcc"); + const payload = JSON.parse(entry!.payload) as { tags: string[] }; + expect(payload.tags.sort()).toEqual(tagsToAdd.sort()); + } finally { + await buffer.close(); + } + }, + ); +}); + +describe("MollifierBuffer ZSET storage", () => { + redisTest( + "queue key is a ZSET scored by entry's createdAtMicros", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "z1", envId: "env_z", orgId: "org_1", payload: "{}" }); + + // ZSET-only commands must succeed against the queue key. + const card = await buffer["redis"].zcard("mollifier:queue:env_z"); + expect(card).toBe(1); + + const score = await buffer["redis"].zscore("mollifier:queue:env_z", "z1"); + expect(score).not.toBeNull(); + const scoreNum = Number(score); + expect(Number.isFinite(scoreNum)).toBe(true); + + // Score matches the entry hash's createdAtMicros field. + const micros = await buffer["redis"].hget("mollifier:entries:z1", "createdAtMicros"); + expect(micros).not.toBeNull(); + expect(Number(micros)).toBe(scoreNum); + + // Score is plausibly recent (within last minute as microseconds). 
+ const nowMicros = Date.now() * 1000; + expect(scoreNum).toBeGreaterThan(nowMicros - 60_000_000); + expect(scoreNum).toBeLessThanOrEqual(nowMicros + 1_000_000); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "pop returns entries in ascending createdAtMicros order (FIFO by time, not by member)", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + // Insert runIds in reverse-lex order to prove ordering is by score, not member. + await buffer.accept({ runId: "zzz", envId: "env_o", orgId: "org_1", payload: "{}" }); + await new Promise((r) => setTimeout(r, 5)); + await buffer.accept({ runId: "mmm", envId: "env_o", orgId: "org_1", payload: "{}" }); + await new Promise((r) => setTimeout(r, 5)); + await buffer.accept({ runId: "aaa", envId: "env_o", orgId: "org_1", payload: "{}" }); + + const first = await buffer.pop("env_o"); + expect(first!.runId).toBe("zzz"); + const second = await buffer.pop("env_o"); + expect(second!.runId).toBe("mmm"); + const third = await buffer.pop("env_o"); + expect(third!.runId).toBe("aaa"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "requeue keeps original score; createdAt is immutable across retries", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "rq", envId: "env_rq", orgId: "org_1", payload: "{}" }); + const originalScore = Number( + await buffer["redis"].zscore("mollifier:queue:env_rq", "rq"), + ); + const originalMicros = Number( + await buffer["redis"].hget("mollifier:entries:rq", "createdAtMicros"), + 
); + + await buffer.pop("env_rq"); + await new Promise((r) => setTimeout(r, 5)); + await buffer.requeue("rq"); + + const newScore = Number( + await buffer["redis"].zscore("mollifier:queue:env_rq", "rq"), + ); + const newMicros = Number( + await buffer["redis"].hget("mollifier:entries:rq", "createdAtMicros"), + ); + expect(newScore).toBe(originalScore); + expect(newMicros).toBe(originalMicros); + } finally { + await buffer.close(); + } + }, + ); +}); + +describe("MollifierBuffer.listEntriesForEnv", () => { + redisTest( + "returns up to maxCount entries from the queue without consuming them", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "r1", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.accept({ runId: "r2", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.accept({ runId: "r3", envId: "env_a", orgId: "org_1", payload: "{}" }); + + const entries = await buffer.listEntriesForEnv("env_a", 2); + expect(entries).toHaveLength(2); + const runIds = entries.map((e) => e.runId); + expect(new Set(runIds).size).toBe(2); + for (const id of runIds) expect(["r1", "r2", "r3"]).toContain(id); + + // Non-destructive: the drainer can still pop all three. 
+ const popped: string[] = []; + for (let i = 0; i < 3; i++) { + const entry = await buffer.pop("env_a"); + if (entry) popped.push(entry.runId); + } + expect(new Set(popped)).toEqual(new Set(["r1", "r2", "r3"])); + } finally { + await buffer.close(); + } + }, + ); + + redisTest("returns empty array when env queue is empty", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + expect(await buffer.listEntriesForEnv("env_empty", 10)).toEqual([]); + } finally { + await buffer.close(); + } + }); + + redisTest("maxCount <= 0 returns empty without hitting redis", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + expect(await buffer.listEntriesForEnv("env_a", 0)).toEqual([]); + expect(await buffer.listEntriesForEnv("env_a", -5)).toEqual([]); + } finally { + await buffer.close(); + } + }); +}); diff --git a/packages/redis-worker/src/mollifier/buffer.ts b/packages/redis-worker/src/mollifier/buffer.ts index f739e3ff362..fd53f59efea 100644 --- a/packages/redis-worker/src/mollifier/buffer.ts +++ b/packages/redis-worker/src/mollifier/buffer.ts @@ -10,17 +10,66 @@ import { BufferEntry, BufferEntrySchema } from "./schemas.js"; export type MollifierBufferOptions = { redisOptions: RedisOptions; - entryTtlSeconds: number; logger?: Logger; }; +// Grace TTL applied to the entry hash on drainer ack. The entry survives +// this long after materialisation so direct reads (retrieve, trace, etc.) +// have a safety net while PG replica lag settles. Q1 D2. 
+const ACK_GRACE_TTL_SECONDS = 30; + +export type SnapshotPatch = + | { type: "append_tags"; tags: string[] } + | { type: "set_metadata"; metadata: string; metadataType: string } + | { type: "set_delay"; delayUntil: string } + | { type: "mark_cancelled"; cancelledAt: string; cancelReason?: string }; + +export type MutateSnapshotResult = "applied_to_snapshot" | "not_found" | "busy"; + +export type CasSetMetadataResult = + | { kind: "applied"; newVersion: number } + | { kind: "version_conflict"; currentVersion: number } + | { kind: "not_found" } + | { kind: "busy" }; + +export type AcceptResult = + | { kind: "accepted" } + | { kind: "duplicate_run_id" } + | { kind: "duplicate_idempotency"; existingRunId: string }; + +export type IdempotencyLookupInput = { + envId: string; + taskIdentifier: string; + idempotencyKey: string; +}; + +function makeIdempotencyLookupKey(input: IdempotencyLookupInput): string { + return `mollifier:idempotency:${input.envId}:${input.taskIdentifier}:${input.idempotencyKey}`; +} + +// Pre-gate claim key namespace, distinct from `mollifier:idempotency` so the +// existing B6a buffer-side dedup stays isolated. The claim is the +// authoritative cross-store "this idempotency key is in flight or +// resolved" pointer used by the trigger hot path +// (`_plans/2026-05-21-mollifier-idempotency-claim.md`). 
Values: +// "pending" → a trigger pipeline owns the key and hasn't published yet +// → the winning trigger's runId (resolved) +export const IDEMPOTENCY_CLAIM_PENDING = "pending"; + +function makeIdempotencyClaimKey(input: IdempotencyLookupInput): string { + return `mollifier:claim:${input.envId}:${input.taskIdentifier}:${input.idempotencyKey}`; +} + +export type IdempotencyClaimResult = + | { kind: "claimed" } + | { kind: "pending" } + | { kind: "resolved"; runId: string }; + export class MollifierBuffer { private readonly redis: Redis; - private readonly entryTtlSeconds: number; private readonly logger: Logger; constructor(options: MollifierBufferOptions) { - this.entryTtlSeconds = options.entryTtlSeconds; this.logger = options.logger ?? new Logger("MollifierBuffer", "debug"); this.redis = createRedisClient( @@ -41,19 +90,47 @@ export class MollifierBuffer { this.#registerCommands(); } - // Returns true if the entry was newly written; false if a duplicate runId - // was already buffered (idempotent no-op). Callers can use the boolean to - // record a duplicate-accept metric without affecting buffer state. + // Three outcomes: + // - { kind: "accepted" } — entry was newly written. + // - { kind: "duplicate_run_id" } — runId was already buffered (idempotent + // no-op, same semantic as the previous boolean-false return). + // - { kind: "duplicate_idempotency", existingRunId } — the (env, task, + // idempotencyKey) tuple was already bound to another buffered run. + // The Lua's atomic SETNX is the race-winner; the second caller gets + // the winner's runId so it can return that as the trigger response. async accept(input: { runId: string; envId: string; orgId: string; payload: string; - }): Promise { + // Optional idempotency-key triple. 
When all three are present we + // SETNX a Redis lookup at `mollifier:idempotency:{env}:{task}:{key}` + // pointing at the runId so trigger-time dedup during the buffered + // window resolves the same way PG's unique constraint resolves it + // post-materialisation (Q5). + idempotencyKey?: string; + taskIdentifier?: string; + }): Promise { const entryKey = `mollifier:entries:${input.runId}`; const queueKey = `mollifier:queue:${input.envId}`; const orgsKey = "mollifier:orgs"; - const createdAt = new Date().toISOString(); + const nowMs = Date.now(); + const createdAt = new Date(nowMs).toISOString(); + // Microsecond epoch. JS only has millisecond precision, so multiple + // accepts in the same ms share a score; ZSET ties resolve by member + // (runId) lex order, which is deterministic and acceptable for FIFO + // pop. The hash carries the same value as `createdAtMicros` so the + // listing helper (Phase E) can read a stable per-run timestamp + // without re-fetching the score. + const createdAtMicros = nowMs * 1000; + const idempotencyLookupKey = + input.idempotencyKey && input.taskIdentifier + ? makeIdempotencyLookupKey({ + envId: input.envId, + taskIdentifier: input.taskIdentifier, + idempotencyKey: input.idempotencyKey, + }) + : ""; const result = await this.redis.acceptMollifierEntry( entryKey, queueKey, @@ -63,10 +140,17 @@ export class MollifierBuffer { input.orgId, input.payload, createdAt, - String(this.entryTtlSeconds), + String(createdAtMicros), "mollifier:org-envs:", + idempotencyLookupKey, ); - return result === 1; + // Lua returns 1 (accepted), 0 (duplicate runId), or a string runId + // (duplicate idempotency — value is the existing winner's runId). 
+    if (typeof result === "string" && result.length > 0) {
+      return { kind: "duplicate_idempotency", existingRunId: result };
+    }
+    if (result === 1) return { kind: "accepted" };
+    return { kind: "duplicate_run_id" };
   }
 
   async pop(envId: string): Promise {
@@ -128,8 +212,247 @@ export class MollifierBuffer {
     return this.redis.smembers(`mollifier:org-envs:${orgId}`);
   }
 
+  // Paginated, non-destructive read of queued entries, newest first
+  // (score DESC, then runId DESC). Q1 listing design. Pass the previous
+  // page's last row as the `(createdAtMicros, runId)` watermark to resume
+  // strictly after it. Returns up to `pageSize` hydrated `BufferEntry`
+  // rows; orphans (queue ref without a valid entry hash) are skipped.
+  async listForEnvWithWatermark(input: {
+    envId: string;
+    watermark?: { createdAtMicros: number; runId: string };
+    pageSize: number;
+  }): Promise<BufferEntry[]> {
+    if (input.pageSize <= 0) return [];
+    const queueKey = `mollifier:queue:${input.envId}`;
+
+    let runIds: string[];
+    if (!input.watermark) {
+      // Page 1 — newest first.
+      runIds = await this.redis.zrevrangebyscore(
+        queueKey,
+        "+inf",
+        "-inf",
+        "LIMIT",
+        0,
+        input.pageSize,
+      );
+    } else {
+      // Page N — resume strictly after the `(createdAtMicros, runId)` anchor.
+      const { createdAtMicros, runId: anchorRunId } = input.watermark;
+      // Tied band first: entries sharing the anchor's score rank above every
+      // lower-scored entry, so they must come before that band — never
+      // appended after it, and never dropped when it fills the page.
+      const tied = await this.redis.zrangebyscore(
+        queueKey,
+        createdAtMicros,
+        createdAtMicros,
+      );
+      const tiedPage = tied
+        .filter((r) => r < anchorRunId)
+        .sort((a, b) => (a < b ? 1 : a > b ? -1 : 0))
+        .slice(0, input.pageSize);
+      const remaining = input.pageSize - tiedPage.length;
+      // Fill the rest of the page from strictly below the anchor score.
+      const belowScore =
+        remaining > 0
+          ? await this.redis.zrevrangebyscore(
+              queueKey,
+              `(${createdAtMicros}`,
+              "-inf",
+              "LIMIT",
+              0,
+              remaining,
+            )
+          : [];
+      runIds = [...tiedPage, ...belowScore];
+    }
+
+    if (runIds.length === 0) return [];
+
+    // Parallel HGETALL — one round-trip per entry, all in flight.
+    const fetched = await Promise.all(
+      runIds.map((runId) => this.redis.hgetall(`mollifier:entries:${runId}`)),
+    );
+    const entries: BufferEntry[] = [];
+    for (const value of fetched) {
+      if (!value || Object.keys(value).length === 0) continue;
+      const parsed = BufferEntrySchema.safeParse(value);
+      if (parsed.success) entries.push(parsed.data);
+    }
+    return entries;
+  }
+
+  // Read-only listing of currently-queued entries for a single env. Used by
+  // the dashboard's "Recently queued" surface — non-destructive, so the
+  // drainer still pops these entries in order. Returns up to `maxCount`
+  // entries newest-first (highest score, which is `createdAtMicros`).
+  // Each entry hash is fetched separately; a `null` from getEntry (hash
+  // removed between ZREVRANGE and HGETALL) is skipped.
+  async listEntriesForEnv(envId: string, maxCount: number): Promise<BufferEntry[]> {
+    if (maxCount <= 0) return [];
+    const runIds = await this.redis.zrevrange(
+      `mollifier:queue:${envId}`,
+      0,
+      maxCount - 1,
+    );
+    const entries: BufferEntry[] = [];
+    for (const runId of runIds) {
+      const entry = await this.getEntry(runId);
+      if (entry) entries.push(entry);
+    }
+    return entries;
+  }
+
+  // Atomic snapshot mutation. Used by customer-mutation API endpoints
+  // (tags, metadata-put, reschedule, cancel) when the run is still in
+  // the buffer. Three outcomes:
+  //   - "applied_to_snapshot": entry was QUEUED + not materialised; the
+  //     drainer will read the patched payload on its next pop.
+  //   - "not_found": no entry hash for this runId (incl. after `fail()`).
+  //   - "busy": entry exists but is DRAINING or materialised. The API
+  //     wait-and-bounces through PG (Q3 design).
+  async mutateSnapshot(runId: string, patch: SnapshotPatch): Promise<MutateSnapshotResult> {
+    const result = (await this.redis.mutateMollifierSnapshot(
+      `mollifier:entries:${runId}`,
+      JSON.stringify(patch),
+    )) as string;
+    if (
+      result === "applied_to_snapshot" ||
+      result === "not_found" ||
+      result === "busy"
+    ) {
+      return result;
+    }
+    throw new Error(`MollifierBuffer.mutateSnapshot: unexpected Lua return value: ${result}`);
+  }
+
+  // Optimistic compare-and-swap on the snapshot's metadata. Caller reads
+  // the current metadataVersion via getEntry, applies operations in JS via
+  // `applyMetadataOperations`, then calls this with the new metadata + the
+  // expected version. Lua refuses if the version has moved (caller retries
+  // up to N times). Mirrors the PG-side `UpdateMetadataService` retry
+  // loop so concurrent increment/append operations don't lose deltas.
+  async casSetMetadata(input: {
+    runId: string;
+    expectedVersion: number;
+    newMetadata: string;
+    newMetadataType: string;
+  }): Promise<CasSetMetadataResult> {
+    const entryKey = `mollifier:entries:${input.runId}`;
+    const raw = (await this.redis.casSetMollifierMetadata(
+      entryKey,
+      String(input.expectedVersion),
+      input.newMetadata,
+      input.newMetadataType,
+    )) as string;
+    if (raw === "not_found") return { kind: "not_found" };
+    if (raw === "busy") return { kind: "busy" };
+    if (raw.startsWith("conflict:")) {
+      return { kind: "version_conflict", currentVersion: Number(raw.slice("conflict:".length)) };
+    }
+    if (raw.startsWith("applied:")) {
+      return { kind: "applied", newVersion: Number(raw.slice("applied:".length)) };
+    }
+    throw new Error(`MollifierBuffer.casSetMetadata: unexpected Lua return: ${raw}`);
+  }
+
+  // Atomic pre-gate claim on a (env, task, idempotencyKey) tuple. One
+  // call across both PG and buffer paths serialises through this claim;
+  // closes the race the buffer-side B6a SETNX leaves open during the
+  // gate-transition burst window (see
+  // `_plans/2026-05-21-mollifier-idempotency-claim.md`).
+  //
+  //   - "claimed": we now own the claim, the caller proceeds with the
+  //     trigger pipeline and must `publishClaim` on success or
+  //     `releaseClaim` on failure.
+  //   - "pending": another trigger owns the claim and hasn't published
+  //     yet; the caller should poll.
+  //   - "resolved": the claim already holds a runId; the caller can
+  //     return that runId as a cached hit.
+  async claimIdempotency(
+    input: IdempotencyLookupInput & { ttlSeconds: number },
+  ): Promise<IdempotencyClaimResult> {
+    const claimKey = makeIdempotencyClaimKey(input);
+    const raw = (await this.redis.claimMollifierIdempotency(
+      claimKey,
+      IDEMPOTENCY_CLAIM_PENDING,
+      String(input.ttlSeconds),
+    )) as string;
+    if (raw === "claimed") return { kind: "claimed" };
+    if (raw === "pending") return { kind: "pending" };
+    if (raw.startsWith("resolved:")) {
+      return { kind: "resolved", runId: raw.slice("resolved:".length) };
+    }
+    throw new Error(`MollifierBuffer.claimIdempotency: unexpected return: ${raw}`);
+  }
+
+  // Publish the winning runId to the claim so subsequent claimants /
+  // waiters see "resolved". TTL bounded by the customer's
+  // `idempotencyKeyExpiresAt` minus now; caller computes.
+  async publishClaim(
+    input: IdempotencyLookupInput & { runId: string; ttlSeconds: number },
+  ): Promise<void> {
+    const claimKey = makeIdempotencyClaimKey(input);
+    await this.redis.set(claimKey, input.runId, "EX", input.ttlSeconds);
+  }
+
+  // Release the claim on pipeline error so waiters can re-claim and
+  // retry. Idempotent.
+  async releaseClaim(input: IdempotencyLookupInput): Promise<void> {
+    const claimKey = makeIdempotencyClaimKey(input);
+    await this.redis.del(claimKey);
+  }
+
+  // Read the current claim value, used by the wait/poll loop on losers
+  // to detect "pending" → "resolved" transitions and timeouts.
+  async readClaim(input: IdempotencyLookupInput): Promise<IdempotencyClaimResult | null> {
+    const claimKey = makeIdempotencyClaimKey(input);
+    const value = await this.redis.get(claimKey);
+    if (value === null) return null;
+    if (value === IDEMPOTENCY_CLAIM_PENDING) return { kind: "pending" };
+    return { kind: "resolved", runId: value };
+  }
+
+  // Resolve a buffered run by (env, task, idempotencyKey) tuple. Used by
+  // `IdempotencyKeyConcern.handleTriggerRequest` after the PG check
+  // misses — same key may belong to a buffered run waiting to drain. The
+  // lookup self-heals: if the lookup points at an entry hash that's
+  // expired, we DEL the lookup and report a miss.
+  async lookupIdempotency(input: IdempotencyLookupInput): Promise<string | null> {
+    const lookupKey = makeIdempotencyLookupKey(input);
+    const runId = await this.redis.get(lookupKey);
+    if (!runId) return null;
+    const entry = await this.getEntry(runId);
+    if (!entry) {
+      await this.redis.del(lookupKey);
+      return null;
+    }
+    return runId;
+  }
+
+  // Clear the idempotency binding from a buffered run. Used by
+  // `ResetIdempotencyKeyService` alongside the existing PG-side
+  // `updateMany`. Returns the runId that was cleared, or null if no
+  // buffered run held this key.
+  async resetIdempotency(input: IdempotencyLookupInput): Promise<{ clearedRunId: string | null }> {
+    const lookupKey = makeIdempotencyLookupKey(input);
+    const clearedRunId = (await this.redis.resetMollifierIdempotency(
+      lookupKey,
+      "mollifier:entries:",
+    )) as string;
+    return { clearedRunId: clearedRunId.length > 0 ? clearedRunId : null };
+  }
+
+  // Marks the entry as materialised (PG row written) and resets its TTL to
+  // the grace window. Entry hash persists past ack as a read-fallback
+  // safety net for the brief PG replica-lag window between drainer-side
+  // write and reader-side visibility (Q1 D2). Also clears the associated
+  // idempotency lookup if one was set on accept (Q5).
async ack(runId: string): Promise { - await this.redis.del(`mollifier:entries:${runId}`); + await this.redis.ackMollifierEntry( + `mollifier:entries:${runId}`, + String(ACK_GRACE_TTL_SECONDS), + ); } async requeue(runId: string): Promise { @@ -153,10 +476,16 @@ export class MollifierBuffer { return result === 1; } + // Returns Redis-side TTL on the entry hash. Returns -1 for entries + // with no TTL — the steady state under the current design, where + // entries persist until drainer ack/fail. The ack grace TTL (30s + // post-materialise) is the only context where this returns a + // positive value; tests around the grace TTL still rely on it. async getEntryTtlSeconds(runId: string): Promise { return this.redis.ttl(`mollifier:entries:${runId}`); } + async evaluateTrip( envId: string, options: { windowMs: number; threshold: number; holdMs: number }, @@ -190,8 +519,9 @@ export class MollifierBuffer { local orgId = ARGV[3] local payload = ARGV[4] local createdAt = ARGV[5] - local ttlSeconds = tonumber(ARGV[6]) + local createdAtMicros = ARGV[6] local orgEnvsPrefix = ARGV[7] + local idempotencyLookupKey = ARGV[8] or '' -- Idempotent: refuse if an entry for this runId already exists in any -- state. Caller-side dedup is also enforced via API idempotency keys, @@ -200,6 +530,20 @@ export class MollifierBuffer { return 0 end + -- Idempotency-key dedup (Q5). If the caller passed a lookup key + -- and it's already bound to another buffered run, return the + -- winner's runId so the loser's API response can echo it as a + -- cached hit. Otherwise SET the lookup (no TTL — lifecycle is + -- paired with the entry hash; drainer ack/fail clear it + -- explicitly). 
+ if idempotencyLookupKey ~= '' then + local existing = redis.call('GET', idempotencyLookupKey) + if existing then + return existing + end + redis.call('SET', idempotencyLookupKey, runId) + end + redis.call('HSET', entryKey, 'runId', runId, 'envId', envId, @@ -207,9 +551,22 @@ export class MollifierBuffer { 'payload', payload, 'status', 'QUEUED', 'attempts', '0', - 'createdAt', createdAt) - redis.call('EXPIRE', entryKey, ttlSeconds) - redis.call('LPUSH', queueKey, runId) + 'createdAt', createdAt, + 'createdAtMicros', createdAtMicros, + 'idempotencyLookupKey', idempotencyLookupKey, + 'metadataVersion', '0') + -- No EXPIRE on the entry hash. Buffer entries persist until the + -- drainer ACKs (post-materialise grace) or FAILs them — the + -- drainer is the only recovery mechanism, so silent TTL-based + -- eviction would lose runs with no customer-visible signal. + -- Memory pressure from an offline drainer is the alertable + -- failure mode instead; see _ops/mollifier-ops.md. + -- ZSET keyed by createdAtMicros: ZPOPMIN drains oldest-first + -- (FIFO); listing pagination uses ZREVRANGEBYSCORE with a + -- (createdAt, runId) cursor anchor. Score is stable across the + -- entry's lifecycle — requeue does not bump it (see Phase 3b / + -- Q1 design). + redis.call('ZADD', queueKey, createdAtMicros, runId) -- Org-level membership: maintained atomically with the per-env -- queue so the drainer can walk orgs → envs-for-org and -- schedule one env per org per tick. 
SADDs are idempotent if the @@ -231,7 +588,8 @@ export class MollifierBuffer { local envId = redis.call('HGET', entryKey, 'envId') local orgId = redis.call('HGET', entryKey, 'orgId') - if not envId then + local createdAtMicros = redis.call('HGET', entryKey, 'createdAtMicros') + if not envId or not createdAtMicros then return 0 end @@ -239,7 +597,11 @@ export class MollifierBuffer { local nextAttempts = tonumber(currentAttempts or '0') + 1 redis.call('HSET', entryKey, 'status', 'QUEUED', 'attempts', tostring(nextAttempts)) - redis.call('LPUSH', queuePrefix .. envId, runId) + -- Requeue re-adds with the ORIGINAL createdAtMicros score. + -- createdAt is immutable across retries (Phase 3b decision). + -- The drainer's maxAttempts caps the retry loop so a poisoned + -- entry doesn't head-of-line forever. + redis.call('ZADD', queuePrefix .. envId, tonumber(createdAtMicros), runId) -- Re-track the org/env: pop may have SREM'd them when the queue -- last emptied. SADDs are idempotent if the values are still -- present. @@ -279,7 +641,9 @@ export class MollifierBuffer { -- hash without a TTL, leaking memory. The loop is bounded by queue -- length; entire Lua script remains atomic. while true do - local runId = redis.call('RPOP', queueKey) + -- ZPOPMIN returns {member, score} as a flat array, or {} when empty. + local popped = redis.call('ZPOPMIN', queueKey) + local runId = popped[1] if not runId then -- Queue is empty AND we have no entry to read orgId from, so -- skip org-level cleanup. Stale org-envs entries are bounded @@ -296,9 +660,9 @@ export class MollifierBuffer { result[raw[i]] = raw[i + 1] end -- Prune org-level membership if this pop drained the queue. - -- Atomic with the RPOP above — a concurrent accept AFTER this - -- script will SADD both back along with its LPUSH. - if redis.call('LLEN', queueKey) == 0 then + -- Atomic with the ZPOPMIN above — a concurrent accept AFTER + -- this script will SADD both back along with its ZADD. 
+ if redis.call('ZCARD', queueKey) == 0 then pruneOrgMembership(result['orgId']) end return cjson.encode(result) @@ -309,19 +673,220 @@ export class MollifierBuffer { `, }); + this.redis.defineCommand("casSetMollifierMetadata", { + numberOfKeys: 1, + lua: ` + local entryKey = KEYS[1] + local expectedVersion = tonumber(ARGV[1]) + local newMetadata = ARGV[2] + local newMetadataType = ARGV[3] + + if redis.call('EXISTS', entryKey) == 0 then + return 'not_found' + end + + local status = redis.call('HGET', entryKey, 'status') + local materialised = redis.call('HGET', entryKey, 'materialised') + if status ~= 'QUEUED' or materialised == 'true' then + return 'busy' + end + + local currentVersionStr = redis.call('HGET', entryKey, 'metadataVersion') or '0' + local currentVersion = tonumber(currentVersionStr) or 0 + if currentVersion ~= expectedVersion then + return 'conflict:' .. tostring(currentVersion) + end + + -- Write the new metadata onto the snapshot's payload JSON. We + -- keep the rest of the payload intact — only metadata/metadataType + -- change. metadataVersion is denormalised on the hash for cheap + -- CAS reads; it's intentionally NOT stored inside the payload + -- itself (PG-side metadataVersion is a column, not a JSON field). + local payloadJson = redis.call('HGET', entryKey, 'payload') + local ok, payload = pcall(cjson.decode, payloadJson) + if not ok then return 'busy' end + payload.metadata = newMetadata + payload.metadataType = newMetadataType + + local newVersion = currentVersion + 1 + redis.call('HSET', entryKey, + 'payload', cjson.encode(payload), + 'metadataVersion', tostring(newVersion)) + return 'applied:' .. tostring(newVersion) + `, + }); + + this.redis.defineCommand("claimMollifierIdempotency", { + numberOfKeys: 1, + lua: ` + local claimKey = KEYS[1] + local pending = ARGV[1] + local ttl = tonumber(ARGV[2]) + + -- SETNX-with-TTL: atomic; only one caller can win. 
+ local won = redis.call('SET', claimKey, pending, 'NX', 'EX', ttl) + if won then + return 'claimed' + end + + local existing = redis.call('GET', claimKey) + if existing == pending then + return 'pending' + end + return 'resolved:' .. existing + `, + }); + + this.redis.defineCommand("resetMollifierIdempotency", { + numberOfKeys: 1, + lua: ` + local lookupKey = KEYS[1] + local entryPrefix = ARGV[1] + + local runId = redis.call('GET', lookupKey) + if not runId then + return '' + end + + local entryKey = entryPrefix .. runId + if redis.call('EXISTS', entryKey) == 0 then + -- Stale lookup. Lazy cleanup. + redis.call('DEL', lookupKey) + return '' + end + + -- Clear the idempotency fields on the snapshot payload so the + -- drainer's eventual engine.trigger call inserts a PG row + -- without the key set. + local payloadJson = redis.call('HGET', entryKey, 'payload') + if payloadJson then + local ok, payload = pcall(cjson.decode, payloadJson) + if ok then + payload.idempotencyKey = cjson.null + payload.idempotencyKeyExpiresAt = cjson.null + redis.call('HSET', entryKey, 'payload', cjson.encode(payload)) + end + end + -- Clear the denormalised lookup pointer on the hash so a later + -- ack doesn't try to DEL a key that's already gone. 
+ redis.call('HSET', entryKey, 'idempotencyLookupKey', '') + redis.call('DEL', lookupKey) + return runId + `, + }); + + this.redis.defineCommand("mutateMollifierSnapshot", { + numberOfKeys: 1, + lua: ` + local entryKey = KEYS[1] + local patchJson = ARGV[1] + + if redis.call('EXISTS', entryKey) == 0 then + return 'not_found' + end + + local status = redis.call('HGET', entryKey, 'status') + local materialised = redis.call('HGET', entryKey, 'materialised') + if status ~= 'QUEUED' or materialised == 'true' then + return 'busy' + end + + local payloadJson = redis.call('HGET', entryKey, 'payload') + local ok, payload = pcall(cjson.decode, payloadJson) + if not ok then return 'busy' end + + local patch = cjson.decode(patchJson) + + if patch.type == 'append_tags' then + -- cjson decode of an absent or empty-array field gives nil or + -- an empty table; we rebuild as a dense array. Existing tags + -- are preserved; new tags are appended only if not present. + local existing = payload.tags or {} + local seen = {} + local merged = {} + for _, t in ipairs(existing) do + if not seen[t] then + seen[t] = true + table.insert(merged, t) + end + end + for _, t in ipairs(patch.tags or {}) do + if not seen[t] then + seen[t] = true + table.insert(merged, t) + end + end + payload.tags = merged + elseif patch.type == 'set_metadata' then + payload.metadata = patch.metadata + payload.metadataType = patch.metadataType + elseif patch.type == 'set_delay' then + payload.delayUntil = patch.delayUntil + elseif patch.type == 'mark_cancelled' then + payload.cancelledAt = patch.cancelledAt + payload.cancelReason = patch.cancelReason + else + return 'busy' + end + + redis.call('HSET', entryKey, 'payload', cjson.encode(payload)) + return 'applied_to_snapshot' + `, + }); + + this.redis.defineCommand("ackMollifierEntry", { + numberOfKeys: 1, + lua: ` + local entryKey = KEYS[1] + local graceTtlSeconds = tonumber(ARGV[1]) + + -- Guard: never create a partial entry. 
If the hash expired between + -- pop and ack, the run is gone — nothing to mark materialised. + if redis.call('EXISTS', entryKey) == 0 then + return 0 + end + + -- If the entry was accepted with an idempotency key, the lookup + -- string was stored on the hash at accept time. Clear it now — + -- PG becomes canonical for the key post-materialisation (Q5). + local lookupKey = redis.call('HGET', entryKey, 'idempotencyLookupKey') + if lookupKey and lookupKey ~= '' then + redis.call('DEL', lookupKey) + end + + redis.call('HSET', entryKey, 'materialised', 'true') + redis.call('EXPIRE', entryKey, graceTtlSeconds) + return 1 + `, + }); + this.redis.defineCommand("failMollifierEntry", { numberOfKeys: 1, lua: ` local entryKey = KEYS[1] local errorPayload = ARGV[1] - -- Guard: never create a partial entry. If the hash expired between - -- pop and fail, the run is gone — nothing to mark FAILED. + -- Guard: nothing to mark FAILED if the hash is gone (concurrent + -- ack/manual cleanup). Returning 0 lets the caller distinguish + -- "marked failed" from "no-op". if redis.call('EXISTS', entryKey) == 0 then return 0 end redis.call('HSET', entryKey, 'status', 'FAILED', 'lastError', errorPayload) + + -- The drainer has already written a SYSTEM_FAILURE PG row for + -- terminal failures (see mollifierDrainerHandler.server.ts), so + -- the buffer entry is no longer load-bearing. Clear the + -- idempotency lookup — PG's unique constraint is the canonical + -- dedup mechanism post-materialise — and drop the entry hash so + -- failed runs don't accrete forever now that there's no + -- accept-time TTL. 
+ local lookupKey = redis.call('HGET', entryKey, 'idempotencyLookupKey') + if lookupKey and lookupKey ~= '' then + redis.call('DEL', lookupKey) + end + redis.call('DEL', entryKey) return 1 `, }); @@ -362,10 +927,11 @@ declare module "@internal/redis" { orgId: string, payload: string, createdAt: string, - ttlSeconds: string, + createdAtMicros: string, orgEnvsPrefix: string, - callback?: Callback, - ): Result; + idempotencyLookupKey: string, + callback?: Callback, + ): Result; popAndMarkDraining( queueKey: string, orgsKey: string, @@ -382,6 +948,34 @@ declare module "@internal/redis" { orgEnvsPrefix: string, callback?: Callback, ): Result; + mutateMollifierSnapshot( + entryKey: string, + patchJson: string, + callback?: Callback, + ): Result; + casSetMollifierMetadata( + entryKey: string, + expectedVersion: string, + newMetadata: string, + newMetadataType: string, + callback?: Callback, + ): Result; + resetMollifierIdempotency( + lookupKey: string, + entryPrefix: string, + callback?: Callback, + ): Result; + claimMollifierIdempotency( + claimKey: string, + pendingMarker: string, + ttlSeconds: string, + callback?: Callback, + ): Result; + ackMollifierEntry( + entryKey: string, + graceTtlSeconds: string, + callback?: Callback, + ): Result; failMollifierEntry( entryKey: string, errorPayload: string, diff --git a/packages/redis-worker/src/mollifier/index.ts b/packages/redis-worker/src/mollifier/index.ts index 5e6fe202e3d..2751a6615eb 100644 --- a/packages/redis-worker/src/mollifier/index.ts +++ b/packages/redis-worker/src/mollifier/index.ts @@ -1,4 +1,13 @@ -export { MollifierBuffer, type MollifierBufferOptions } from "./buffer.js"; +export { + MollifierBuffer, + type MollifierBufferOptions, + type SnapshotPatch, + type MutateSnapshotResult, + type CasSetMetadataResult, + type IdempotencyClaimResult, + type IdempotencyLookupInput, + IDEMPOTENCY_CLAIM_PENDING, +} from "./buffer.js"; export { MollifierDrainer, type MollifierDrainerOptions, diff --git 
a/packages/redis-worker/src/mollifier/schemas.ts b/packages/redis-worker/src/mollifier/schemas.ts index f93b0f0a3c3..c5d9915575a 100644 --- a/packages/redis-worker/src/mollifier/schemas.ts +++ b/packages/redis-worker/src/mollifier/schemas.ts @@ -27,6 +27,10 @@ const stringToDate = z.string().transform((v, ctx) => { return d; }); +const stringToBool = z + .union([z.literal("true"), z.literal("false")]) + .transform((v) => v === "true"); + const stringToError = z.string().transform((v, ctx) => { try { return BufferEntryError.parse(JSON.parse(v)); @@ -44,6 +48,24 @@ export const BufferEntrySchema = z.object({ status: BufferEntryStatus, attempts: stringToInt, createdAt: stringToDate, + // Microsecond epoch matching the ZSET queue score. Stable across + // requeues — the score never moves once set at accept time. + createdAtMicros: stringToInt, + // Drainer-ack flag: `true` once the drainer has materialised this run + // into PG. The hash persists for a short grace TTL after ack so direct + // reads (retrieve, trace, etc.) still resolve while PG replica lag + // settles. Absent on pre-ack entries. + materialised: stringToBool.default("false"), + // Denormalised pointer to the Redis idempotency lookup key (set when + // the run was accepted with an idempotency key, empty otherwise). The + // ack Lua reads this to DEL the lookup atomically with marking the + // entry materialised (Q5). + idempotencyLookupKey: z.string().optional().default(""), + // Optimistic-lock counter for the snapshot's `metadata` field. + // Incremented atomically by the CAS metadata Lua. Matches the + // semantic of `TaskRun.metadataVersion` on the PG side (which the + // UpdateMetadataService uses for the same retry-on-conflict pattern). 
+ metadataVersion: stringToInt.default("0"), lastError: stringToError.optional(), }); From 816584658e705162aa6986a7c6d89fd2be588797 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 12:11:51 +0100 Subject: [PATCH 2/3] test(redis-worker): align drainer tests with buffer ack/fail semantics After the buffer extensions in this PR: - ack() keeps the entry alive with a grace TTL as a read-fallback safety net. Test asserts the entry persists with materialised=true. - fail() deletes the entry once the drainer-handler has written the canonical SYSTEM_FAILURE PG row. Tests assert the entry is null and use runOnce()'s `failed` counter as the surviving signal. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/mollifier/drainer.test.ts | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/packages/redis-worker/src/mollifier/drainer.test.ts b/packages/redis-worker/src/mollifier/drainer.test.ts index c8f68977f69..c98c733b0fc 100644 --- a/packages/redis-worker/src/mollifier/drainer.test.ts +++ b/packages/redis-worker/src/mollifier/drainer.test.ts @@ -6,7 +6,6 @@ import { MollifierDrainer } from "./drainer.js"; import { serialiseSnapshot } from "./schemas.js"; const noopOptions = { - entryTtlSeconds: 600, logger: new Logger("test", "log"), }; @@ -87,8 +86,11 @@ describe("MollifierDrainer.runOnce", () => { payload: { foo: 1 }, }); + // After ack the entry persists as a read-fallback safety net with + // materialised=true and a fresh grace TTL (Q1 D2 / Phase B2). 
const entry = await buffer.getEntry("run_1"); - expect(entry).toBeNull(); + expect(entry).not.toBeNull(); + expect(entry!.materialised).toBe(true); } finally { await buffer.close(); } @@ -167,9 +169,14 @@ describe("MollifierDrainer error handling", () => { expect(after2!.status).toBe("QUEUED"); expect(after2!.attempts).toBe(2); - await drainer.runOnce(); + const result3 = await drainer.runOnce(); + // On attempt 3 the drainer hits maxAttempts and calls fail(), + // which deletes the entry — once the drainer-handler has written + // the SYSTEM_FAILURE PG row the buffer entry is no longer + // load-bearing. The runOnce result is the surviving signal. const after3 = await buffer.getEntry("run_r"); - expect(after3!.status).toBe("FAILED"); + expect(after3).toBeNull(); + expect(result3.failed).toBe(1); expect(calls).toBe(3); } finally { await buffer.close(); @@ -202,11 +209,13 @@ describe("MollifierDrainer error handling", () => { try { await buffer.accept({ runId: "run_nr", envId: "env_a", orgId: "org_1", payload: "{}" }); - await drainer.runOnce(); + const result = await drainer.runOnce(); + // fail() deletes the entry once the drainer-handler has written + // the canonical SYSTEM_FAILURE PG row. const entry = await buffer.getEntry("run_nr"); - expect(entry!.status).toBe("FAILED"); - expect(entry!.lastError).toEqual({ code: "Error", message: "validation failure" }); + expect(entry).toBeNull(); + expect(result.failed).toBe(1); } finally { await buffer.close(); } From 02cfe1af10563182744f9856e731b6be11b38b7a Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 14:10:13 +0100 Subject: [PATCH 3/3] fix(redis-worker): encode mollifier composite-key segments + per-claim ownership token Addresses code-review feedback on the buffer's idempotency keying: - Encode `envId` / `taskIdentifier` / `idempotencyKey` with base64url before concatenation so customer-supplied segments containing `:` cannot alias each other onto the same Redis key. 
Exports `idempotencyLookupKeyFor` so tests assert against the same encoding the buffer writes. - Replace the shared `"pending"` claim marker with a caller-supplied ownership token (`"pending:<token>"`). `publishClaim` and `releaseClaim` become compare-and-set / compare-and-delete via Lua, so a late release from a previous claimant whose TTL expired cannot erase a new owner's claim. New buffer tests cover the alias-collision case, the encoded-key-shape contract, and the token-ownership safety properties (stale release is a no-op, wrong-token publish is a no-op, fresh claim survives the post-TTL-expiry stale-release race). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../redis-worker/src/mollifier/buffer.test.ts | 301 +++++++++++++++++- packages/redis-worker/src/mollifier/buffer.ts | 148 +++++++-- packages/redis-worker/src/mollifier/index.ts | 2 +- 3 files changed, 420 insertions(+), 31 deletions(-) diff --git a/packages/redis-worker/src/mollifier/buffer.test.ts b/packages/redis-worker/src/mollifier/buffer.test.ts index a4c1be35eb3..5ce6be7f09b 100644 --- a/packages/redis-worker/src/mollifier/buffer.test.ts +++ b/packages/redis-worker/src/mollifier/buffer.test.ts @@ -2,7 +2,7 @@ import { describe, expect, it } from "vitest"; import { BufferEntrySchema, serialiseSnapshot, deserialiseSnapshot } from "./schemas.js"; import { redisTest } from "@internal/testcontainers"; import { Logger } from "@trigger.dev/core/logger"; -import { MollifierBuffer } from "./buffer.js"; +import { MollifierBuffer, idempotencyLookupKeyFor } from "./buffer.js"; describe("schemas", () => { it("serialiseSnapshot then deserialiseSnapshot is identity for plain objects", () => { @@ -1110,7 +1110,11 @@ describe("MollifierBuffer idempotency lookup", () => { }); expect(result).toEqual({ kind: "accepted" }); - const lookupKey = "mollifier:idempotency:env_i:my-task:ikey-1"; + const lookupKey = idempotencyLookupKeyFor({ + envId: "env_i", + taskIdentifier: "my-task", + idempotencyKey: "ikey-1", + }); const
stored = await buffer["redis"].get(lookupKey); expect(stored).toBe("ri1"); // -1 = key exists with no TTL set. @@ -1241,7 +1245,11 @@ describe("MollifierBuffer idempotency lookup", () => { }); try { // Plant a stale lookup pointing at a non-existent entry. - const lookupKey = "mollifier:idempotency:env_i:t:stale"; + const lookupKey = idempotencyLookupKeyFor({ + envId: "env_i", + taskIdentifier: "t", + idempotencyKey: "stale", + }); await buffer["redis"].set(lookupKey, "rl-stale", "EX", 600); expect(await buffer["redis"].get(lookupKey)).toBe("rl-stale"); @@ -1283,7 +1291,11 @@ describe("MollifierBuffer idempotency lookup", () => { await buffer.pop("env_i"); await buffer.ack("ra1"); - const lookupKey = "mollifier:idempotency:env_i:t:ka"; + const lookupKey = idempotencyLookupKeyFor({ + envId: "env_i", + taskIdentifier: "t", + idempotencyKey: "ka", + }); expect(await buffer["redis"].get(lookupKey)).toBeNull(); const entry = await buffer.getEntry("ra1"); expect(entry!.materialised).toBe(true); @@ -1327,7 +1339,11 @@ describe("MollifierBuffer idempotency lookup", () => { expect(result.clearedRunId).toBe("rr1"); // Lookup is gone. - const lookupKey = "mollifier:idempotency:env_i:t:kr"; + const lookupKey = idempotencyLookupKeyFor({ + envId: "env_i", + taskIdentifier: "t", + idempotencyKey: "kr", + }); expect(await buffer["redis"].get(lookupKey)).toBeNull(); // Snapshot's idempotency fields are nulled, other fields kept. @@ -2028,3 +2044,278 @@ describe("MollifierBuffer.listEntriesForEnv", () => { } }); }); + +// Composite-key safety. The Redis-key builders concatenate +// `(envId, taskIdentifier, idempotencyKey)` with `:` separators; without +// per-segment encoding, `taskIdentifier="a:b"` and `idempotencyKey="x"` +// would map to the same key as `taskIdentifier="a"` and +// `idempotencyKey="b:x"`. base64url encoding has no `:` in its alphabet, +// so the encoded keys are unique per tuple. 
+describe("MollifierBuffer composite-key encoding (collision resistance)", () => { + redisTest( + "two accepts whose unencoded keys would alias don't collide on the idempotency lookup", + { timeout: 30_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + // Aliased tuples under raw `:` concatenation: + // env_x : "a:b" : "x" → "mollifier:idempotency:env_x:a:b:x" + // env_x : "a" : "b:x" → "mollifier:idempotency:env_x:a:b:x" + const r1 = await buffer.accept({ + runId: "ck_run_1", + envId: "env_x", + orgId: "org_1", + payload: "{}", + taskIdentifier: "a:b", + idempotencyKey: "x", + }); + const r2 = await buffer.accept({ + runId: "ck_run_2", + envId: "env_x", + orgId: "org_1", + payload: "{}", + taskIdentifier: "a", + idempotencyKey: "b:x", + }); + // Both accepted — no false-positive collision. + expect(r1).toEqual({ kind: "accepted" }); + expect(r2).toEqual({ kind: "accepted" }); + + // Each tuple resolves to its own runId. + const hit1 = await buffer.lookupIdempotency({ + envId: "env_x", + taskIdentifier: "a:b", + idempotencyKey: "x", + }); + const hit2 = await buffer.lookupIdempotency({ + envId: "env_x", + taskIdentifier: "a", + idempotencyKey: "b:x", + }); + expect(hit1).toBe("ck_run_1"); + expect(hit2).toBe("ck_run_2"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "encoded lookup key contains no ':' separator beyond the namespace", + { timeout: 20_000 }, + async () => { + // Pure-function test — verifies the encoding bijection without + // needing a live buffer. Re-uses the redisTest fixture for + // parallelism with other describe blocks but doesn't touch redis. 
+ const key = idempotencyLookupKeyFor({ + envId: "env_x", + taskIdentifier: "a:b", + idempotencyKey: "x:y:z", + }); + // namespace prefix is exactly `mollifier:idempotency:` (two `:`), + // then three base64url segments separated by two more `:` — + // never the customer-supplied colons. + const colonCount = key.split(":").length - 1; + expect(colonCount).toBe(4); + // base64url alphabet has no `:`, `+`, `/`, or `=`. + const afterNamespace = key.slice("mollifier:idempotency:".length); + expect(afterNamespace).toMatch(/^[A-Za-z0-9_\-]+:[A-Za-z0-9_\-]+:[A-Za-z0-9_\-]+$/); + }, + ); +}); + +// Pre-gate claim ownership protection. The claim slot stores +// `"pending:<token>"` so publish and release compare-and-act on the +// caller's token — a late release from a previous claimant whose TTL +// expired cannot erase a new owner's claim. +describe("MollifierBuffer pre-gate claim — ownership token safety", () => { + const claimInput = { + envId: "env_c", + taskIdentifier: "task_c", + idempotencyKey: "key_c", + }; + + redisTest( + "claimIdempotency: first caller gets 'claimed', second concurrent caller gets 'pending'", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + const first = await buffer.claimIdempotency({ + ...claimInput, + token: "token-A", + ttlSeconds: 30, + }); + expect(first.kind).toBe("claimed"); + + // Second concurrent caller with a different token sees pending. + const second = await buffer.claimIdempotency({ + ...claimInput, + token: "token-B", + ttlSeconds: 30, + }); + expect(second.kind).toBe("pending"); + + // readClaim distinguishes pending from resolved without leaking + // the token to the loser.
+ const read = await buffer.readClaim(claimInput); + expect(read?.kind).toBe("pending"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "releaseClaim with the wrong token is a no-op (compare-and-delete)", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.claimIdempotency({ ...claimInput, token: "owner", ttlSeconds: 30 }); + + // Pretend a stale claimant fires a release with their old token. + await buffer.releaseClaim({ ...claimInput, token: "stale-impostor" }); + + // The owner's claim survives. + const stillThere = await buffer.readClaim(claimInput); + expect(stillThere?.kind).toBe("pending"); + + // The owner can still release. + await buffer.releaseClaim({ ...claimInput, token: "owner" }); + expect(await buffer.readClaim(claimInput)).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "publishClaim with the wrong token is a no-op and returns false", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.claimIdempotency({ ...claimInput, token: "owner", ttlSeconds: 30 }); + + const wrongTokenPublish = await buffer.publishClaim({ + ...claimInput, + token: "stale-impostor", + runId: "imposter-run", + ttlSeconds: 60, + }); + expect(wrongTokenPublish).toBe(false); + + // Claim slot unchanged. 
+ const stillPending = await buffer.readClaim(claimInput); + expect(stillPending?.kind).toBe("pending"); + + const goodPublish = await buffer.publishClaim({ + ...claimInput, + token: "owner", + runId: "real-run", + ttlSeconds: 60, + }); + expect(goodPublish).toBe(true); + + const resolved = await buffer.readClaim(claimInput); + expect(resolved).toEqual({ kind: "resolved", runId: "real-run" }); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "regression: stale release after TTL expiry does NOT erase a fresh claim", + { timeout: 20_000 }, + async ({ redisContainer }) => { + // Hazard from CodeRabbit r3290070707: + // 1. Claimant A SETNXs the slot with their token, then stalls. + // 2. TTL expires, slot vanishes. + // 3. Claimant B SETNXs the slot with a DIFFERENT token. + // 4. Claimant A finally finishes (or errors) and calls + // releaseClaim with their original token. + // Without compare-and-delete, A's release would wipe B's slot and + // any concurrent customer of B's idempotency key would see "no + // claim" and re-issue, breaking same-key dedup. + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + // Step 1: A claims with token "A". + const a = await buffer.claimIdempotency({ + ...claimInput, + token: "A", + ttlSeconds: 1, // short TTL to simulate expiry quickly + }); + expect(a.kind).toBe("claimed"); + + // Step 2: simulate TTL expiry — DEL the slot directly so the + // test doesn't rely on wall-clock sleeping. + await buffer["redis"].del(`mollifier:claim:${[claimInput.envId, claimInput.taskIdentifier, claimInput.idempotencyKey] + .map((s) => Buffer.from(s, "utf8").toString("base64url")) + .join(":")}`); + + // Step 3: B claims with token "B". 
+ const b = await buffer.claimIdempotency({ + ...claimInput, + token: "B", + ttlSeconds: 30, + }); + expect(b.kind).toBe("claimed"); + + // Step 4: A's late release. MUST be a no-op. + await buffer.releaseClaim({ ...claimInput, token: "A" }); + + // B's claim survives intact. + const after = await buffer.readClaim(claimInput); + expect(after?.kind).toBe("pending"); + + // B can still publish. + const published = await buffer.publishClaim({ + ...claimInput, + token: "B", + runId: "B-run", + ttlSeconds: 60, + }); + expect(published).toBe(true); + } finally { + await buffer.close(); + } + }, + ); +}); diff --git a/packages/redis-worker/src/mollifier/buffer.ts b/packages/redis-worker/src/mollifier/buffer.ts index fd53f59efea..37752d21623 100644 --- a/packages/redis-worker/src/mollifier/buffer.ts +++ b/packages/redis-worker/src/mollifier/buffer.ts @@ -43,21 +43,38 @@ export type IdempotencyLookupInput = { idempotencyKey: string; }; -function makeIdempotencyLookupKey(input: IdempotencyLookupInput): string { - return `mollifier:idempotency:${input.envId}:${input.taskIdentifier}:${input.idempotencyKey}`; +// Reversible encoding for Redis-key segments. The composite-key builders +// concatenate `envId`, `taskIdentifier`, and `idempotencyKey` with `:` +// separators; if any segment contains a literal `:` (envId is internal +// and `:`-free, but taskIdentifier and idempotencyKey are +// customer-supplied) different tuples would map to the same Redis key +// and dedupe the wrong run. base64url has no `:` in its alphabet and is +// bijective on the input string, so the encoded keys are +// collision-free. +function encodeKeyPart(value: string): string { + return Buffer.from(value, "utf8").toString("base64url"); +} + +// Exported so tests can compute the same Redis key the buffer writes +// without hard-coding the encoding (which is a buffer-internal detail). 
+export function idempotencyLookupKeyFor(input: IdempotencyLookupInput): string { + return `mollifier:idempotency:${encodeKeyPart(input.envId)}:${encodeKeyPart(input.taskIdentifier)}:${encodeKeyPart(input.idempotencyKey)}`; } // Pre-gate claim key namespace, distinct from `mollifier:idempotency` so the // existing B6a buffer-side dedup stays isolated. The claim is the // authoritative cross-store "this idempotency key is in flight or -// resolved" pointer used by the trigger hot path -// (`_plans/2026-05-21-mollifier-idempotency-claim.md`). Values: -// "pending" → a trigger pipeline owns the key and hasn't published yet -// <runId> → the winning trigger's runId (resolved) -export const IDEMPOTENCY_CLAIM_PENDING = "pending"; +// resolved" pointer used by the trigger hot path. Values: +// "pending:<token>" → claimed by a trigger pipeline; `<token>` is the +// caller-supplied ownership token. Release and +// publish compare-and-act on this token so a +// late release from a previous claimant whose TTL +// expired cannot erase a new owner's claim. +// <runId> → the winning trigger's resolved runId. +const PENDING_PREFIX = "pending:"; function makeIdempotencyClaimKey(input: IdempotencyLookupInput): string { - return `mollifier:claim:${input.envId}:${input.taskIdentifier}:${input.idempotencyKey}`; + return `mollifier:claim:${encodeKeyPart(input.envId)}:${encodeKeyPart(input.taskIdentifier)}:${encodeKeyPart(input.idempotencyKey)}`; } export type IdempotencyClaimResult = @@ -125,7 +142,7 @@ export class MollifierBuffer { const createdAtMicros = nowMs * 1000; const idempotencyLookupKey = input.idempotencyKey && input.taskIdentifier - ? makeIdempotencyLookupKey({ + ? idempotencyLookupKeyFor({ envId: input.envId, taskIdentifier: input.taskIdentifier, idempotencyKey: input.idempotencyKey, @@ -359,8 +376,12 @@ export class MollifierBuffer { // Atomic pre-gate claim on a (env, task, idempotencyKey) tuple.
One // call across both PG and buffer paths serialises through this claim; // closes the race the buffer-side B6a SETNX leaves open during the - // gate-transition burst window (see - // `_plans/2026-05-21-mollifier-idempotency-claim.md`). + // gate-transition burst window. + // + // The caller supplies an opaque `token` (UUID) on claim. The same token + // MUST be passed to `publishClaim` / `releaseClaim`, which compare-and- + // act so a late release from a previous claimant whose TTL expired + // cannot erase a new owner's claim. // // - "claimed": we now own the claim, the caller proceeds with the // trigger pipeline and must `publishClaim` on success or @@ -370,12 +391,13 @@ export class MollifierBuffer { // - "resolved": the claim already holds a runId; the caller can // return that runId as a cached hit. async claimIdempotency( - input: IdempotencyLookupInput & { ttlSeconds: number }, + input: IdempotencyLookupInput & { token: string; ttlSeconds: number }, ): Promise { const claimKey = makeIdempotencyClaimKey(input); const raw = (await this.redis.claimMollifierIdempotency( claimKey, - IDEMPOTENCY_CLAIM_PENDING, + `${PENDING_PREFIX}${input.token}`, + PENDING_PREFIX, String(input.ttlSeconds), )) as string; if (raw === "claimed") return { kind: "claimed" }; @@ -389,18 +411,39 @@ export class MollifierBuffer { // Publish the winning runId to the claim so subsequent claimants / // waiters see "resolved". TTL bounded by the customer's // `idempotencyKeyExpiresAt` minus now; caller computes. + // + // Compare-and-set on the caller's token: if the current value isn't + // our pending marker (TTL expired and another claimant moved in, or + // someone else already published), the publish is a no-op. The caller + // can treat any such case as "we lost the claim" and re-read. + // Returns true if we published; false if the claim slot was no longer + // ours. 
async publishClaim( - input: IdempotencyLookupInput & { runId: string; ttlSeconds: number }, - ): Promise { + input: IdempotencyLookupInput & { token: string; runId: string; ttlSeconds: number }, + ): Promise { const claimKey = makeIdempotencyClaimKey(input); - await this.redis.set(claimKey, input.runId, "EX", input.ttlSeconds); + const result = (await this.redis.publishMollifierClaim( + claimKey, + `${PENDING_PREFIX}${input.token}`, + input.runId, + String(input.ttlSeconds), + )) as number; + return result === 1; } // Release the claim on pipeline error so waiters can re-claim and // retry. Idempotent. - async releaseClaim(input: IdempotencyLookupInput): Promise { + // + // Compare-and-delete on the caller's token: only deletes if the + // current value is exactly our pending marker. A late release from a + // claimant whose TTL expired is a no-op, so a new owner's claim is + // never wiped by a slow predecessor. + async releaseClaim(input: IdempotencyLookupInput & { token: string }): Promise { const claimKey = makeIdempotencyClaimKey(input); - await this.redis.del(claimKey); + await this.redis.releaseMollifierClaim( + claimKey, + `${PENDING_PREFIX}${input.token}`, + ); } // Read the current claim value, used by the wait/poll loop on losers @@ -409,7 +452,7 @@ export class MollifierBuffer { const claimKey = makeIdempotencyClaimKey(input); const value = await this.redis.get(claimKey); if (value === null) return null; - if (value === IDEMPOTENCY_CLAIM_PENDING) return { kind: "pending" }; + if (value.startsWith(PENDING_PREFIX)) return { kind: "pending" }; return { kind: "resolved", runId: value }; } @@ -419,7 +462,7 @@ export class MollifierBuffer { // lookup self-heals: if the lookup points at an entry hash that's // expired, we DEL the lookup and report a miss. 
async lookupIdempotency(input: IdempotencyLookupInput): Promise { - const lookupKey = makeIdempotencyLookupKey(input); + const lookupKey = idempotencyLookupKeyFor(input); const runId = await this.redis.get(lookupKey); if (!runId) return null; const entry = await this.getEntry(runId); @@ -435,7 +478,7 @@ export class MollifierBuffer { // `updateMany`. Returns the runId that was cleared, or null if no // buffered run held this key. async resetIdempotency(input: IdempotencyLookupInput): Promise<{ clearedRunId: string | null }> { - const lookupKey = makeIdempotencyLookupKey(input); + const lookupKey = idempotencyLookupKeyFor(input); const clearedRunId = (await this.redis.resetMollifierIdempotency( lookupKey, "mollifier:entries:", @@ -720,23 +763,65 @@ export class MollifierBuffer { numberOfKeys: 1, lua: ` local claimKey = KEYS[1] - local pending = ARGV[1] - local ttl = tonumber(ARGV[2]) + local pendingMarker = ARGV[1] -- "pending:<token>" + local pendingPrefix = ARGV[2] -- "pending:" + local ttl = tonumber(ARGV[3]) -- SETNX-with-TTL: atomic; only one caller can win. - local won = redis.call('SET', claimKey, pending, 'NX', 'EX', ttl) + local won = redis.call('SET', claimKey, pendingMarker, 'NX', 'EX', ttl) if won then return 'claimed' end local existing = redis.call('GET', claimKey) - if existing == pending then + -- Any "pending:*" value is a live claim — the caller-supplied + -- token differentiates ownership but is opaque to losers. + if string.sub(existing, 1, string.len(pendingPrefix)) == pendingPrefix then return 'pending' end return 'resolved:' .. existing `, }); + // Publish a winning runId to a claim slot we own. Compare-and-set on + // the caller's pending marker: if the slot is no longer ours (TTL + // expired and another claimant moved in, or already resolved by + // someone else), we no-op. Returns 1 on publish, 0 on no-op. 
+ this.redis.defineCommand("publishMollifierClaim", { + numberOfKeys: 1, + lua: ` + local claimKey = KEYS[1] + local ownerMarker = ARGV[1] -- "pending:" + local runId = ARGV[2] + local ttl = tonumber(ARGV[3]) + + local existing = redis.call('GET', claimKey) + if existing == ownerMarker then + redis.call('SET', claimKey, runId, 'EX', ttl) + return 1 + end + return 0 + `, + }); + + // Release a claim slot we own. Compare-and-delete on the caller's + // pending marker: a late release from a previous claimant whose TTL + // expired is a no-op, so a new owner's claim is never wiped. + this.redis.defineCommand("releaseMollifierClaim", { + numberOfKeys: 1, + lua: ` + local claimKey = KEYS[1] + local ownerMarker = ARGV[1] -- "pending:" + + local existing = redis.call('GET', claimKey) + if existing == ownerMarker then + redis.call('DEL', claimKey) + return 1 + end + return 0 + `, + }); + this.redis.defineCommand("resetMollifierIdempotency", { numberOfKeys: 1, lua: ` @@ -968,9 +1053,22 @@ declare module "@internal/redis" { claimMollifierIdempotency( claimKey: string, pendingMarker: string, + pendingPrefix: string, ttlSeconds: string, callback?: Callback, ): Result; + publishMollifierClaim( + claimKey: string, + ownerMarker: string, + runId: string, + ttlSeconds: string, + callback?: Callback, + ): Result; + releaseMollifierClaim( + claimKey: string, + ownerMarker: string, + callback?: Callback, + ): Result; ackMollifierEntry( entryKey: string, graceTtlSeconds: string, diff --git a/packages/redis-worker/src/mollifier/index.ts b/packages/redis-worker/src/mollifier/index.ts index 2751a6615eb..6e93f105fee 100644 --- a/packages/redis-worker/src/mollifier/index.ts +++ b/packages/redis-worker/src/mollifier/index.ts @@ -6,7 +6,7 @@ export { type CasSetMetadataResult, type IdempotencyClaimResult, type IdempotencyLookupInput, - IDEMPOTENCY_CLAIM_PENDING, + idempotencyLookupKeyFor, } from "./buffer.js"; export { MollifierDrainer,