Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ pytest

## Docs

To run docs locally, you need to install [yarn](https://yarnpkg.com/getting-started/install).
To run docs locally, you need to install [pnpm](https://pnpm.io/installation).

First, you need to install dependencies.
```
yarn install
pnpm install
```

After that you can set up a docs server by running:

```
yarn docs:dev
pnpm docs:dev
```
14 changes: 13 additions & 1 deletion taskiq/context.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import TYPE_CHECKING

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.exceptions import NoResultError, TaskRejectedError
from taskiq.message import TaskiqMessage
from taskiq.utils import maybe_awaitable

if TYPE_CHECKING: # pragma: no cover
from taskiq.state import TaskiqState
Expand Down Expand Up @@ -30,7 +32,17 @@ async def requeue(self) -> None:
requeue_count = int(self.message.labels.get("X-Taskiq-requeue", 0))
requeue_count += 1
self.message.labels["X-Taskiq-requeue"] = str(requeue_count)
await self.broker.kick(self.broker.formatter.dumps(self.message))
message = self.message
for middleware in self.broker.middlewares:
if middleware.__class__.pre_send != TaskiqMiddleware.pre_send:
message = await maybe_awaitable(middleware.pre_send(message))

await self.broker.kick(self.broker.formatter.dumps(message))

for middleware in reversed(self.broker.middlewares):
if middleware.__class__.post_send != TaskiqMiddleware.post_send:
await maybe_awaitable(middleware.post_send(message))

raise NoResultError

def reject(self) -> None:
Expand Down
36 changes: 36 additions & 0 deletions tests/test_requeue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from taskiq import Context, InMemoryBroker, TaskiqDepends
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.message import TaskiqMessage


async def test_requeue() -> None:
Expand Down Expand Up @@ -46,3 +48,37 @@ async def task(_: None = TaskiqDepends(dep_func)) -> None:
)

assert runs_count == 2


async def test_requeue_triggers_send_middlewares() -> None:
broker = InMemoryBroker()
runs_count = 0

class CountingMiddleware(TaskiqMiddleware):
def __init__(self) -> None:
super().__init__()
self.pre_send_calls = 0
self.post_send_calls = 0

def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
self.pre_send_calls += 1
return message

def post_send(self, message: TaskiqMessage) -> None:
self.post_send_calls += 1

middleware = CountingMiddleware()
broker.add_middlewares(middleware)

@broker.task
async def task(context: Context = TaskiqDepends()) -> None:
nonlocal runs_count
runs_count += 1
if runs_count < 2:
await context.requeue()

kicked = await task.kiq()
await kicked.wait_result()

assert middleware.pre_send_calls == 2
assert middleware.post_send_calls == 2