Перейти к содержанию

Dependency Injection

Rapidy использует библиотеку dishka в качестве встроенного механизма внедрения зависимостей (Dependency Injection, DI).

Мы стремились выбрать DI-библиотеку, соответствующую философии Rapidy: простота, скорость, прозрачность и масштабируемость. dishka идеально вписалась в эти принципы, предоставляя разработчикам мощный инструмент без лишней сложности.

dishka — современная и лёгкая библиотека для асинхронного внедрения зависимостей в Python-приложениях. Она ориентирована на высокую производительность, минимализм и гибкость настройки, что делает её идеальной для использования в веб-фреймворках нового поколения, таких как Rapidy.

Её ключевые преимущества:

  • Нативная поддержка asyncio: полная поддержка async/await, корректное управление жизненным циклом зависимостей.
  • Минималистичная архитектура: компактное ядро, без магии и избыточных абстракций — управление зависимостями прозрачно и предсказуемо.
  • Незаметность для бизнес-логики: внедрение через аннотации типов (FromDI).
  • Гибкое управление областями видимости (scopes): App, Session, Request и др.
  • Разнообразие провайдеров: фабрики, объекты, асинхронные функции.
  • Контекстное внедрение: возможность передавать динамический контекст — например, для текущего пользователя.
  • Интеграция с фреймворками: aiohttp, FastStream, Rapidy и другие.
  • Удобство тестирования: провайдеры легко заменяются в тестах, без сторонних моков и патчей.

В Rapidy dishka доступна «из коробки» — дополнительная настройка не требуется.

Примеры использования dishka в Rapidy

Простой пример внедрения зависимости

В Rapidy доступна обёртка FromDI (alias для FromDishka), которую можно использовать для инжекта зависимостей:

from rapidy import Rapidy
from rapidy.http import get
from rapidy.depends import FromDI, provide, Provider, Scope

class FooProvider(Provider):
    @provide(scope=Scope.REQUEST)
    async def c(self) -> int:
        return 1

@get('/')
async def handler(c: FromDI[int]) -> dict:
    return {"value": c}

app = Rapidy(
    http_route_handlers=[handler],
    di_providers=[FooProvider()],
)

Что здесь происходит:

  1. Определение провайдера Класс FooProvider наследуется от Provider и определяет зависимость c типа int, создаваемую на каждый запрос (Scope.REQUEST).

  2. Регистрация обработчика запроса Декоратор @get('/') регистрирует функцию handler как обработчик GET-запросов по пути /.

  3. Внедрение зависимости в обработчик Аргумент c: FromDI[int] сообщает, что c должен быть получен из DI-контейнера на момент вызова.

  4. Создание приложения Экземпляр Rapidy создаётся с обработчиком handler и провайдером FooProvider.

Пример с SQLAlchemy-сессией

Откройте пример
from typing import AsyncIterable

from pydantic_settings import SettingsConfigDict, BaseSettings
from rapidy import Rapidy, run_app
from rapidy.depends import FromDI, Provider, Scope, provide, from_context
from rapidy.web_response import Response
from sqlalchemy import make_url, DateTime, MetaData, Table, delete, select, update
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession, AsyncEngine, create_async_engine
from rapidy.http import controller, get, post, PathParam, Body, put, delete
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from typing import Any, Callable
from uuid import UUID, uuid4

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql import expression
from sqlalchemy.sql.schema import ColumnCollectionConstraint, Column


# --- App config ---
class PoolConfig(BaseModel):
    recycle_sec: int = 3600
    max_size: int = 10
    max_overflow_size: int = 10


class DBConfig(BaseModel):
    echo: bool = False
    pool: PoolConfig = PoolConfig()

    timeout: int = 30

    db_name: str
    user: str
    password: str
    host: str
    port: int = 5432

    @property
    def dsn(self) -> str:
        return f'postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.db_name}'


class AppConfig(BaseSettings):
    host: str = "0.0.0.0"
    port: int = 8080

    db: DBConfig

    model_config = SettingsConfigDict(
        extra="ignore",
        env_file=".env",
        env_file_encoding="utf-8",
        env_nested_delimiter="__",
    )


# --- DB models ---

def _col_names_convertor(constraint: ColumnCollectionConstraint, table: Table) -> str:
    return "_".join([column.name for column in constraint.columns.values()])


convention: dict[str, str | Callable[[ColumnCollectionConstraint, Table], str]] = {
    "all_column_names": _col_names_convertor,
    "ix": "ix__%(table_name)s__%(all_column_names)s",
    "uq": "uq__%(table_name)s__%(all_column_names)s",
    "ck": "ck__%(table_name)s__%(constraint_name)s",
    "fk": "fk__%(table_name)s__%(all_column_names)s__%(referred_table_name)s",
    "pk": "pk__%(table_name)s",
}


class UtcNow(expression.FunctionElement[Any]):
    type = DateTime()
    inherit_cache = True


