Skip to content

Dependency Injection

Rapidy uses the dishka library as its built-in Dependency Injection (DI) mechanism.

We aimed to choose a DI library aligned with the philosophy of Rapidy: simplicity, speed, transparency, and scalability. dishka perfectly fits these principles, offering developers a powerful tool without unnecessary complexity.

dishka is a modern and lightweight library for asynchronous dependency injection in Python applications. It focuses on high performance, minimalism, and configuration flexibility, making it ideal for next-generation web frameworks like Rapidy.

Its key advantages include:

  • Native asyncio support: full support for async/await, proper lifecycle management of dependencies.
  • Minimalist architecture: compact core, no magic, no excessive abstractions — dependency management is transparent and predictable.
  • Invisibility to business logic: injection via type annotations (FromDI).
  • Flexible scope management: App, Session, Request, etc.
  • Variety of providers: factories, objects, async functions.
  • Contextual injection: allows passing dynamic context — e.g., for the current user.
  • Framework integration: aiohttp, FastStream, Rapidy, and others.
  • Testing convenience: providers can be easily replaced in tests, no mocks or patches needed.

In Rapidy, dishka is available out-of-the-box — no additional setup required.

Examples of using dishka in Rapidy

Simple dependency injection example

Rapidy provides a FromDI wrapper (alias for FromDishka), which can be used to inject dependencies:

from rapidy import Rapidy
from rapidy.http import get
from rapidy.depends import FromDI, provide, Provider, Scope

class FooProvider(Provider):
    @provide(scope=Scope.REQUEST)
    async def c(self) -> int:
        return 1

@get('/')
async def handler(c: FromDI[int]) -> dict:
    return {"value": c}

app = Rapidy(
    http_route_handlers=[handler],
    di_providers=[FooProvider()],
)

What happens here:

  1. Provider definition The FooProvider class inherits from Provider and defines a dependency c of type int, created per request (Scope.REQUEST).

  2. Request handler registration The @get('/') decorator registers the handler function to handle GET requests at /.

  3. Injecting the dependency The argument c: FromDI[int] indicates that c should be obtained from the DI container at call time.

  4. Creating the application A Rapidy instance is created with the handler and the FooProvider.

SQLAlchemy session example

Open example
from typing import AsyncIterable

from pydantic_settings import SettingsConfigDict, BaseSettings
from rapidy import Rapidy, run_app
from rapidy.depends import FromDI, Provider, Scope, provide, from_context
from rapidy.web_response import Response
from sqlalchemy import make_url, DateTime, MetaData, Table, delete, select, update
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession, AsyncEngine, create_async_engine
from rapidy.http import controller, get, post, PathParam, Body, put, delete
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from typing import Any, Callable
from uuid import UUID, uuid4

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql import expression
from sqlalchemy.sql.schema import ColumnCollectionConstraint, Column


# --- App config ---
class PoolConfig(BaseModel):
    recycle_sec: int = 3600
    max_size: int = 10
    max_overflow_size: int = 10


class DBConfig(BaseModel):
    echo: bool = False
    pool: PoolConfig = PoolConfig()

    timeout: int = 30

    db_name: str
    user: str
    password: str
    host: str
    port: int = 5432

    @property
    def dsn(self) -> str:
        return f'postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.db_name}'


class AppConfig(BaseSettings):
    host: str = "0.0.0.0"
    port: int = 8080

    db: DBConfig

    model_config = SettingsConfigDict(
        extra="ignore",
        env_file=".env",
        env_file_encoding="utf-8",
        env_nested_delimiter="__",
    )


# --- DB models ---

def _col_names_convertor(constraint: ColumnCollectionConstraint, table: Table) -> str:
    return "_".join([column.name for column in constraint.columns.values()])


convention: dict[str, str | Callable[[ColumnCollectionConstraint, Table], str]] = {
    "all_column_names": _col_names_convertor,
    "ix": "ix__%(table_name)s__%(all_column_names)s",
    "uq": "uq__%(table_name)s__%(all_column_names)s",
    "ck": "ck__%(table_name)s__%(constraint_name)s",
    "fk": "fk__%(table_name)s__%(all_column_names)s__%(referred_table_name)s",
    "pk": "pk__%(table_name)s",
}


class UtcNow(expression.FunctionElement[Any]):
    type = DateTime()
    inherit_cache = True


@compiles(UtcNow, "postgresql")
def pg_utcnow(element: Any, compiler: Any, **kw: Any) -> str:
    return "TIMEZONE('utc', CURRENT_TIMESTAMP)"


