
feat(webapp): mollifier trigger-time decisions — mollify, claim, read fallback #3753

Draft
d-cs wants to merge 4 commits into mollifier-phase-3-buffer from mollifier-phase-3-trigger

Conversation

@d-cs
Collaborator

@d-cs d-cs commented May 26, 2026

Summary

The trigger hot path's mollifier integration:

  • mollifyTrigger: when the gate trips, write the engine.trigger snapshot to the buffer and return a synthesised QUEUED response. Postgres write is deferred to drainer-replay (next PR in the stack).
  • Pre-gate idempotency-key claim: same-key triggers serialise through Redis so a burst lands in PG / buffer exactly once.
  • Read-fallback extensions: findRunByIdWithMollifierFallback for the trigger-time idempotency lookup that must see buffered runs.
  • Gate bypasses: debounce, oneTimeUseToken, parentTaskRunId/triggerAndWait skip the mollify path entirely.
  • triggerTask + IdempotencyKeyConcern wired to the above.

All behaviour gated by the master TRIGGER_MOLLIFIER_ENABLED switch; off-state hot path is unchanged (the gate is not even consulted).
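
The switch and the bypasses listed above can be sketched as a single eligibility check. This is a minimal illustration, not the code in `triggerTask.server.ts`: the `TriggerOptionsSketch` shape, the `mollifyEligibility` name, and the boolean `mollifierEnabled` parameter (standing in for however `TRIGGER_MOLLIFIER_ENABLED` is parsed) are all assumptions.

```typescript
// Hypothetical option shape for illustration; the real trigger options are richer.
interface TriggerOptionsSketch {
  debounce?: unknown;
  oneTimeUseToken?: string;
  parentTaskRunId?: string; // set for triggerAndWait-style calls
}

type Eligibility = "direct" | "eligible_for_mollify";

// Decide whether a trigger may take the mollify path at all.
function mollifyEligibility(
  mollifierEnabled: boolean,
  options: TriggerOptionsSketch
): Eligibility {
  // Master switch off: the hot path is unchanged and the gate is not consulted.
  if (!mollifierEnabled) return "direct";
  // Bypasses named in the summary skip the mollify path entirely.
  if (options.debounce !== undefined) return "direct";
  if (options.oneTimeUseToken !== undefined) return "direct";
  if (options.parentTaskRunId !== undefined) return "direct";
  return "eligible_for_mollify";
}
```

Only an `eligible_for_mollify` trigger would go on to have the gate decide between pass-through and buffering.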

Stacked on the buffer extensions PR.

Test plan

  • `pnpm run typecheck --filter webapp` passes
  • `pnpm run test --filter webapp test/mollifierMollify.test.ts` passes
  • `pnpm run test --filter webapp test/mollifierIdempotencyClaim.test.ts` passes
  • `pnpm run test --filter webapp test/mollifierReadFallback.test.ts` passes
  • `pnpm run test --filter webapp test/mollifierGate.test.ts` passes
  • `pnpm run test --filter webapp test/engine/triggerTask.test.ts` passes

@changeset-bot

changeset-bot Bot commented May 26, 2026

⚠️ No Changeset found

Latest commit: 01f3958

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types


@coderabbitai
Contributor

coderabbitai Bot commented May 26, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 6cd69154-28b5-490c-afe6-f4e58bb55506

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Comment thread apps/webapp/app/v3/mollifier/readFallback.server.ts
@d-cs d-cs self-assigned this May 26, 2026
@d-cs d-cs force-pushed the mollifier-phase-3-trigger branch from 626a8dc to af7368e Compare May 26, 2026 11:12
d-cs and others added 3 commits May 26, 2026 14:10
… fallback

The trigger hot path's mollifier integration:
- `mollifyTrigger`: when the gate trips, write the engine.trigger
  snapshot to the buffer and return a synthesised QUEUED response.
- Pre-gate idempotency-key claim: same-key triggers serialise through
  Redis so a burst lands in PG / buffer exactly once.
- Read-fallback extensions: `findRunByIdWithMollifierFallback` for the
  trigger-time idempotency lookup that must see buffered runs.
- Gate bypasses: debounce, oneTimeUseToken, parentTaskRun
  (triggerAndWait) skip the mollify path entirely.
- triggerTask + IdempotencyKeyConcern wired to the above.

Stacked on buffer extensions PR. All behaviour gated by the master
`TRIGGER_MOLLIFIER_ENABLED` switch; off-state hot path is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`new Date("not-a-date")` returns a truthy Invalid Date object, which would
mis-classify the run as CANCELED in the read-fallback synthesised shape.
Add an `asDate` helper that rejects NaN-valued parses and use it for
`cancelledAt` and `delayUntil`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
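
A minimal sketch of the kind of helper this commit describes. The PR's actual `asDate` signature is not shown in this conversation, so the accepted input types here are an assumption.

```typescript
// Hypothetical reconstruction of an `asDate`-style helper.
// Returns a Date only when the input parses to a real point in time.
function asDate(value: unknown): Date | undefined {
  let parsed: Date;
  if (value instanceof Date) {
    parsed = value;
  } else if (typeof value === "string" || typeof value === "number") {
    parsed = new Date(value);
  } else {
    return undefined;
  }
  // An Invalid Date is still a truthy object; its timestamp is NaN.
  return Number.isNaN(parsed.getTime()) ? undefined : parsed;
}
```

With this shape, a truthiness check such as `asDate(snapshot.cancelledAt) ? ... : ...` no longer treats an unparseable string as a cancellation timestamp (`snapshot` is a placeholder name).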
The buffer's claim API now requires a caller-supplied ownership token
so compare-and-act protects the slot against a stale predecessor.
Wires the token end-to-end:

- `claimOrAwait` generates the token (UUID) up front and reuses it
  across the retry path; returns it on the `claimed` outcome.
- `publishClaim` and `releaseClaim` wrappers accept and forward the
  token to the buffer.
- `ClaimedIdempotency` carries the token so the trigger pipeline can
  publish or release with the same token it claimed under.
- `triggerTask.server.ts` threads the token into the publish call.

Tests pin token round-trip: claimOrAwait → claimIdempotency, plus the
publish and release pass-through.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
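
The ownership-token idea can be illustrated with an in-memory stand-in for the claim slot. Everything here (`ClaimTableSketch`, its method names, and the `expire` helper that simulates a TTL lapse) is hypothetical; the real buffer performs these steps in Redis.

```typescript
// In-memory stand-in for the Redis-backed claim slot (illustration only).
interface ClaimSlot {
  token: string;
  runId?: string;
}

class ClaimTableSketch {
  private slots = new Map<string, ClaimSlot>();

  // Claim succeeds only when the slot is empty, analogous to SETNX.
  claim(key: string, token: string): boolean {
    if (this.slots.has(key)) return false;
    this.slots.set(key, { token });
    return true;
  }

  // Compare-and-act: publish only if the caller's token still owns the slot.
  publish(key: string, token: string, runId: string): boolean {
    const slot = this.slots.get(key);
    if (!slot || slot.token !== token) return false;
    slot.runId = runId;
    return true;
  }

  // Compare-and-act: release only if the caller's token still owns the slot.
  release(key: string, token: string): boolean {
    const slot = this.slots.get(key);
    if (!slot || slot.token !== token) return false;
    this.slots.delete(key);
    return true;
  }

  // Simulates the slot expiring while its original owner is still running.
  expire(key: string): void {
    this.slots.delete(key);
  }

  publishedRunId(key: string): string | undefined {
    return this.slots.get(key)?.runId;
  }
}
```

If a claimant's slot expires and a successor claims the same key, the stale predecessor's late `publish` or `release` is rejected because its token no longer matches, so the successor's claim survives.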
@d-cs d-cs force-pushed the mollifier-phase-3-trigger branch from 5a7bc19 to baa6f17 Compare May 26, 2026 13:24
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment


Devin Review found 2 potential issues.


Comment thread apps/webapp/app/runEngine/services/triggerTask.server.ts
Comment on lines 417 to +491
if (mollifierOutcome?.action === "mollify") {
const buffer = this.getMollifierBuffer();
if (buffer) {
const canonicalPayload = buildBufferedTriggerPayload({
const mollifierBuffer = this.getMollifierBuffer();
if (mollifierBuffer && !body.options?.debounce) {
event.setAttribute("mollifier.reason", mollifierOutcome.decision.reason);
event.setAttribute("mollifier.count", String(mollifierOutcome.decision.count));
event.setAttribute(
"mollifier.threshold",
String(mollifierOutcome.decision.threshold)
);
event.setAttribute("taskRunId", runFriendlyId);

const payloadPacket = await this.payloadProcessor.process(triggerRequest);

const engineTriggerInput = this.#buildEngineTriggerInput({
runFriendlyId,
environment,
idempotencyKey,
idempotencyKeyExpiresAt,
body,
options,
queueName,
lockedQueueId,
workerQueue,
enableFastPath,
lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined,
delayUntil,
ttl,
metadataPacket,
tags,
depth,
parentRun: parentRun ?? undefined,
annotations,
planType,
taskId,
payloadPacket,
traceContext: this.#propagateExternalTraceContext(
event.traceContext,
parentRun?.traceContext,
event.traceparent?.spanId
),
traceId: event.traceId,
spanId: event.spanId,
parentSpanId:
options.parentAsLinkType === "replay"
? undefined
: event.traceparent?.spanId,
taskEventStore: store,
});

const result = await mollifyTrigger({
runFriendlyId,
environmentId: environment.id,
organizationId: environment.organizationId,
engineTriggerInput,
decision: mollifierOutcome.decision,
buffer: mollifierBuffer,
// Idempotency-key triple wires the buffer's SETNX into
// the trigger-time dedup symmetric with PG (Q5).
idempotencyKey,
taskIdentifier: taskId,
});

logger.info("mollifier.buffered", {
runId: runFriendlyId,
envId: environment.id,
envType: environment.type,
envSlug: environment.slug,
orgId: environment.organizationId,
orgSlug: environment.organization.slug,
projectId: environment.projectId,
projectRef: environment.project.externalRef,
body,
idempotencyKey: idempotencyKey ?? null,
idempotencyKeyExpiresAt: idempotencyKey
? idempotencyKeyExpiresAt ?? null
: null,
tags,
parentRunFriendlyId: parentRun?.friendlyId ?? null,
traceContext: event.traceContext,
triggerSource,
triggerAction,
serviceOptions: options,
createdAt: new Date(),
taskId,
reason: mollifierOutcome.decision.reason,
});

try {
const serialisedPayload = serialiseSnapshot(canonicalPayload);
await buffer.accept({
runId: runFriendlyId,
envId: environment.id,
orgId: environment.organizationId,
payload: serialisedPayload,
});
// Light log on the hot path — keep this synchronous work
// O(1) per trigger. The drainer computes the payload hash
// off-path; operators correlate `mollifier.buffered` →
// `mollifier.drained` by runId.
logger.debug("mollifier.buffered", {
runId: runFriendlyId,
envId: environment.id,
orgId: environment.organizationId,
taskId,
payloadBytes: serialisedPayload.length,
});
} catch (err) {
// Fail-open: buffer write must never block the customer's
// trigger. engine.trigger below is the primary write path
// in Phase 1 — the customer still gets a valid run.
logger.error("mollifier.buffer_accept_failed", {
runId: runFriendlyId,
envId: environment.id,
taskId,
err: err instanceof Error ? err.message : String(err),
});
}
// Synthetic result is structurally narrower than the full
// TaskRun; the route handler only reads
// `result.run.friendlyId`. traceRun flushes the PARTIAL
// run-span event to ClickHouse on callback return.
return result as unknown as TriggerTaskServiceResult;
Contributor


🚩 Gate evaluation moved inside traceRun callback — changes error-handling scope

The evaluateGate call was moved from before the try { traceEventConcern.traceRun(...) } block (old code line ~355) to inside the traceRun callback (new code triggerTask.server.ts:390). This means gate failures now propagate through the traceRun span rather than being caught at the outer try/catch level. Since the gate has its own fail-open error handling (returns pass_through on any error per mollifierGate.server.ts:217-228), this is safe in practice.

However, the getMollifierBuffer() call at line 418 does NOT have fail-open protection. If getMollifierBuffer returns a non-null buffer but mollifyTrigger throws (e.g., Redis connection error during buffer.accept), the error now propagates through traceRun and hits the inner catch at line 601, which only handles RunDuplicateIdempotencyKeyError and RunOneTimeUseTokenError. Other errors are re-thrown, eventually reaching the outer catch at line 635, which releases the claim and re-throws to the route handler.

This means a buffer failure on the mollify path results in a 500 to the customer, whereas Phase 1's dual-write had explicit fail-open with fallback to engine.trigger. This is a design choice (buffer IS the primary write in Phase 3), but operators should be aware that buffer Redis availability is now on the critical path for mollified triggers.


Collaborator Author


You read it right — this is intentional for Phase 3 and worth calling out for operators.

Phase 1 was dual-write (buffer.accept + engine.trigger) with explicit fail-open if either side hiccuped. Phase 3 makes the buffer the sole write on the mollify path: the drainer is what materialises the PG row. So a buffer.accept failure has nowhere safe to fall through to: falling back to engine.trigger would either (a) duplicate the run if the Lua script's atomic SETNX had actually executed before the network drop, or (b) bypass the rate-limiting gate entirely under a sustained Redis outage, defeating the whole burst-buffer mechanism.
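
A rough sketch of that routing, to make the asymmetry concrete: the gate fails open, the buffer write does not. The `TriggerDepsSketch` shape and the function names are hypothetical stand-ins, not the service's real signatures.

```typescript
type GateAction = "pass_through" | "mollify";

// Hypothetical dependencies; each write resolves to a run id.
interface TriggerDepsSketch {
  mollifierEnabled: boolean;
  evaluateGate: () => Promise<GateAction>;
  bufferAccept: () => Promise<string>;
  engineTrigger: () => Promise<string>;
}

// The gate fails open: any gate error is treated as pass_through.
async function evaluateGateFailOpen(
  evaluate: () => Promise<GateAction>
): Promise<GateAction> {
  try {
    return await evaluate();
  } catch {
    return "pass_through";
  }
}

async function routeTrigger(
  deps: TriggerDepsSketch
): Promise<{ path: "engine" | "buffer"; runId: string }> {
  // Kill switch: the gate and the buffer are never touched.
  if (!deps.mollifierEnabled) {
    return { path: "engine", runId: await deps.engineTrigger() };
  }
  const action = await evaluateGateFailOpen(deps.evaluateGate);
  if (action === "pass_through") {
    return { path: "engine", runId: await deps.engineTrigger() };
  }
  // Deliberately no try/catch and no engine fallback: a buffer failure
  // propagates to the caller, which surfaces it as a 5xx.
  return { path: "buffer", runId: await deps.bufferAccept() };
}
```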

So the trade-off is: customers see 5xx on a buffer outage (vs Phase 1's graceful degradation), but the rate-limit-during-burst guarantee is preserved. The intended ops responses:

  • Short blip: ioredis retries (maxRetriesPerRequest: 20, ~50ms-1s backoff) absorb it before the request times out.
  • Sustained outage: operator flips TRIGGER_MOLLIFIER_ENABLED=0. The hot-path guard at triggerTask.server.ts short-circuits the gate before getMollifierBuffer() ever runs (no buffer construction attempt; gate isn't even consulted), so triggers immediately revert to direct engine.trigger.
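
For reference, the retry settings mentioned in the first bullet could look roughly like the options object below. `maxRetriesPerRequest` and `retryStrategy` are ioredis option names; the exact backoff curve is an assumption chosen to match the "~50ms-1s" description, not a value taken from the repository.

```typescript
// Assumed backoff: 50ms per attempt, capped at 1s.
// `times` is the 1-based retry attempt count; the return value is a delay in ms.
function retryDelayMs(times: number): number {
  return Math.min(times * 50, 1000);
}

// Options shaped for an ioredis client constructor (sketch only).
const bufferRedisOptionsSketch = {
  maxRetriesPerRequest: 20,
  retryStrategy: retryDelayMs,
};

// Total backoff delay accumulated across a given number of attempts.
function totalBackoffMs(attempts: number): number {
  let total = 0;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    total += retryDelayMs(attempt);
  }
  return total;
}
```

Under this assumed curve, 20 attempts add up to about 10.5 seconds of backoff delay, excluding connection time, so whether a blip is absorbed depends on how that compares with the request timeout.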

Not adding fail-open from the mollify branch to engine.trigger because of the duplicate-run / gate-bypass risks above. The runbook entry for buffer Redis outage points at the master kill switch.

…t leak fix)

Devin review flagged a security + correctness bug: the mollify path
returned `MollifySyntheticResult` with run shape
`{ friendlyId, spanId }` and the call site cast it to
`TriggerTaskServiceResult` (which expects `run: TaskRun` with an
`id: string`). Downstream, the trigger route calls
`saveRequestIdempotency(requestKey, "trigger", result.run.id)`, so
the cache stored `undefined` as the entity id. On SDK retry the
request-idempotency flow then ran
`prisma.taskRun.findFirst({ where: { id: undefined } })`. Prisma
strips `undefined` from where clauses, so the query degenerates to an
unfiltered `findFirst` and returns an arbitrary TaskRun row,
potentially from another env / user.

Fix:
- Add `id: string` to `MollifySyntheticResult.run`.
- Compute it via `RunId.fromFriendlyId(...)` on both branches:
  the happy-accept path (id from `args.runFriendlyId`) and the
  `duplicate_idempotency` race-loser path (id from
  `result.existingRunId` so the response carries the WINNER's id).
- Add regression tests pinning both branches.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
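
The failure mode described in this commit can be reproduced with a small simulation. `findFirstSketch` is not Prisma; it only mimics the "undefined filters are dropped" behaviour the commit message describes. The `findRunByIdStrict` guard is an extra defensive idea shown for illustration and is not part of this PR's fix.

```typescript
interface RunRow {
  id: string;
  envId: string;
}

type RunWhere = Partial<Record<keyof RunRow, string | undefined>>;

// Simulated lookup: filter keys whose value is undefined are ignored,
// so `{ id: undefined }` behaves like an empty filter.
function findFirstSketch(rows: RunRow[], where: RunWhere): RunRow | undefined {
  const keys = (Object.keys(where) as (keyof RunRow)[]).filter(
    (key) => where[key] !== undefined
  );
  return rows.find((row) => keys.every((key) => row[key] === where[key]));
}

// Hypothetical guard: refuse to query at all when the id is missing.
function findRunByIdStrict(
  rows: RunRow[],
  id: string | undefined
): RunRow | undefined {
  if (id === undefined || id.length === 0) {
    throw new Error("refusing to look up a run without an id");
  }
  return findFirstSketch(rows, { id });
}
```

With two rows from different environments, `findFirstSketch(rows, { id: undefined })` returns the first row regardless of who asked, which is the leak the commit closes by making `run.id` a required string on the synthetic result.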