@compiles(UtcNow, "postgresql")
def pg_utcnow(element: Any, compiler: Any, **kw: Any) -> str:
    return "TIMEZONE('utc', CURRENT_TIMESTAMP)"


class BaseDBModel(DeclarativeBase):
    __tablename__: str

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)

    create_date: Mapped[datetime] = Column(
        DateTime(timezone=True),
        nullable=False,
        server_default=UtcNow(),
    )
    update_date: Mapped[datetime] = Column(
        DateTime(timezone=True),
        nullable=False,
        server_default=UtcNow(),
        onupdate=UtcNow(),
    )

    metadata = MetaData(
        schema="data",
        naming_convention=convention,
    )


class Article(BaseDBModel):
    __tablename__ = "article"

    title: Mapped[str]
    text: Mapped[str]


# --- DI Providers ---

class ConfigProvider(Provider):
    scope = Scope.APP
    config = from_context(provides=AppConfig)

    @provide
    def get_db_config(self, config: AppConfig) -> DBConfig:
        return config.db


class DBProvider(Provider):
    scope = Scope.APP

    @provide
    async def get_engine(self, db_config: DBConfig) -> AsyncIterable[AsyncEngine]:
        engine = create_async_engine(
            url=make_url(db_config.dsn),
            echo=db_config.echo,
            pool_size=db_config.pool.max_size,
            pool_recycle=db_config.pool.recycle_sec,
            max_overflow=db_config.pool.max_overflow_size,
            execution_options={
                "asyncpg_timeout": db_config.timeout,
            },
        )
        try:
            yield engine
        finally:
            await engine.dispose(True)

    @provide
    def get_pool(self, engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
        return async_sessionmaker(bind=engine, autoflush=False)

    @provide(scope=Scope.REQUEST)
    async def get_session(self, pool: async_sessionmaker[AsyncSession]) -> AsyncIterable[AsyncSession]:
        async with pool() as session, session.begin():
            exc = yield session
            if exc is not None:
                await session.rollback()


# --- Api ---

class ArticleCreate(BaseModel):
    title: str
    text: str


class ArticleUpdate(BaseModel):
    title: str | None = None
    text: str | None = None


class ArticleResult(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    title: str
    text: str
    create_date: datetime
    update_date: datetime


@controller("/article")
class ArticleController:

    @get("/{article_id}")
    async def get_one(
        self,
        session: FromDI[AsyncSession],
        response: Response,
        article_id: UUID = PathParam(),
    ) -> ArticleResult | None:
        article = await session.get(Article, article_id)
        if article is None:
            response.set_status(404)
            return None

        return ArticleResult.model_validate(article)

    @get(response_type=list[ArticleResult])
    async def get_all(self, session: FromDI[AsyncSession]) -> list[ArticleResult]:
        result = await session.execute(select(Article))
        return [ArticleResult.model_validate(row) for row in result.scalars().all()]

    @post()
    async def create(self, session: FromDI[AsyncSession], data: ArticleCreate = Body()) -> ArticleResult:
        article = Article(**data.model_dump())
        session.add(article)

        await session.flush()
        await session.refresh(article)
        return ArticleResult.model_validate(article)

    @put("/{article_id}")
    async def put(
        self,
        session: FromDI[AsyncSession],
        article_id: UUID = PathParam(),
        data: ArticleUpdate = Body(),
    ) -> ArticleResult | None:
        stmt = (
            update(Article)
            .where(Article.id == article_id)
            .values(**data.model_dump(exclude_unset=True))
            .returning(Article)
        )
        result = await session.execute(stmt)
        updated_article = result.scalar_one_or_none()
        return ArticleResult.model_validate(updated_article) if updated_article else None

    @delete("/{article_id}")
    async def delete(
        self,
        session: FromDI[AsyncSession],
        article_id: UUID = PathParam(),
    ) -> None:
        await session.execute(delete(Article).where(Article.id == article_id))


def create_app() -> Rapidy:
    return Rapidy(
        http_route_handlers=[ArticleController],
        di_providers=[
            ConfigProvider(),
            DBProvider(),
        ],
        di_context={
            AppConfig: AppConfig(),
        },
    )

if __name__ == '__main__':
    app = create_app()
    run_app(app)

Способы инжекта

Для внедрения зависимостей используйте rapidy.depends.FromDI (или dishka.FromDishka) и rapidy.depends.FromComponent (или dishka.FromComponent).

from rapidy import Rapidy
from rapidy.http import get
from rapidy.depends import FromDI, provide, Provider, Scope

class FooProvider(Provider):
    @provide(scope=Scope.REQUEST)
    async def c(self) -> int:
        return 1

@get('/')
async def handler(c: FromDI[int]) -> dict:
    return {"value": c}

app = Rapidy(
    http_route_handlers=[handler],
    di_providers=[FooProvider()],
)
from typing import Annotated

from rapidy import Rapidy
from rapidy.http import get
from rapidy.depends import FromComponent, provide, Provider, Scope

class FooProvider(Provider):
    @provide(scope=Scope.REQUEST)
    async def c(self) -> int:
        return 1

@get('/')
async def handler(c: Annotated[int, FromComponent()]) -> dict:
    return {"value": c}

app = Rapidy(
    http_route_handlers=[handler],
    di_providers=[FooProvider()],
)

Особенности

Доступ к контейнеру

Получить текущий асинхронный контейнер можно через корневое приложение Rapidy. Из дочернего приложения он будет равен None.

from rapidy import Rapidy

root_app = Rapidy()
v1_app = Rapidy()
root_app.add_subapp('/v1', v1_app)

root_app.di_container  # AsyncContainer
v1_app.di_container  # None

Обработчики и middleware

dishka полностью интегрирована с Rapidy, и поддерживает автоинжект во всех видах обработчиков:

# providers.py
from rapidy.depends import provide, Provider, Scope

class FooProvider(Provider):
    @provide(scope=Scope.REQUEST)
    async def c(self) -> int:
        return 1

Контроллеры:

from rapidy import Rapidy
from rapidy.http import controller, get
from rapidy.depends import FromDI
from .providers import FooProvider

@controller('/')
class MyController:
    @get()
    async def handler(self, c: FromDI[int]) -> dict:
        return {"value": c}

app = Rapidy(http_route_handlers=[MyController], di_providers=[FooProvider()])

View-классы:

from rapidy import Rapidy
from rapidy.web import View
from rapidy.depends import FromDI
from .providers import FooProvider

class FooView(View):
    async def get(self, c: FromDI[int]) -> dict:
        return {"value": c}

app = Rapidy(di_providers=[FooProvider()])
app.router.add_view('/', FooView)

Middleware:

from rapidy import Rapidy
from rapidy.http import Request, StreamResponse, get, middleware
from rapidy.typedefs import CallNext
from rapidy.depends import FromDI
from .providers import FooProvider

@middleware
async def some_middleware(
    request: Request,
    call_next: CallNext,
    c: FromDI[int],
) -> StreamResponse:
    print({"value": c})
    return await call_next(request)

@get('/')
async def handler(c: FromDI[int]) -> dict:
    return {"value": c}

app = Rapidy(
    middlewares=[some_middleware],
    http_route_handlers=[handler],
    di_providers=[FooProvider()],
)

Дополнительно

  • Если у первого аргумента обработчика нет аннотации, Rapidy пропустит его и продолжит инжект.
  • Поддерживаются все возможности dishka: фабрики, вложенные провайдеры, контроль жизненного цикла и явное получение зависимостей.

Внешний контейнер

Вы можете вручную передать свой AsyncContainer — например, если контейнер создаётся заранее и используется в разных местах.

from rapidy import Rapidy
from rapidy.depends import make_async_container  # or: from dishka import make_async_container
from .providers import FooProvider

container = make_async_container(FooProvider())

async def shutdown_di_container() -> None:
    await container.close()

app = Rapidy(
   di_container=container,
   http_route_handlers=[...],
   on_shutdown=[shutdown_di_container],  # manual shutdown
)

В этом случае:

  • Rapidy не создаёт контейнер.
  • Все di_* параметры игнорируются.
  • Контейнер необходимо закрывать вручную.

Ограничения dishka

dishka работает только с HTTP-обработчиками и middleware. Если вы используете контейнер и в faststream, и в rapidy, в faststream потребуется явное использование @inject.

Атрибуты Rapidy (Application) для управления DI

di_container

Внешний контейнер зависимостей.

di_container: AsyncContainer | None = None

Если передан di_container, новый контейнер создан не будет.

Документация Dishka — container.


di_providers

Список провайдеров для регистрации.

di_providers: Sequence[BaseProvider] = ()

Игнорируется, если передан di_container.

Документация Dishka — providers.


di_scopes

Класс области видимости (scope).

di_scopes: type[BaseScope] = Scope

Документация Dishka — scopes.


di_context

Дополнительный контекст для провайдеров.

di_context: dict[Any, Any] | None = None

Документация Dishka — context.


di_lock_factory

Фабрика для блокировок контейнера.

di_lock_factory: Callable[[], contextlib.AbstractAsyncContextManager[Any]] | None = Lock

Документация Dishka — lock_factory.

import threading

container = make_container(provider, lock_factory=threading.Lock):
with container(lock_factory=threading.Lock) as nested_container:
    ...

import asyncio

container = make_async_container(provider, lock_factory=asyncio.Lock)
async with container(lock_factory=asyncio.Lock) as nested_container:
    ...

di_skip_validation

Флаг, отключающий проверку типов провайдеров.

di_skip_validation: bool = False

Документация Dishka — skip_validation.


di_start_scope

Начальный scope контейнера.

di_start_scope: BaseScope | None = None

Документация Dishka — start_scope.


di_validation_settings

Настройки валидации контейнера.

di_validation_settings: ValidationSettings = DEFAULT_VALIDATION

Документация Dishka — alias.

Документация Dishka — from_context.

Документация Dishka — provide.