feat(webapp): mollifier trigger-time decisions — mollify, claim, read fallback #3753
d-cs wants to merge 4 commits into
626a8dc to af7368e
… fallback

The trigger hot path's mollifier integration:

- `mollifyTrigger`: when the gate trips, write the engine.trigger snapshot to the buffer and return a synthesised QUEUED response.
- Pre-gate idempotency-key claim: same-key triggers serialise through Redis so a burst lands in PG / buffer exactly once.
- Read-fallback extensions: `findRunByIdWithMollifierFallback` for the trigger-time idempotency lookup that must see buffered runs.
- Gate bypasses: debounce, oneTimeUseToken, parentTaskRun (triggerAndWait) skip the mollify path entirely.
- triggerTask + IdempotencyKeyConcern wired to the above.

Stacked on buffer extensions PR. All behaviour gated by the master `TRIGGER_MOLLIFIER_ENABLED` switch; off-state hot path is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
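The gate bypasses listed above could be expressed as a single predicate. This is a minimal sketch, not the PR's code: the `BypassInput` shape and the `bypassReason` name are hypothetical, and where each field actually lives on the trigger request is an assumption.

```typescript
// Hypothetical, flattened view of the request fields that matter for bypassing.
type BypassInput = {
  debounce?: unknown;
  oneTimeUseToken?: string;
  parentTaskRunId?: string; // set for triggerAndWait in this sketch
};

type BypassReason = "debounce" | "oneTimeUseToken" | "parentTaskRun";

// Returns why the mollify path should be skipped, or null when the
// trigger is eligible for mollification.
function bypassReason(input: BypassInput): BypassReason | null {
  if (input.debounce !== undefined) return "debounce";
  if (input.oneTimeUseToken !== undefined) return "oneTimeUseToken";
  if (input.parentTaskRunId !== undefined) return "parentTaskRun";
  return null;
}
```

Returning a reason rather than a boolean keeps the decision easy to attach to a span attribute or log line.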
`new Date("not-a-date")` returns a truthy Invalid Date object, which would
mis-classify the run as CANCELED in the read-fallback synthesised shape.
Add an `asDate` helper that rejects NaN-valued parses and use it for
`cancelledAt` and `delayUntil`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
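A minimal sketch of what such an `asDate` helper might look like. The name comes from the commit message; the signature and the decision to accept only strings, numbers, and `Date` instances are assumptions.

```typescript
// Parses a value into a Date, returning undefined for anything that does
// not yield a valid timestamp. `new Date("not-a-date")` is an object and
// therefore truthy, so a plain truthiness check would not catch it.
function asDate(value: unknown): Date | undefined {
  if (value instanceof Date) {
    return Number.isNaN(value.getTime()) ? undefined : value;
  }
  if (typeof value !== "string" && typeof value !== "number") {
    return undefined;
  }
  const parsed = new Date(value);
  return Number.isNaN(parsed.getTime()) ? undefined : parsed;
}

// Usage: only a valid cancelledAt should mark the synthesised run as cancelled.
const cancelledAt = asDate("not-a-date");
const isCancelled = cancelledAt !== undefined;
```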
The buffer's claim API now requires a caller-supplied ownership token so compare-and-act protects the slot against a stale predecessor. Wires the token end-to-end:

- `claimOrAwait` generates the token (UUID) up front and reuses it across the retry path; returns it on the `claimed` outcome.
- `publishClaim` and `releaseClaim` wrappers accept and forward the token to the buffer.
- `ClaimedIdempotency` carries the token so the trigger pipeline can publish or release with the same token it claimed under.
- `triggerTask.server.ts` threads the token into the publish call.

Tests pin token round-trip: claimOrAwait → claimIdempotency, plus the publish and release pass-through.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
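To illustrate why an ownership token matters, here is an in-memory sketch of compare-and-act. It is not the buffer's Redis implementation: the `InMemoryClaims` class, its method names, and the `expire` helper (standing in for a TTL) are all hypothetical, and tokens are passed in as plain strings rather than generated UUIDs.

```typescript
type Slot = { token: string; runId?: string };

class InMemoryClaims {
  private slots = new Map<string, Slot>();

  // Takes the slot only if nobody holds it (the SETNX-style step).
  claim(key: string, token: string): boolean {
    if (this.slots.has(key)) return false;
    this.slots.set(key, { token });
    return true;
  }

  // Compare-and-act: publish succeeds only for the current owner.
  publish(key: string, token: string, runId: string): boolean {
    const slot = this.slots.get(key);
    if (!slot || slot.token !== token) return false;
    slot.runId = runId;
    return true;
  }

  // Compare-and-act: release succeeds only for the current owner.
  release(key: string, token: string): boolean {
    const slot = this.slots.get(key);
    if (!slot || slot.token !== token) return false;
    this.slots.delete(key);
    return true;
  }

  // Simulates the slot's TTL elapsing.
  expire(key: string): void {
    this.slots.delete(key);
  }

  resolved(key: string): string | undefined {
    return this.slots.get(key)?.runId;
  }
}

// A stale predecessor cannot disturb its successor's slot.
const claims = new InMemoryClaims();
claims.claim("idem-1", "token-a"); // first caller claims
claims.expire("idem-1"); // its claim times out
claims.claim("idem-1", "token-b"); // second caller claims
const staleRelease = claims.release("idem-1", "token-a"); // rejected
const published = claims.publish("idem-1", "token-b", "run_123"); // accepted
```

Without the token comparison, the late `release` from the first caller would delete the second caller's claim and let a third trigger through.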
5a7bc19 to baa6f17
if (mollifierOutcome?.action === "mollify") {
  const buffer = this.getMollifierBuffer();
  if (buffer) {
    const canonicalPayload = buildBufferedTriggerPayload({
  const mollifierBuffer = this.getMollifierBuffer();
  if (mollifierBuffer && !body.options?.debounce) {
    event.setAttribute("mollifier.reason", mollifierOutcome.decision.reason);
    event.setAttribute("mollifier.count", String(mollifierOutcome.decision.count));
    event.setAttribute(
      "mollifier.threshold",
      String(mollifierOutcome.decision.threshold)
    );
    event.setAttribute("taskRunId", runFriendlyId);

    const payloadPacket = await this.payloadProcessor.process(triggerRequest);

    const engineTriggerInput = this.#buildEngineTriggerInput({
      runFriendlyId,
      environment,
      idempotencyKey,
      idempotencyKeyExpiresAt,
      body,
      options,
      queueName,
      lockedQueueId,
      workerQueue,
      enableFastPath,
      lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined,
      delayUntil,
      ttl,
      metadataPacket,
      tags,
      depth,
      parentRun: parentRun ?? undefined,
      annotations,
      planType,
      taskId,
      payloadPacket,
      traceContext: this.#propagateExternalTraceContext(
        event.traceContext,
        parentRun?.traceContext,
        event.traceparent?.spanId
      ),
      traceId: event.traceId,
      spanId: event.spanId,
      parentSpanId:
        options.parentAsLinkType === "replay"
          ? undefined
          : event.traceparent?.spanId,
      taskEventStore: store,
    });

    const result = await mollifyTrigger({
      runFriendlyId,
      environmentId: environment.id,
      organizationId: environment.organizationId,
      engineTriggerInput,
      decision: mollifierOutcome.decision,
      buffer: mollifierBuffer,
      // Idempotency-key triple wires the buffer's SETNX into
      // the trigger-time dedup symmetric with PG (Q5).
      idempotencyKey,
      taskIdentifier: taskId,
    });

    logger.info("mollifier.buffered", {
      runId: runFriendlyId,
      envId: environment.id,
      envType: environment.type,
      envSlug: environment.slug,
      orgId: environment.organizationId,
      orgSlug: environment.organization.slug,
      projectId: environment.projectId,
      projectRef: environment.project.externalRef,
      body,
      idempotencyKey: idempotencyKey ?? null,
      idempotencyKeyExpiresAt: idempotencyKey
        ? idempotencyKeyExpiresAt ?? null
        : null,
      tags,
      parentRunFriendlyId: parentRun?.friendlyId ?? null,
      traceContext: event.traceContext,
      triggerSource,
      triggerAction,
      serviceOptions: options,
      createdAt: new Date(),
      taskId,
      reason: mollifierOutcome.decision.reason,
    });

    try {
      const serialisedPayload = serialiseSnapshot(canonicalPayload);
      await buffer.accept({
        runId: runFriendlyId,
        envId: environment.id,
        orgId: environment.organizationId,
        payload: serialisedPayload,
      });
      // Light log on the hot path — keep this synchronous work
      // O(1) per trigger. The drainer computes the payload hash
      // off-path; operators correlate `mollifier.buffered` →
      // `mollifier.drained` by runId.
      logger.debug("mollifier.buffered", {
        runId: runFriendlyId,
        envId: environment.id,
        orgId: environment.organizationId,
        taskId,
        payloadBytes: serialisedPayload.length,
      });
    } catch (err) {
      // Fail-open: buffer write must never block the customer's
      // trigger. engine.trigger below is the primary write path
      // in Phase 1 — the customer still gets a valid run.
      logger.error("mollifier.buffer_accept_failed", {
        runId: runFriendlyId,
        envId: environment.id,
        taskId,
        err: err instanceof Error ? err.message : String(err),
      });
    }
    // Synthetic result is structurally narrower than the full
    // TaskRun; the route handler only reads
    // `result.run.friendlyId`. traceRun flushes the PARTIAL
    // run-span event to ClickHouse on callback return.
    return result as unknown as TriggerTaskServiceResult;
🚩 Gate evaluation moved inside traceRun callback — changes error-handling scope
The evaluateGate call was moved from before the try { traceEventConcern.traceRun(...) } block (old code line ~355) to inside the traceRun callback (new code triggerTask.server.ts:390). This means gate failures now propagate through the traceRun span rather than being caught at the outer try/catch level. Since the gate has its own fail-open error handling (returns pass_through on any error per mollifierGate.server.ts:217-228), this is safe in practice. However, the getMollifierBuffer() call at line 418 does NOT have fail-open protection — if getMollifierBuffer returns a non-null buffer but mollifyTrigger throws (e.g., Redis connection error during buffer.accept), the error now propagates through traceRun and hits the inner catch at line 601, which only handles RunDuplicateIdempotencyKeyError and RunOneTimeUseTokenError. Other errors are re-thrown, eventually reaching the outer catch at line 635 which releases the claim and re-throws to the route handler. This means a buffer failure on the mollify path results in a 500 to the customer, whereas Phase 1's dual-write had explicit fail-open with fallback to engine.trigger. This is a design choice (buffer IS the primary write in Phase 3), but operators should be aware that buffer Redis availability is now on the critical path for mollified triggers.
You read it right — this is intentional for Phase 3 and worth calling out for operators.
Phase 1 was dual-write (buffer.accept + engine.trigger) with explicit fail-open if either side hiccuped. Phase 3 makes the buffer the sole write on the mollify path: the drainer is what materialises the PG row. So a buffer.accept failure has nowhere safe to fall through to — falling back to engine.trigger would either (a) duplicate the run if the Lua's atomic SETNX had actually executed before the network drop, or (b) bypass the rate-limiting gate entirely under sustained Redis outage, defeating the whole burst-buffer mechanism.
So the trade-off is: customers see 5xx on a buffer outage (vs Phase 1's graceful degradation), but the rate-limit-during-burst guarantee is preserved. The intended ops responses:
- Short blip: ioredis retries (`maxRetriesPerRequest: 20`, ~50ms-1s backoff) absorb it before the request times out.
- Sustained outage: operator flips `TRIGGER_MOLLIFIER_ENABLED=0`. The hot-path guard at `triggerTask.server.ts` short-circuits the gate before `getMollifierBuffer()` ever runs (no buffer construction attempt; gate isn't even consulted), so triggers immediately revert to direct `engine.trigger`.
Not adding fail-open from the mollify branch to engine.trigger because of the duplicate-run / gate-bypass risks above. The runbook entry for buffer Redis outage points at the master kill switch.
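The kill-switch behaviour described in this reply can be sketched as below. This is illustrative only: `decideTriggerPath` and `GateOutcome` are hypothetical names, and treating the literal value `"1"` as "enabled" is an assumption about how the flag is parsed.

```typescript
type GateOutcome = { action: "pass_through" } | { action: "mollify" };

// Decides which write path a trigger takes. When the master switch is
// off, the gate callback is never invoked.
function decideTriggerPath(
  env: Record<string, string | undefined>,
  evaluateGate: () => GateOutcome
): "engine.trigger" | "mollify" {
  if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") {
    return "engine.trigger";
  }
  return evaluateGate().action === "mollify" ? "mollify" : "engine.trigger";
}

// With the switch off, even a gate that would trip is not consulted.
let gateCalls = 0;
const alwaysMollify = (): GateOutcome => {
  gateCalls += 1;
  return { action: "mollify" };
};
const pathWhenOff = decideTriggerPath({ TRIGGER_MOLLIFIER_ENABLED: "0" }, alwaysMollify);
const callsWhenOff = gateCalls;
const pathWhenOn = decideTriggerPath({ TRIGGER_MOLLIFIER_ENABLED: "1" }, alwaysMollify);
```

Checking the switch before anything else is what lets an operator take buffer Redis off the critical path without a deploy.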
…t leak fix)
Devin review flagged a security + correctness bug: the mollify path
returned `MollifySyntheticResult` with run shape
`{ friendlyId, spanId }` and the call site cast it to
`TriggerTaskServiceResult` (which expects `run: TaskRun` with an
`id: string`). Downstream, the trigger route calls
`saveRequestIdempotency(requestKey, "trigger", result.run.id)` — so
the cache stored `undefined` as the entity id. On SDK retry the
request-idempotency flow then ran
`prisma.taskRun.findFirst({ where: { id: undefined } })`. Prisma
strips `undefined` from where clauses, so the query degenerates to an
unfiltered `findFirst` and returns an arbitrary TaskRun row —
potentially from another env / user.
Fix:
- Add `id: string` to `MollifySyntheticResult.run`.
- Compute it via `RunId.fromFriendlyId(...)` on both branches:
  the happy-accept path (id from `args.runFriendlyId`) and the
  `duplicate_idempotency` race-loser path (id from
  `result.existingRunId` so the response carries the WINNER's id).
- Add regression tests pinning both branches.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
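The failure mode in this commit message can be reproduced without a database. In the sketch below, `findFirst` is a hypothetical in-memory stand-in that drops `undefined` filter values the way the commit message says Prisma does, and `findRunByIdStrict` is an illustrative defensive wrapper, not something this PR adds.

```typescript
type Run = { id: string; envId: string };

// Stand-in query helper: filters whose value is undefined are ignored,
// so `{ id: undefined }` matches every row.
function findFirst(
  rows: Run[],
  where: Partial<Record<keyof Run, string | undefined>>
): Run | undefined {
  const filters = Object.entries(where).filter(([, value]) => value !== undefined);
  return rows.find((row) =>
    filters.every(([key, value]) => row[key as keyof Run] === value)
  );
}

// Defensive wrapper: refuse to query at all when the id is missing.
function findRunByIdStrict(rows: Run[], id: string | undefined): Run | undefined {
  if (typeof id !== "string" || id.length === 0) return undefined;
  return findFirst(rows, { id });
}

const rows: Run[] = [
  { id: "run_a", envId: "env_other" },
  { id: "run_b", envId: "env_mine" },
];

const leaked = findFirst(rows, { id: undefined }); // arbitrary first row
const guarded = findRunByIdStrict(rows, undefined); // no query, no leak
```

The fix in the commit addresses the root cause by always populating `id`; a guard like this would only be a second line of defence.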
Summary
The trigger hot path's mollifier integration:

- `mollifyTrigger`: when the gate trips, write the engine.trigger snapshot to the buffer and return a synthesised QUEUED response. Postgres write is deferred to drainer-replay (next PR in the stack).
- `findRunByIdWithMollifierFallback` for the trigger-time idempotency lookup that must see buffered runs.
- `debounce`, `oneTimeUseToken`, `parentTaskRunId` / `triggerAndWait` skip the mollify path entirely.
- `triggerTask` + `IdempotencyKeyConcern` wired to the above.

All behaviour gated by the master `TRIGGER_MOLLIFIER_ENABLED` switch; off-state hot path is unchanged (the gate is not even consulted).

Stacked on the buffer extensions PR.
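A read fallback of the kind summarised above might look like the following sketch. It is not `findRunByIdWithMollifierFallback` itself: the function name, the in-memory `Map` stores, the Postgres-first lookup order, and reporting every buffered run as QUEUED are all simplifying assumptions.

```typescript
type RunView = {
  friendlyId: string;
  status: string;
  source: "postgres" | "buffer";
};

// Looks for a materialised row first, then falls back to the buffer so
// that a run accepted but not yet drained is still visible to lookups.
function findRunWithBufferFallback(
  friendlyId: string,
  postgres: Map<string, { status: string }>,
  buffer: Map<string, { payload: string }>
): RunView | undefined {
  const row = postgres.get(friendlyId);
  if (row) {
    return { friendlyId, status: row.status, source: "postgres" };
  }
  if (buffer.has(friendlyId)) {
    // Synthesised shape: the run exists only as a buffered snapshot.
    return { friendlyId, status: "QUEUED", source: "buffer" };
  }
  return undefined;
}

const pg = new Map([["run_drained", { status: "EXECUTING" }]]);
const buf = new Map([["run_buffered", { payload: "{}" }]]);

const drained = findRunWithBufferFallback("run_drained", pg, buf);
const buffered = findRunWithBufferFallback("run_buffered", pg, buf);
const missing = findRunWithBufferFallback("run_unknown", pg, buf);
```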
Test plan