Asyncio

Petisco provides asynchronous counterparts of its main building blocks so that you can take advantage of Python's asyncio library. For example, the AsyncController class handles requests asynchronously, while the AsyncUseCase class offers an async interface for implementing your business logic. Because their execute methods are coroutines, I/O-bound work such as database queries or HTTP calls can yield to the event loop instead of blocking it.

AsyncAppService

Inherit from AsyncAppService and declare the execute method with async def.

from petisco import AsyncAppService
from meiga import BoolResult, isSuccess

class MyAsyncAppService(AsyncAppService):
    async def execute(self, my_input: str) -> BoolResult:
        return isSuccess

To call this app service, you need to await it from inside an async function:

result = await MyAsyncAppService().execute("my-input")
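The call above only works inside a coroutine. As a minimal sketch of how synchronous code (a script or a CLI entry point, for example) could drive it, the example below uses asyncio.run. To keep it runnable with only the standard library, StandInAppService is a hypothetical plain class that mimics the shape of an AsyncAppService subclass and returns a bool instead of a meiga result; main and run_from_sync_code are also illustrative names.

```python
import asyncio


class StandInAppService:
    """Hypothetical stand-in for a petisco AsyncAppService subclass."""

    async def execute(self, my_input: str) -> bool:
        # Simulate non-blocking I/O (e.g. a database or HTTP call).
        await asyncio.sleep(0)
        return bool(my_input)


async def main() -> bool:
    # `await` is only valid inside an `async def` function.
    return await StandInAppService().execute("some-input")


def run_from_sync_code() -> bool:
    # asyncio.run creates an event loop, runs the coroutine to completion,
    # and closes the loop afterwards.
    return asyncio.run(main())
```

Inside an already running event loop (a FastAPI route, for instance), you would await the coroutine directly rather than call asyncio.run.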

AsyncUseCase

Inherit from AsyncUseCase and declare the execute method with async def.

from petisco import AsyncUseCase
from meiga import BoolResult

class MyUseCase(AsyncUseCase):

    def __init__(self, app_service: MyAsyncAppService) -> None:
        self.app_service = app_service

    async def execute(self, my_input: str) -> BoolResult:
        # MyAsyncAppService.execute requires my_input, so forward it
        return await self.app_service.execute(my_input)
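Because execute is a coroutine, several use case calls can overlap their waiting time instead of running one after another. The sketch below illustrates this with asyncio.gather. StandInAppService and StandInUseCase are hypothetical plain classes that only mirror the shape of the petisco classes above, the 0.05 second sleep stands in for real I/O, and run_concurrently is an illustrative helper name.

```python
import asyncio


class StandInAppService:
    """Hypothetical stand-in for an AsyncAppService subclass."""

    async def execute(self, my_input: str) -> bool:
        # Pretend to wait on I/O; the event loop can run other tasks meanwhile.
        await asyncio.sleep(0.05)
        return True


class StandInUseCase:
    """Hypothetical stand-in for an AsyncUseCase subclass."""

    def __init__(self, app_service: StandInAppService) -> None:
        self.app_service = app_service

    async def execute(self, my_input: str) -> bool:
        return await self.app_service.execute(my_input)


async def run_concurrently(inputs: list[str]) -> list[bool]:
    use_case = StandInUseCase(StandInAppService())
    # gather schedules all coroutines at once and returns their results
    # in the same order as the inputs.
    return await asyncio.gather(*(use_case.execute(i) for i in inputs))
```

With three inputs, the total wait is close to a single sleep rather than three, since the sleeps overlap on the event loop.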

AsyncController

from petisco import AsyncController
from meiga import Result, Success, Error
import random

class MyController(AsyncController):  # (1)
    async def execute(self) -> Result[bool, Error]:
        return Success(random.choice([True, False]))

1. Inherit from petisco's AsyncController class.

FastAPI ⚡️

Use AsyncFastAPIController instead of FastAPIController.

from meiga import BoolResult
from petisco import Container, DomainEventBus
from petisco.extra.fastapi import AsyncFastAPIController

from app.src.task.create.application.task_creator import TaskCreator
from app.src.task.shared.domain.task import Task
# TaskLabeler and TaskRepository must also be imported from your own application modules.


class CreateTaskController(AsyncFastAPIController):
    async def execute(self, task: Task) -> BoolResult:
        task_creator = TaskCreator(
            labeler=Container.get(TaskLabeler),
            repository=Container.get(TaskRepository),
            domain_event_bus=Container.get(DomainEventBus),
        )
        return await task_creator.execute(task=task)

Then, we have to instantiate and execute the controller object in the FastAPI routers.

from fastapi import APIRouter
from petisco.extra.fastapi import as_fastapi

from app.api.models import TaskIn
from app.src.task.create.application.create_task_controller import CreateTaskController

router = APIRouter(tags=["Tasks"])

@router.post("/task")
async def create_task(task: TaskIn):
    result = await CreateTaskController().execute(task.to_task())
    return as_fastapi(result)