class BaseDBModel(DeclarativeBase):
    __tablename__: str

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)

    create_date: Mapped[datetime] = Column(
        DateTime(timezone=True),
        nullable=False,
        server_default=UtcNow(),
    )
    update_date: Mapped[datetime] = Column(
        DateTime(timezone=True),
        nullable=False,
        server_default=UtcNow(),
        onupdate=UtcNow(),
    )

    metadata = MetaData(
        schema="data",
        naming_convention=convention,
    )


class Article(BaseDBModel):
    __tablename__ = "article"

    title: Mapped[str]
    text: Mapped[str]


# --- DI Providers ---

class ConfigProvider(Provider):
    scope = Scope.APP
    config = from_context(provides=AppConfig)

    @provide
    def get_db_config(self, config: AppConfig) -> DBConfig:
        return config.db


class DBProvider(Provider):
    scope = Scope.APP

    @provide
    async def get_engine(self, db_config: DBConfig) -> AsyncIterable[AsyncEngine]:
        engine = create_async_engine(
            url=make_url(db_config.dsn),
            echo=db_config.echo,
            pool_size=db_config.pool.max_size,
            pool_recycle=db_config.pool.recycle_sec,
            max_overflow=db_config.pool.max_overflow_size,
            execution_options={
                "asyncpg_timeout": db_config.timeout,
            },
        )
        try:
            yield engine
        finally:
            await engine.dispose(True)

    @provide
    def get_pool(self, engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
        return async_sessionmaker(bind=engine, autoflush=False)

    @provide(scope=Scope.REQUEST)
    async def get_session(self, pool: async_sessionmaker[AsyncSession]) -> AsyncIterable[AsyncSession]:
        async with pool() as session, session.begin():
            exc = yield session
            if exc is not None:
                await session.rollback()


# --- Api ---

class ArticleCreate(BaseModel):
    title: str
    text: str


class ArticleUpdate(BaseModel):
    title: str | None = None
    text: str | None = None


class ArticleResult(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    title: str
    text: str
    create_date: datetime
    update_date: datetime


@controller("/article")
class ArticleController:

    @get("/{article_id}")
    async def get_one(
        self,
        session: FromDI[AsyncSession],
        response: Response,
        article_id: UUID = PathParam(),
    ) -> ArticleResult | None:
        article = await session.get(Article, article_id)
        if article is None:
            response.set_status(404)
            return None

        return ArticleResult.model_validate(article)

    @get(response_type=list[ArticleResult])
    async def get_all(self, session: FromDI[AsyncSession]) -> list[ArticleResult]:
        result = await session.execute(select(Article))
        return [ArticleResult.model_validate(row) for row in result.scalars().all()]

    @post()
    async def create(self, session: FromDI[AsyncSession], data: ArticleCreate = Body()) -> ArticleResult:
        article = Article(**data.model_dump())
        session.add(article)

        await session.flush()
        await session.refresh(article)
        return ArticleResult.model_validate(article)

    @put("/{article_id}")
    async def put(
        self,
        session: FromDI[AsyncSession],
        article_id: UUID = PathParam(),
        data: ArticleUpdate = Body(),
    ) -> ArticleResult | None:
        stmt = (
            update(Article)
            .where(Article.id == article_id)
            .values(**data.model_dump(exclude_unset=True))
            .returning(Article)
        )
        result = await session.execute(stmt)
        updated_article = result.scalar_one_or_none()
        return ArticleResult.model_validate(updated_article) if updated_article else None

    @delete("/{article_id}")
    async def delete(
        self,
        session: FromDI[AsyncSession],
        article_id: UUID = PathParam(),
    ) -> None:
        await session.execute(delete(Article).where(Article.id == article_id))


def create_app() -> Rapidy:
    return Rapidy(
        http_route_handlers=[ArticleController],
        di_providers=[
            ConfigProvider(),
            DBProvider(),
        ],
        di_context={
            AppConfig: AppConfig(),
        },
    )

if __name__ == '__main__':
    app = create_app()
    run_app(app)

Injection methods

To inject dependencies, use rapidy.depends.FromDI (or dishka.FromDishka) and rapidy.depends.FromComponent (or dishka.FromComponent).

from rapidy import Rapidy
from rapidy.http import get
from rapidy.depends import FromDI, provide, Provider, Scope

class FooProvider(Provider):
    @provide(scope=Scope.REQUEST)
    async def c(self) -> int:
        return 1

@get('/')
async def handler(c: FromDI[int]) -> dict:
    return {"value": c}

app = Rapidy(
    http_route_handlers=[handler],
    di_providers=[FooProvider()],
)
from typing import Annotated

from rapidy import Rapidy
from rapidy.http import get
from rapidy.depends import FromComponent, provide, Provider, Scope

class FooProvider(Provider):
    @provide(scope=Scope.REQUEST)
    async def c(self) -> int:
        return 1

@get('/')
async def handler(c: Annotated[int, FromComponent()]) -> dict:
    return {"value": c}

app = Rapidy(
    http_route_handlers=[handler],
    di_providers=[FooProvider()],
)

Features

Accessing the container

You can access the current async container via the root Rapidy application. In child apps, it will be None.

from rapidy import Rapidy

root_app = Rapidy()
v1_app = Rapidy()
root_app.add_subapp('/v1', v1_app)

root_app.di_container  # AsyncContainer
v1_app.di_container  # None

Handlers and middleware

dishka is fully integrated with Rapidy, supporting auto-injection in all handler types:

# providers.py
from rapidy.depends import provide, Provider, Scope

class FooProvider(Provider):
    @provide(scope=Scope.REQUEST)
    async def c(self) -> int:
        return 1

Controllers:

from rapidy import Rapidy
from rapidy.http import controller, get
from rapidy.depends import FromDI
from .providers import FooProvider

@controller('/')
class MyController:
    @get()
    async def handler(self, c: FromDI[int]) -> dict:
        return {"value": c}

app = Rapidy(http_route_handlers=[MyController], di_providers=[FooProvider()])

View classes:

from rapidy import Rapidy
from rapidy.web import View
from rapidy.depends import FromDI
from .providers import FooProvider

class FooView(View):
    async def get(self, c: FromDI[int]) -> dict:
        return {"value": c}

app = Rapidy(di_providers=[FooProvider()])
app.router.add_view('/', FooView)

Middleware:

from rapidy import Rapidy
from rapidy.http import Request, StreamResponse, get, middleware
from rapidy.typedefs import CallNext
from rapidy.depends import FromDI
from .providers import FooProvider

@middleware
async def some_middleware(
    request: Request,
    call_next: CallNext,
    c: FromDI[int],
) -> StreamResponse:
    print({"value": c})
    return await call_next(request)

@get('/')
async def handler(c: FromDI[int]) -> dict:
    return {"value": c}

app = Rapidy(
    middlewares=[some_middleware],
    http_route_handlers=[handler],
    di_providers=[FooProvider()],
)

Additional notes

  • If the first handler argument lacks an annotation, Rapidy skips it and continues injection.
  • All dishka features are supported: factories, nested providers, lifecycle control, and explicit dependency retrieval.

External container

You can manually pass your own AsyncContainer — useful if the container is pre-created and reused across contexts.

from rapidy import Rapidy
from rapidy.depends import make_async_container  # or: from dishka import make_async_container
from .providers import FooProvider

container = make_async_container(FooProvider())

async def shutdown_di_container() -> None:
    await container.close()

app = Rapidy(
   di_container=container,
   http_route_handlers=[...],
   on_shutdown=[shutdown_di_container],  # manual shutdown
)

In this case:

  • Rapidy won’t create a container.
  • All di_* parameters are ignored.
  • You must manually close the container.

dishka limitations

dishka only works with HTTP handlers and middleware. If you use a container in both faststream and rapidy, you’ll need to use @inject explicitly in faststream.

Rapidy (Application) DI attributes

di_container

External dependency container.

di_container: AsyncContainer | None = None

If di_container is provided, a new container won’t be created.

Dishka docs — container.


di_providers

List of providers to register.

di_providers: Sequence[BaseProvider] = ()

Ignored if di_container is provided.

Dishka docs — providers.


di_scopes

Scope class.

di_scopes: type[BaseScope] = Scope

Dishka docs — scopes.


di_context

Additional context for providers.

di_context: dict[Any, Any] | None = None

Dishka docs — context.


di_lock_factory

Factory for container locks.

di_lock_factory: Callable[[], contextlib.AbstractAsyncContextManager[Any]] | None = Lock

Dishka docs — lock_factory.

import threading

container = make_container(provider, lock_factory=threading.Lock):
with container(lock_factory=threading.Lock) as nested_container:
    ...

import asyncio

container = make_async_container(provider, lock_factory=asyncio.Lock)
async with container(lock_factory=asyncio.Lock) as nested_container:
    ...

di_skip_validation

Flag to disable provider type checks.

di_skip_validation: bool = False

Dishka docs — skip_validation.


di_start_scope

Initial container scope.

di_start_scope: BaseScope | None = None

Dishka docs — start_scope.


di_validation_settings

Container validation settings.

di_validation_settings: ValidationSettings = DEFAULT_VALIDATION

Dishka docs — alias.

Dishka docs — from_context.

Dishka docs — provide.