From 7c87c6ac3bebd421c81495aab55dfd49a176060e Mon Sep 17 00:00:00 2001 From: onefeng Date: Tue, 26 May 2026 17:59:00 +0800 Subject: [PATCH 1/2] fix: run send middlewares when requeueing tasks --- taskiq/context.py | 14 +++++++++++++- tests/test_requeue.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/taskiq/context.py b/taskiq/context.py index f9f1d0ee..f1c46c2f 100644 --- a/taskiq/context.py +++ b/taskiq/context.py @@ -1,8 +1,10 @@ from typing import TYPE_CHECKING from taskiq.abc.broker import AsyncBroker +from taskiq.abc.middleware import TaskiqMiddleware from taskiq.exceptions import NoResultError, TaskRejectedError from taskiq.message import TaskiqMessage +from taskiq.utils import maybe_awaitable if TYPE_CHECKING: # pragma: no cover from taskiq.state import TaskiqState @@ -30,7 +32,17 @@ async def requeue(self) -> None: requeue_count = int(self.message.labels.get("X-Taskiq-requeue", 0)) requeue_count += 1 self.message.labels["X-Taskiq-requeue"] = str(requeue_count) - await self.broker.kick(self.broker.formatter.dumps(self.message)) + message = self.message + for middleware in self.broker.middlewares: + if middleware.__class__.pre_send != TaskiqMiddleware.pre_send: + message = await maybe_awaitable(middleware.pre_send(message)) + + await self.broker.kick(self.broker.formatter.dumps(message)) + + for middleware in reversed(self.broker.middlewares): + if middleware.__class__.post_send != TaskiqMiddleware.post_send: + await maybe_awaitable(middleware.post_send(message)) + raise NoResultError def reject(self) -> None: diff --git a/tests/test_requeue.py b/tests/test_requeue.py index c451b6df..38439b6d 100644 --- a/tests/test_requeue.py +++ b/tests/test_requeue.py @@ -1,4 +1,6 @@ from taskiq import Context, InMemoryBroker, TaskiqDepends +from taskiq.abc.middleware import TaskiqMiddleware +from taskiq.message import TaskiqMessage async def test_requeue() -> None: @@ -46,3 +48,37 @@ async def task(_: None = TaskiqDepends(dep_func)) -> None: ) assert runs_count == 2 + + +async def test_requeue_triggers_send_middlewares() -> None: + broker = InMemoryBroker() + runs_count = 0 + + class CountingMiddleware(TaskiqMiddleware): + def __init__(self) -> None: + super().__init__() + self.pre_send_calls = 0 + self.post_send_calls = 0 + + def pre_send(self, message: TaskiqMessage) -> TaskiqMessage: + self.pre_send_calls += 1 + return message + + def post_send(self, message: TaskiqMessage) -> None: + self.post_send_calls += 1 + + middleware = CountingMiddleware() + broker.add_middlewares(middleware) + + @broker.task + async def task(context: Context = TaskiqDepends()) -> None: + nonlocal runs_count + runs_count += 1 + if runs_count < 2: + await context.requeue() + + kicked = await task.kiq() + await kicked.wait_result() + + assert middleware.pre_send_calls == 2 + assert middleware.post_send_calls == 2 From c0c1963d162e609753a68297745499f52a42f3e3 Mon Sep 17 00:00:00 2001 From: onefeng Date: Tue, 26 May 2026 18:11:38 +0800 Subject: [PATCH 2/2] docs: use pnpm for local docs commands --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 3f9d57ea..9dba668d 100644 --- a/README.md +++ b/README.md @@ -119,15 +119,15 @@ pytest ## Docs -To run docs locally, you need to install [yarn](https://yarnpkg.com/getting-started/install). +To run docs locally, you need to install [pnpm](https://pnpm.io/installation). First, you need to install dependencies. ``` -yarn install +pnpm install ``` After that you can set up a docs server by running: ``` -yarn docs:dev +pnpm docs:dev ```