initial commit

This commit is contained in:
behruz-dev
2025-09-01 19:12:16 +05:00
commit ca7061bfe7
2880 changed files with 370683 additions and 0 deletions

View File

@@ -0,0 +1,41 @@
import asyncio as _asyncio
from contextlib import suppress
from aiogram.dispatcher.flags import FlagGenerator
from . import enums, methods, types
from .__meta__ import __api_version__, __version__
from .client import session
from .client.bot import Bot
from .dispatcher.dispatcher import Dispatcher
from .dispatcher.middlewares.base import BaseMiddleware
from .dispatcher.router import Router
from .utils.magic_filter import MagicFilter
from .utils.text_decorations import html_decoration as html
from .utils.text_decorations import markdown_decoration as md
with suppress(ImportError):
import uvloop as _uvloop
_asyncio.set_event_loop_policy(_uvloop.EventLoopPolicy())
F = MagicFilter()
flags = FlagGenerator()
__all__ = (
"__api_version__",
"__version__",
"types",
"methods",
"enums",
"Bot",
"session",
"Dispatcher",
"Router",
"BaseMiddleware",
"F",
"html",
"md",
"flags",
)

View File

@@ -0,0 +1,2 @@
__version__ = "3.22.0"
__api_version__ = "9.2"

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,33 @@
from typing import TYPE_CHECKING, Any, Optional
from pydantic import BaseModel, PrivateAttr
from typing_extensions import Self
if TYPE_CHECKING:
from aiogram.client.bot import Bot
class BotContextController(BaseModel):
_bot: Optional["Bot"] = PrivateAttr()
def model_post_init(self, __context: Any) -> None:
self._bot = __context.get("bot") if __context else None
def as_(self, bot: Optional["Bot"]) -> Self:
"""
Bind object to a bot instance.
:param bot: Bot instance
:return: self
"""
self._bot = bot
return self
@property
def bot(self) -> Optional["Bot"]:
"""
Get bot instance.
:return: Bot instance
"""
return self._bot

View File

@@ -0,0 +1,80 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Optional
from aiogram.utils.dataclass import dataclass_kwargs
if TYPE_CHECKING:
from aiogram.types import LinkPreviewOptions
# @dataclass ??
class Default:
# Is not a dataclass because of JSON serialization.
__slots__ = ("_name",)
def __init__(self, name: str) -> None:
self._name = name
@property
def name(self) -> str:
return self._name
def __str__(self) -> str:
return f"Default({self._name!r})"
def __repr__(self) -> str:
return f"<{self}>"
@dataclass(**dataclass_kwargs(slots=True, kw_only=True))
class DefaultBotProperties:
"""
Default bot properties.
"""
parse_mode: Optional[str] = None
"""Default parse mode for messages."""
disable_notification: Optional[bool] = None
"""Sends the message silently. Users will receive a notification with no sound."""
protect_content: Optional[bool] = None
"""Protects content from copying."""
allow_sending_without_reply: Optional[bool] = None
"""Allows to send messages without reply."""
link_preview: Optional[LinkPreviewOptions] = None
"""Link preview settings."""
link_preview_is_disabled: Optional[bool] = None
"""Disables link preview."""
link_preview_prefer_small_media: Optional[bool] = None
"""Prefer small media in link preview."""
link_preview_prefer_large_media: Optional[bool] = None
"""Prefer large media in link preview."""
link_preview_show_above_text: Optional[bool] = None
"""Show link preview above text."""
show_caption_above_media: Optional[bool] = None
"""Show caption above media."""
def __post_init__(self) -> None:
has_any_link_preview_option = any(
(
self.link_preview_is_disabled,
self.link_preview_prefer_small_media,
self.link_preview_prefer_large_media,
self.link_preview_show_above_text,
)
)
if has_any_link_preview_option and self.link_preview is None:
from ..types import LinkPreviewOptions
self.link_preview = LinkPreviewOptions(
is_disabled=self.link_preview_is_disabled,
prefer_small_media=self.link_preview_prefer_small_media,
prefer_large_media=self.link_preview_prefer_large_media,
show_above_text=self.link_preview_show_above_text,
)
def __getitem__(self, item: str) -> Any:
return getattr(self, item, None)

View File

@@ -0,0 +1,211 @@
from __future__ import annotations
import asyncio
import ssl
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
cast,
)
import certifi
from aiohttp import BasicAuth, ClientError, ClientSession, FormData, TCPConnector
from aiohttp.hdrs import USER_AGENT
from aiohttp.http import SERVER_SOFTWARE
from aiogram.__meta__ import __version__
from aiogram.methods import TelegramMethod
from ...exceptions import TelegramNetworkError
from ...methods.base import TelegramType
from ...types import InputFile
from .base import BaseSession
if TYPE_CHECKING:
from ..bot import Bot
_ProxyBasic = Union[str, Tuple[str, BasicAuth]]
_ProxyChain = Iterable[_ProxyBasic]
_ProxyType = Union[_ProxyChain, _ProxyBasic]
def _retrieve_basic(basic: _ProxyBasic) -> Dict[str, Any]:
from aiohttp_socks.utils import parse_proxy_url
proxy_auth: Optional[BasicAuth] = None
if isinstance(basic, str):
proxy_url = basic
else:
proxy_url, proxy_auth = basic
proxy_type, host, port, username, password = parse_proxy_url(proxy_url)
if isinstance(proxy_auth, BasicAuth):
username = proxy_auth.login
password = proxy_auth.password
return {
"proxy_type": proxy_type,
"host": host,
"port": port,
"username": username,
"password": password,
"rdns": True,
}
def _prepare_connector(chain_or_plain: _ProxyType) -> Tuple[Type["TCPConnector"], Dict[str, Any]]:
from aiohttp_socks import ChainProxyConnector, ProxyConnector, ProxyInfo
# since tuple is Iterable(compatible with _ProxyChain) object, we assume that
# user wants chained proxies if tuple is a pair of string(url) and BasicAuth
if isinstance(chain_or_plain, str) or (
isinstance(chain_or_plain, tuple) and len(chain_or_plain) == 2
):
chain_or_plain = cast(_ProxyBasic, chain_or_plain)
return ProxyConnector, _retrieve_basic(chain_or_plain)
chain_or_plain = cast(_ProxyChain, chain_or_plain)
infos: List[ProxyInfo] = []
for basic in chain_or_plain:
infos.append(ProxyInfo(**_retrieve_basic(basic)))
return ChainProxyConnector, {"proxy_infos": infos}
class AiohttpSession(BaseSession):
def __init__(
self, proxy: Optional[_ProxyType] = None, limit: int = 100, **kwargs: Any
) -> None:
"""
Client session based on aiohttp.
:param proxy: The proxy to be used for requests. Default is None.
:param limit: The total number of simultaneous connections. Default is 100.
:param kwargs: Additional keyword arguments.
"""
super().__init__(**kwargs)
self._session: Optional[ClientSession] = None
self._connector_type: Type[TCPConnector] = TCPConnector
self._connector_init: Dict[str, Any] = {
"ssl": ssl.create_default_context(cafile=certifi.where()),
"limit": limit,
"ttl_dns_cache": 3600, # Workaround for https://github.com/aiogram/aiogram/issues/1500
}
self._should_reset_connector = True # flag determines connector state
self._proxy: Optional[_ProxyType] = None
if proxy is not None:
try:
self._setup_proxy_connector(proxy)
except ImportError as exc: # pragma: no cover
raise RuntimeError(
"In order to use aiohttp client for proxy requests, install "
"https://pypi.org/project/aiohttp-socks/"
) from exc
def _setup_proxy_connector(self, proxy: _ProxyType) -> None:
self._connector_type, self._connector_init = _prepare_connector(proxy)
self._proxy = proxy
@property
def proxy(self) -> Optional[_ProxyType]:
return self._proxy
@proxy.setter
def proxy(self, proxy: _ProxyType) -> None:
self._setup_proxy_connector(proxy)
self._should_reset_connector = True
async def create_session(self) -> ClientSession:
if self._should_reset_connector:
await self.close()
if self._session is None or self._session.closed:
self._session = ClientSession(
connector=self._connector_type(**self._connector_init),
headers={
USER_AGENT: f"{SERVER_SOFTWARE} aiogram/{__version__}",
},
)
self._should_reset_connector = False
return self._session
async def close(self) -> None:
if self._session is not None and not self._session.closed:
await self._session.close()
# Wait 250 ms for the underlying SSL connections to close
# https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
await asyncio.sleep(0.25)
def build_form_data(self, bot: Bot, method: TelegramMethod[TelegramType]) -> FormData:
form = FormData(quote_fields=False)
files: Dict[str, InputFile] = {}
for key, value in method.model_dump(warnings=False).items():
value = self.prepare_value(value, bot=bot, files=files)
if not value:
continue
form.add_field(key, value)
for key, value in files.items():
form.add_field(
key,
value.read(bot),
filename=value.filename or key,
)
return form
async def make_request(
self, bot: Bot, method: TelegramMethod[TelegramType], timeout: Optional[int] = None
) -> TelegramType:
session = await self.create_session()
url = self.api.api_url(token=bot.token, method=method.__api_method__)
form = self.build_form_data(bot=bot, method=method)
try:
async with session.post(
url, data=form, timeout=self.timeout if timeout is None else timeout
) as resp:
raw_result = await resp.text()
except asyncio.TimeoutError:
raise TelegramNetworkError(method=method, message="Request timeout error")
except ClientError as e:
raise TelegramNetworkError(method=method, message=f"{type(e).__name__}: {e}")
response = self.check_response(
bot=bot, method=method, status_code=resp.status, content=raw_result
)
return cast(TelegramType, response.result)
async def stream_content(
self,
url: str,
headers: Optional[Dict[str, Any]] = None,
timeout: int = 30,
chunk_size: int = 65536,
raise_for_status: bool = True,
) -> AsyncGenerator[bytes, None]:
if headers is None:
headers = {}
session = await self.create_session()
async with session.get(
url, timeout=timeout, headers=headers, raise_for_status=raise_for_status
) as resp:
async for chunk in resp.content.iter_chunked(chunk_size):
yield chunk
async def __aenter__(self) -> AiohttpSession:
await self.create_session()
return self

View File

@@ -0,0 +1,265 @@
from __future__ import annotations
import abc
import datetime
import json
import secrets
from enum import Enum
from http import HTTPStatus
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Callable,
Dict,
Final,
Optional,
Type,
cast,
)
from pydantic import ValidationError
from aiogram.exceptions import (
ClientDecodeError,
RestartingTelegram,
TelegramAPIError,
TelegramBadRequest,
TelegramConflictError,
TelegramEntityTooLarge,
TelegramForbiddenError,
TelegramMigrateToChat,
TelegramNotFound,
TelegramRetryAfter,
TelegramServerError,
TelegramUnauthorizedError,
)
from ...methods import Response, TelegramMethod
from ...methods.base import TelegramType
from ...types import InputFile, TelegramObject
from ..default import Default
from ..telegram import PRODUCTION, TelegramAPIServer
from .middlewares.manager import RequestMiddlewareManager
if TYPE_CHECKING:
from ..bot import Bot
_JsonLoads = Callable[..., Any]
_JsonDumps = Callable[..., str]
DEFAULT_TIMEOUT: Final[float] = 60.0
class BaseSession(abc.ABC):
"""
This is base class for all HTTP sessions in aiogram.
If you want to create your own session, you must inherit from this class.
"""
def __init__(
self,
api: TelegramAPIServer = PRODUCTION,
json_loads: _JsonLoads = json.loads,
json_dumps: _JsonDumps = json.dumps,
timeout: float = DEFAULT_TIMEOUT,
) -> None:
"""
:param api: Telegram Bot API URL patterns
:param json_loads: JSON loader
:param json_dumps: JSON dumper
:param timeout: Session scope request timeout
"""
self.api = api
self.json_loads = json_loads
self.json_dumps = json_dumps
self.timeout = timeout
self.middleware = RequestMiddlewareManager()
def check_response(
self, bot: Bot, method: TelegramMethod[TelegramType], status_code: int, content: str
) -> Response[TelegramType]:
"""
Check response status
"""
try:
json_data = self.json_loads(content)
except Exception as e:
# Handled error type can't be classified as specific error
# in due to decoder can be customized and raise any exception
raise ClientDecodeError("Failed to decode object", e, content)
try:
response_type = Response[method.__returning__] # type: ignore
response = response_type.model_validate(json_data, context={"bot": bot})
except ValidationError as e:
raise ClientDecodeError("Failed to deserialize object", e, json_data)
if HTTPStatus.OK <= status_code <= HTTPStatus.IM_USED and response.ok:
return response
description = cast(str, response.description)
if parameters := response.parameters:
if parameters.retry_after:
raise TelegramRetryAfter(
method=method, message=description, retry_after=parameters.retry_after
)
if parameters.migrate_to_chat_id:
raise TelegramMigrateToChat(
method=method,
message=description,
migrate_to_chat_id=parameters.migrate_to_chat_id,
)
if status_code == HTTPStatus.BAD_REQUEST:
raise TelegramBadRequest(method=method, message=description)
if status_code == HTTPStatus.NOT_FOUND:
raise TelegramNotFound(method=method, message=description)
if status_code == HTTPStatus.CONFLICT:
raise TelegramConflictError(method=method, message=description)
if status_code == HTTPStatus.UNAUTHORIZED:
raise TelegramUnauthorizedError(method=method, message=description)
if status_code == HTTPStatus.FORBIDDEN:
raise TelegramForbiddenError(method=method, message=description)
if status_code == HTTPStatus.REQUEST_ENTITY_TOO_LARGE:
raise TelegramEntityTooLarge(method=method, message=description)
if status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
if "restart" in description:
raise RestartingTelegram(method=method, message=description)
raise TelegramServerError(method=method, message=description)
raise TelegramAPIError(
method=method,
message=description,
)
@abc.abstractmethod
async def close(self) -> None: # pragma: no cover
"""
Close client session
"""
pass
@abc.abstractmethod
async def make_request(
self,
bot: Bot,
method: TelegramMethod[TelegramType],
timeout: Optional[int] = None,
) -> TelegramType: # pragma: no cover
"""
Make request to Telegram Bot API
:param bot: Bot instance
:param method: Method instance
:param timeout: Request timeout
:return:
:raise TelegramApiError:
"""
pass
@abc.abstractmethod
async def stream_content(
self,
url: str,
headers: Optional[Dict[str, Any]] = None,
timeout: int = 30,
chunk_size: int = 65536,
raise_for_status: bool = True,
) -> AsyncGenerator[bytes, None]: # pragma: no cover
"""
Stream reader
"""
yield b""
def prepare_value(
self,
value: Any,
bot: Bot,
files: Dict[str, Any],
_dumps_json: bool = True,
) -> Any:
"""
Prepare value before send
"""
if value is None:
return None
if isinstance(value, str):
return value
if isinstance(value, Default):
default_value = bot.default[value.name]
return self.prepare_value(default_value, bot=bot, files=files, _dumps_json=_dumps_json)
if isinstance(value, InputFile):
key = secrets.token_urlsafe(10)
files[key] = value
return f"attach://{key}"
if isinstance(value, dict):
value = {
key: prepared_item
for key, item in value.items()
if (
prepared_item := self.prepare_value(
item, bot=bot, files=files, _dumps_json=False
)
)
is not None
}
if _dumps_json:
return self.json_dumps(value)
return value
if isinstance(value, list):
value = [
prepared_item
for item in value
if (
prepared_item := self.prepare_value(
item, bot=bot, files=files, _dumps_json=False
)
)
is not None
]
if _dumps_json:
return self.json_dumps(value)
return value
if isinstance(value, datetime.timedelta):
now = datetime.datetime.now()
return str(round((now + value).timestamp()))
if isinstance(value, datetime.datetime):
return str(round(value.timestamp()))
if isinstance(value, Enum):
return self.prepare_value(value.value, bot=bot, files=files)
if isinstance(value, TelegramObject):
return self.prepare_value(
value.model_dump(warnings=False),
bot=bot,
files=files,
_dumps_json=_dumps_json,
)
if _dumps_json:
return self.json_dumps(value)
return value
async def __call__(
self,
bot: Bot,
method: TelegramMethod[TelegramType],
timeout: Optional[int] = None,
) -> TelegramType:
middleware = self.middleware.wrap_middlewares(self.make_request, timeout=timeout)
return cast(TelegramType, await middleware(bot, method))
async def __aenter__(self) -> BaseSession:
return self
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close()

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Protocol
from aiogram.methods import Response, TelegramMethod
from aiogram.methods.base import TelegramType
if TYPE_CHECKING:
from ...bot import Bot
class NextRequestMiddlewareType(Protocol[TelegramType]): # pragma: no cover
async def __call__(
self,
bot: "Bot",
method: TelegramMethod[TelegramType],
) -> Response[TelegramType]:
pass
class RequestMiddlewareType(Protocol): # pragma: no cover
async def __call__(
self,
make_request: NextRequestMiddlewareType[TelegramType],
bot: "Bot",
method: TelegramMethod[TelegramType],
) -> Response[TelegramType]:
pass
class BaseRequestMiddleware(ABC):
"""
Generic middleware class
"""
@abstractmethod
async def __call__(
self,
make_request: NextRequestMiddlewareType[TelegramType],
bot: "Bot",
method: TelegramMethod[TelegramType],
) -> Response[TelegramType]:
"""
Execute middleware
:param make_request: Wrapped make_request in middlewares chain
:param bot: bot for request making
:param method: Request method (Subclass of :class:`aiogram.methods.base.TelegramMethod`)
:return: :class:`aiogram.methods.Response`
"""
pass

View File

@@ -0,0 +1,62 @@
from __future__ import annotations
from functools import partial
from typing import Any, Callable, List, Optional, Sequence, Union, cast, overload
from aiogram.client.session.middlewares.base import (
NextRequestMiddlewareType,
RequestMiddlewareType,
)
from aiogram.methods.base import TelegramType
class RequestMiddlewareManager(Sequence[RequestMiddlewareType]):
def __init__(self) -> None:
self._middlewares: List[RequestMiddlewareType] = []
def register(
self,
middleware: RequestMiddlewareType,
) -> RequestMiddlewareType:
self._middlewares.append(middleware)
return middleware
def unregister(self, middleware: RequestMiddlewareType) -> None:
self._middlewares.remove(middleware)
def __call__(
self,
middleware: Optional[RequestMiddlewareType] = None,
) -> Union[
Callable[[RequestMiddlewareType], RequestMiddlewareType],
RequestMiddlewareType,
]:
if middleware is None:
return self.register
return self.register(middleware)
@overload
def __getitem__(self, item: int) -> RequestMiddlewareType:
pass
@overload
def __getitem__(self, item: slice) -> Sequence[RequestMiddlewareType]:
pass
def __getitem__(
self, item: Union[int, slice]
) -> Union[RequestMiddlewareType, Sequence[RequestMiddlewareType]]:
return self._middlewares[item]
def __len__(self) -> int:
return len(self._middlewares)
def wrap_middlewares(
self,
callback: NextRequestMiddlewareType[TelegramType],
**kwargs: Any,
) -> NextRequestMiddlewareType[TelegramType]:
middleware = partial(callback, **kwargs)
for m in reversed(self._middlewares):
middleware = partial(m, middleware)
return cast(NextRequestMiddlewareType[TelegramType], middleware)

View File

@@ -0,0 +1,37 @@
import logging
from typing import TYPE_CHECKING, Any, List, Optional, Type
from aiogram import loggers
from aiogram.methods import TelegramMethod
from aiogram.methods.base import Response, TelegramType
from .base import BaseRequestMiddleware, NextRequestMiddlewareType
if TYPE_CHECKING:
from ...bot import Bot
logger = logging.getLogger(__name__)
class RequestLogging(BaseRequestMiddleware):
def __init__(self, ignore_methods: Optional[List[Type[TelegramMethod[Any]]]] = None):
"""
Middleware for logging outgoing requests
:param ignore_methods: methods to ignore in logging middleware
"""
self.ignore_methods = ignore_methods if ignore_methods else []
async def __call__(
self,
make_request: NextRequestMiddlewareType[TelegramType],
bot: "Bot",
method: TelegramMethod[TelegramType],
) -> Response[TelegramType]:
if type(method) not in self.ignore_methods:
loggers.middlewares.info(
"Make request with method=%r by bot id=%d",
type(method).__name__,
bot.id,
)
return await make_request(bot, method)

View File

@@ -0,0 +1,103 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Union
class FilesPathWrapper(ABC):
@abstractmethod
def to_local(self, path: Union[Path, str]) -> Union[Path, str]:
pass
@abstractmethod
def to_server(self, path: Union[Path, str]) -> Union[Path, str]:
pass
class BareFilesPathWrapper(FilesPathWrapper):
def to_local(self, path: Union[Path, str]) -> Union[Path, str]:
return path
def to_server(self, path: Union[Path, str]) -> Union[Path, str]:
return path
class SimpleFilesPathWrapper(FilesPathWrapper):
def __init__(self, server_path: Path, local_path: Path) -> None:
self.server_path = server_path
self.local_path = local_path
@classmethod
def _resolve(
cls, base1: Union[Path, str], base2: Union[Path, str], value: Union[Path, str]
) -> Path:
relative = Path(value).relative_to(base1)
return base2 / relative
def to_local(self, path: Union[Path, str]) -> Union[Path, str]:
return self._resolve(base1=self.server_path, base2=self.local_path, value=path)
def to_server(self, path: Union[Path, str]) -> Union[Path, str]:
return self._resolve(base1=self.local_path, base2=self.server_path, value=path)
@dataclass(frozen=True)
class TelegramAPIServer:
"""
Base config for API Endpoints
"""
base: str
"""Base URL"""
file: str
"""Files URL"""
is_local: bool = False
"""Mark this server is
in `local mode <https://core.telegram.org/bots/api#using-a-local-bot-api-server>`_."""
wrap_local_file: FilesPathWrapper = BareFilesPathWrapper()
"""Callback to wrap files path in local mode"""
def api_url(self, token: str, method: str) -> str:
"""
Generate URL for API methods
:param token: Bot token
:param method: API method name (case insensitive)
:return: URL
"""
return self.base.format(token=token, method=method)
def file_url(self, token: str, path: Union[str, Path]) -> str:
"""
Generate URL for downloading files
:param token: Bot token
:param path: file path
:return: URL
"""
return self.file.format(token=token, path=path)
@classmethod
def from_base(cls, base: str, **kwargs: Any) -> "TelegramAPIServer":
"""
Use this method to auto-generate TelegramAPIServer instance from base URL
:param base: Base URL
:return: instance of :class:`TelegramAPIServer`
"""
base = base.rstrip("/")
return cls(
base=f"{base}/bot{{token}}/{{method}}",
file=f"{base}/file/bot{{token}}/{{path}}",
**kwargs,
)
PRODUCTION = TelegramAPIServer(
base="https://api.telegram.org/bot{token}/{method}",
file="https://api.telegram.org/file/bot{token}/{path}",
)
TEST = TelegramAPIServer(
base="https://api.telegram.org/bot{token}/test/{method}",
file="https://api.telegram.org/file/bot{token}/test/{path}",
)

View File

@@ -0,0 +1,642 @@
from __future__ import annotations
import asyncio
import contextvars
import signal
import warnings
from asyncio import CancelledError, Event, Future, Lock
from contextlib import suppress
from typing import Any, AsyncGenerator, Awaitable, Dict, List, Optional, Set, Union
from .. import loggers
from ..client.bot import Bot
from ..exceptions import TelegramAPIError
from ..fsm.middleware import FSMContextMiddleware
from ..fsm.storage.base import BaseEventIsolation, BaseStorage
from ..fsm.storage.memory import DisabledEventIsolation, MemoryStorage
from ..fsm.strategy import FSMStrategy
from ..methods import GetUpdates, TelegramMethod
from ..methods.base import TelegramType
from ..types import Update, User
from ..types.base import UNSET, UNSET_TYPE
from ..types.update import UpdateTypeLookupError
from ..utils.backoff import Backoff, BackoffConfig
from .event.bases import UNHANDLED, SkipHandler
from .event.telegram import TelegramEventObserver
from .middlewares.error import ErrorsMiddleware
from .middlewares.user_context import UserContextMiddleware
from .router import Router
DEFAULT_BACKOFF_CONFIG = BackoffConfig(min_delay=1.0, max_delay=5.0, factor=1.3, jitter=0.1)
class Dispatcher(Router):
"""
Root router
"""
def __init__(
self,
*, # * - Preventing to pass instance of Bot to the FSM storage
storage: Optional[BaseStorage] = None,
fsm_strategy: FSMStrategy = FSMStrategy.USER_IN_CHAT,
events_isolation: Optional[BaseEventIsolation] = None,
disable_fsm: bool = False,
name: Optional[str] = None,
**kwargs: Any,
) -> None:
"""
Root router
:param storage: Storage for FSM
:param fsm_strategy: FSM strategy
:param events_isolation: Events isolation
:param disable_fsm: Disable FSM, note that if you disable FSM
then you should not use storage and events isolation
:param kwargs: Other arguments, will be passed as keyword arguments to handlers
"""
super(Dispatcher, self).__init__(name=name)
if storage and not isinstance(storage, BaseStorage):
raise TypeError(
f"FSM storage should be instance of 'BaseStorage' not {type(storage).__name__}"
)
# Telegram API provides originally only one event type - Update
# For making easily interactions with events here is registered handler which helps
# to separate Update to different event types like Message, CallbackQuery etc.
self.update = self.observers["update"] = TelegramEventObserver(
router=self, event_name="update"
)
self.update.register(self._listen_update)
# Error handlers should work is out of all other functions
# and should be registered before all others middlewares
self.update.outer_middleware(ErrorsMiddleware(self))
# User context middleware makes small optimization for all other builtin
# middlewares via caching the user and chat instances in the event context
self.update.outer_middleware(UserContextMiddleware())
# FSM middleware should always be registered after User context middleware
# because here is used context from previous step
self.fsm = FSMContextMiddleware(
storage=storage or MemoryStorage(),
strategy=fsm_strategy,
events_isolation=events_isolation or DisabledEventIsolation(),
)
if not disable_fsm:
# Note that when FSM middleware is disabled, the event isolation is also disabled
# Because the isolation mechanism is a part of the FSM
self.update.outer_middleware(self.fsm)
self.shutdown.register(self.fsm.close)
self.workflow_data: Dict[str, Any] = kwargs
self._running_lock = Lock()
self._stop_signal: Optional[Event] = None
self._stopped_signal: Optional[Event] = None
self._handle_update_tasks: Set[asyncio.Task[Any]] = set()
def __getitem__(self, item: str) -> Any:
return self.workflow_data[item]
def __setitem__(self, key: str, value: Any) -> None:
self.workflow_data[key] = value
def __delitem__(self, key: str) -> None:
del self.workflow_data[key]
def get(self, key: str, /, default: Optional[Any] = None) -> Optional[Any]:
return self.workflow_data.get(key, default)
@property
def storage(self) -> BaseStorage:
return self.fsm.storage
@property
def parent_router(self) -> Optional[Router]:
"""
Dispatcher has no parent router and can't be included to any other routers or dispatchers
:return:
"""
return None # noqa: RET501
@parent_router.setter
def parent_router(self, value: Router) -> None:
"""
Dispatcher is root Router then configuring parent router is not allowed
:param value:
:return:
"""
raise RuntimeError("Dispatcher can not be attached to another Router.")
async def feed_update(self, bot: Bot, update: Update, **kwargs: Any) -> Any:
"""
Main entry point for incoming updates
Response of this method can be used as Webhook response
:param bot:
:param update:
"""
loop = asyncio.get_running_loop()
handled = False
start_time = loop.time()
if update.bot != bot:
# Re-mounting update to the current bot instance for making possible to
# use it in shortcuts.
# Here is update is re-created because we need to propagate context to
# all nested objects and attributes of the Update, but it
# is impossible without roundtrip to JSON :(
# The preferred way is that pass already mounted Bot instance to this update
# before call feed_update method
update = Update.model_validate(update.model_dump(), context={"bot": bot})
try:
response = await self.update.wrap_outer_middleware(
self.update.trigger,
update,
{
**self.workflow_data,
**kwargs,
"bot": bot,
},
)
handled = response is not UNHANDLED
return response
finally:
finish_time = loop.time()
duration = (finish_time - start_time) * 1000
loggers.event.info(
"Update id=%s is %s. Duration %d ms by bot id=%d",
update.update_id,
"handled" if handled else "not handled",
duration,
bot.id,
)
async def feed_raw_update(self, bot: Bot, update: Dict[str, Any], **kwargs: Any) -> Any:
"""
Main entry point for incoming updates with automatic Dict->Update serializer
:param bot:
:param update:
:param kwargs:
"""
parsed_update = Update.model_validate(update, context={"bot": bot})
return await self._feed_webhook_update(bot=bot, update=parsed_update, **kwargs)
@classmethod
async def _listen_updates(
cls,
bot: Bot,
polling_timeout: int = 30,
backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
allowed_updates: Optional[List[str]] = None,
) -> AsyncGenerator[Update, None]:
"""
Endless updates reader with correctly handling any server-side or connection errors.
So you may not worry that the polling will stop working.
"""
backoff = Backoff(config=backoff_config)
get_updates = GetUpdates(timeout=polling_timeout, allowed_updates=allowed_updates)
kwargs = {}
if bot.session.timeout:
# Request timeout can be lower than session timeout and that's OK.
# To prevent false-positive TimeoutError we should wait longer than polling timeout
kwargs["request_timeout"] = int(bot.session.timeout + polling_timeout)
failed = False
while True:
try:
updates = await bot(get_updates, **kwargs)
except Exception as e:
failed = True
# In cases when Telegram Bot API was inaccessible don't need to stop polling
# process because some developers can't make auto-restarting of the script
loggers.dispatcher.error("Failed to fetch updates - %s: %s", type(e).__name__, e)
# And also backoff timeout is best practice to retry any network activity
loggers.dispatcher.warning(
"Sleep for %f seconds and try again... (tryings = %d, bot id = %d)",
backoff.next_delay,
backoff.counter,
bot.id,
)
await backoff.asleep()
continue
# In case when network connection was fixed let's reset the backoff
# to initial value and then process updates
if failed:
loggers.dispatcher.info(
"Connection established (tryings = %d, bot id = %d)",
backoff.counter,
bot.id,
)
backoff.reset()
failed = False
for update in updates:
yield update
# The getUpdates method returns the earliest 100 unconfirmed updates.
# To confirm an update, use the offset parameter when calling getUpdates
# All updates with update_id less than or equal to offset will be marked
# as confirmed on the server and will no longer be returned.
get_updates.offset = update.update_id + 1
async def _listen_update(self, update: Update, **kwargs: Any) -> Any:
"""
Main updates listener
Workflow:
- Detect content type and propagate to observers in current router
- If no one filter is pass - propagate update to child routers as Update
:param update:
:param kwargs:
:return:
"""
try:
update_type = update.event_type
event = update.event
except UpdateTypeLookupError as e:
warnings.warn(
"Detected unknown update type.\n"
"Seems like Telegram Bot API was updated and you have "
"installed not latest version of aiogram framework"
f"\nUpdate: {update.model_dump_json(exclude_unset=True)}",
RuntimeWarning,
)
raise SkipHandler() from e
kwargs.update(event_update=update)
return await self.propagate_event(update_type=update_type, event=event, **kwargs)
@classmethod
async def silent_call_request(cls, bot: Bot, result: TelegramMethod[Any]) -> None:
"""
Simulate answer into WebHook
:param bot:
:param result:
:return:
"""
try:
await bot(result)
except TelegramAPIError as e:
# In due to WebHook mechanism doesn't allow getting response for
# requests called in answer to WebHook request.
# Need to skip unsuccessful responses.
# For debugging here is added logging.
loggers.event.error("Failed to make answer: %s: %s", e.__class__.__name__, e)
async def _process_update(
self, bot: Bot, update: Update, call_answer: bool = True, **kwargs: Any
) -> bool:
"""
Propagate update to event listeners
:param bot: instance of Bot
:param update: instance of Update
:param call_answer: need to execute response as Telegram method (like answer into webhook)
:param kwargs: contextual data for middlewares, filters and handlers
:return: status
"""
try:
response = await self.feed_update(bot, update, **kwargs)
if call_answer and isinstance(response, TelegramMethod):
await self.silent_call_request(bot=bot, result=response)
return response is not UNHANDLED
except Exception as e:
loggers.event.exception(
"Cause exception while process update id=%d by bot id=%d\n%s: %s",
update.update_id,
bot.id,
e.__class__.__name__,
e,
)
return True # because update was processed but unsuccessful
async def _process_with_semaphore(
self, handle_update: Awaitable[bool], semaphore: asyncio.Semaphore
) -> bool:
"""
Process update with semaphore to limit concurrent tasks
:param handle_update: Coroutine that processes the update
:param semaphore: Semaphore to limit concurrent tasks
:return: bool indicating the result of the update processing
"""
try:
return await handle_update
finally:
semaphore.release()
async def _polling(
self,
bot: Bot,
polling_timeout: int = 30,
handle_as_tasks: bool = True,
backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
allowed_updates: Optional[List[str]] = None,
tasks_concurrency_limit: Optional[int] = None,
**kwargs: Any,
) -> None:
"""
Internal polling process
:param bot:
:param polling_timeout: Long-polling wait time
:param handle_as_tasks: Run task for each event and no wait result
:param backoff_config: backoff-retry config
:param allowed_updates: List of the update types you want your bot to receive
:param tasks_concurrency_limit: Maximum number of concurrent updates to process
(None = no limit), used only if handle_as_tasks is True
:param kwargs:
:return:
"""
user: User = await bot.me()
loggers.dispatcher.info(
"Run polling for bot @%s id=%d - %r", user.username, bot.id, user.full_name
)
# Create semaphore if tasks_concurrency_limit is specified
semaphore = None
if tasks_concurrency_limit is not None and handle_as_tasks:
semaphore = asyncio.Semaphore(tasks_concurrency_limit)
try:
async for update in self._listen_updates(
bot,
polling_timeout=polling_timeout,
backoff_config=backoff_config,
allowed_updates=allowed_updates,
):
handle_update = self._process_update(bot=bot, update=update, **kwargs)
if handle_as_tasks:
if semaphore:
# Use semaphore to limit concurrent tasks
await semaphore.acquire()
handle_update_task = asyncio.create_task(
self._process_with_semaphore(handle_update, semaphore)
)
else:
handle_update_task = asyncio.create_task(handle_update)
self._handle_update_tasks.add(handle_update_task)
handle_update_task.add_done_callback(self._handle_update_tasks.discard)
else:
await handle_update
finally:
loggers.dispatcher.info(
"Polling stopped for bot @%s id=%d - %r", user.username, bot.id, user.full_name
)
async def _feed_webhook_update(self, bot: Bot, update: Update, **kwargs: Any) -> Any:
"""
The same with `Dispatcher.process_update()` but returns real response instead of bool
"""
try:
return await self.feed_update(bot, update, **kwargs)
except Exception as e:
loggers.event.exception(
"Cause exception while process update id=%d by bot id=%d\n%s: %s",
update.update_id,
bot.id,
e.__class__.__name__,
e,
)
raise
async def feed_webhook_update(
self, bot: Bot, update: Union[Update, Dict[str, Any]], _timeout: float = 55, **kwargs: Any
) -> Optional[TelegramMethod[TelegramType]]:
if not isinstance(update, Update): # Allow to use raw updates
update = Update.model_validate(update, context={"bot": bot})
ctx = contextvars.copy_context()
loop = asyncio.get_running_loop()
waiter = loop.create_future()
def release_waiter(*_: Any) -> None:
if not waiter.done():
waiter.set_result(None)
timeout_handle = loop.call_later(_timeout, release_waiter)
process_updates: Future[Any] = asyncio.ensure_future(
self._feed_webhook_update(bot=bot, update=update, **kwargs)
)
process_updates.add_done_callback(release_waiter, context=ctx)
def process_response(task: Future[Any]) -> None:
warnings.warn(
"Detected slow response into webhook.\n"
"Telegram is waiting for response only first 60 seconds and then re-send update.\n"
"For preventing this situation response into webhook returned immediately "
"and handler is moved to background and still processing update.",
RuntimeWarning,
)
try:
result = task.result()
except Exception as e:
raise e
if isinstance(result, TelegramMethod):
asyncio.ensure_future(self.silent_call_request(bot=bot, result=result))
try:
try:
await waiter
except CancelledError: # pragma: no cover
process_updates.remove_done_callback(release_waiter)
process_updates.cancel()
raise
if process_updates.done():
# TODO: handle exceptions
response: Any = process_updates.result()
if isinstance(response, TelegramMethod):
return response
else:
process_updates.remove_done_callback(release_waiter)
process_updates.add_done_callback(process_response, context=ctx)
finally:
timeout_handle.cancel()
return None
async def stop_polling(self) -> None:
"""
Execute this method if you want to stop polling programmatically
:return:
"""
if not self._running_lock.locked():
raise RuntimeError("Polling is not started")
if not self._stop_signal or not self._stopped_signal:
return
self._stop_signal.set()
await self._stopped_signal.wait()
def _signal_stop_polling(self, sig: signal.Signals) -> None:
if not self._running_lock.locked():
return
loggers.dispatcher.warning("Received %s signal", sig.name)
if not self._stop_signal:
return
self._stop_signal.set()
async def start_polling(
self,
*bots: Bot,
polling_timeout: int = 10,
handle_as_tasks: bool = True,
backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
allowed_updates: Optional[Union[List[str], UNSET_TYPE]] = UNSET,
handle_signals: bool = True,
close_bot_session: bool = True,
tasks_concurrency_limit: Optional[int] = None,
**kwargs: Any,
) -> None:
"""
Polling runner
:param bots: Bot instances (one or more)
:param polling_timeout: Long-polling wait time
:param handle_as_tasks: Run task for each event and no wait result
:param backoff_config: backoff-retry config
:param allowed_updates: List of the update types you want your bot to receive
By default, all used update types are enabled (resolved from handlers)
:param handle_signals: handle signals (SIGINT/SIGTERM)
:param close_bot_session: close bot sessions on shutdown
:param tasks_concurrency_limit: Maximum number of concurrent updates to process
(None = no limit), used only if handle_as_tasks is True
:param kwargs: contextual data
:return:
"""
if not bots:
raise ValueError("At least one bot instance is required to start polling")
if "bot" in kwargs:
raise ValueError(
"Keyword argument 'bot' is not acceptable, "
"the bot instance should be passed as positional argument"
)
async with self._running_lock: # Prevent to run this method twice at a once
if self._stop_signal is None:
self._stop_signal = Event()
if self._stopped_signal is None:
self._stopped_signal = Event()
if allowed_updates is UNSET:
allowed_updates = self.resolve_used_update_types()
self._stop_signal.clear()
self._stopped_signal.clear()
if handle_signals:
loop = asyncio.get_running_loop()
with suppress(NotImplementedError): # pragma: no cover
# Signals handling is not supported on Windows
# It also can't be covered on Windows
loop.add_signal_handler(
signal.SIGTERM, self._signal_stop_polling, signal.SIGTERM
)
loop.add_signal_handler(
signal.SIGINT, self._signal_stop_polling, signal.SIGINT
)
workflow_data = {
"dispatcher": self,
"bots": bots,
**self.workflow_data,
**kwargs,
}
if "bot" in workflow_data:
workflow_data.pop("bot")
await self.emit_startup(bot=bots[-1], **workflow_data)
loggers.dispatcher.info("Start polling")
try:
tasks: List[asyncio.Task[Any]] = [
asyncio.create_task(
self._polling(
bot=bot,
handle_as_tasks=handle_as_tasks,
polling_timeout=polling_timeout,
backoff_config=backoff_config,
allowed_updates=allowed_updates,
tasks_concurrency_limit=tasks_concurrency_limit,
**workflow_data,
)
)
for bot in bots
]
tasks.append(asyncio.create_task(self._stop_signal.wait()))
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in pending:
# (mostly) Graceful shutdown unfinished tasks
task.cancel()
with suppress(CancelledError):
await task
# Wait finished tasks to propagate unhandled exceptions
await asyncio.gather(*done)
finally:
loggers.dispatcher.info("Polling stopped")
try:
await self.emit_shutdown(bot=bots[-1], **workflow_data)
finally:
if close_bot_session:
await asyncio.gather(*(bot.session.close() for bot in bots))
self._stopped_signal.set()
def run_polling(
self,
*bots: Bot,
polling_timeout: int = 10,
handle_as_tasks: bool = True,
backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
allowed_updates: Optional[Union[List[str], UNSET_TYPE]] = UNSET,
handle_signals: bool = True,
close_bot_session: bool = True,
tasks_concurrency_limit: Optional[int] = None,
**kwargs: Any,
) -> None:
"""
Run many bots with polling
:param bots: Bot instances (one or more)
:param polling_timeout: Long-polling wait time
:param handle_as_tasks: Run task for each event and no wait result
:param backoff_config: backoff-retry config
:param allowed_updates: List of the update types you want your bot to receive
:param handle_signals: handle signals (SIGINT/SIGTERM)
:param close_bot_session: close bot sessions on shutdown
:param tasks_concurrency_limit: Maximum number of concurrent updates to process
(None = no limit), used only if handle_as_tasks is True
:param kwargs: contextual data
:return:
"""
with suppress(KeyboardInterrupt):
return asyncio.run(
self.start_polling(
*bots,
**kwargs,
polling_timeout=polling_timeout,
handle_as_tasks=handle_as_tasks,
backoff_config=backoff_config,
allowed_updates=allowed_updates,
handle_signals=handle_signals,
close_bot_session=close_bot_session,
tasks_concurrency_limit=tasks_concurrency_limit,
)
)

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from typing import Any, Awaitable, Callable, Dict, NoReturn, Optional, TypeVar, Union
from unittest.mock import sentinel
from ...types import TelegramObject
from ..middlewares.base import BaseMiddleware
MiddlewareEventType = TypeVar("MiddlewareEventType", bound=TelegramObject)
NextMiddlewareType = Callable[[MiddlewareEventType, Dict[str, Any]], Awaitable[Any]]
MiddlewareType = Union[
BaseMiddleware,
Callable[
[NextMiddlewareType[MiddlewareEventType], MiddlewareEventType, Dict[str, Any]],
Awaitable[Any],
],
]
UNHANDLED = sentinel.UNHANDLED
REJECTED = sentinel.REJECTED
class SkipHandler(Exception):
pass
class CancelHandler(Exception):
pass
def skip(message: Optional[str] = None) -> NoReturn:
"""
Raise an SkipHandler
"""
raise SkipHandler(message or "Event skipped")

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
from typing import Any, Callable, List
from .handler import CallbackType, HandlerObject
class EventObserver:
"""
Simple events observer
Is used for managing events is not related with Telegram
(For example startup/shutdown processes)
Handlers can be registered via decorator or method
.. code-block:: python
<observer>.register(my_handler)
.. code-block:: python
@<observer>()
async def my_handler(*args, **kwargs): ...
"""
def __init__(self) -> None:
self.handlers: List[HandlerObject] = []
def register(self, callback: CallbackType) -> None:
"""
Register callback with filters
"""
self.handlers.append(HandlerObject(callback=callback))
async def trigger(self, *args: Any, **kwargs: Any) -> None:
"""
Propagate event to handlers.
Handler will be called when all its filters is pass.
"""
for handler in self.handlers:
await handler.call(*args, **kwargs)
def __call__(self) -> Callable[[CallbackType], CallbackType]:
"""
Decorator for registering event handlers
"""
def wrapper(callback: CallbackType) -> CallbackType:
self.register(callback)
return callback
return wrapper

View File

@@ -0,0 +1,95 @@
import asyncio
import contextvars
import inspect
import warnings
from dataclasses import dataclass, field
from functools import partial
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
from magic_filter.magic import MagicFilter as OriginalMagicFilter
from aiogram.dispatcher.flags import extract_flags_from_object
from aiogram.filters.base import Filter
from aiogram.handlers import BaseHandler
from aiogram.utils.magic_filter import MagicFilter
from aiogram.utils.warnings import Recommendation
CallbackType = Callable[..., Any]
@dataclass
class CallableObject:
callback: CallbackType
awaitable: bool = field(init=False)
params: Set[str] = field(init=False)
varkw: bool = field(init=False)
def __post_init__(self) -> None:
callback = inspect.unwrap(self.callback)
self.awaitable = inspect.isawaitable(callback) or inspect.iscoroutinefunction(callback)
spec = inspect.getfullargspec(callback)
self.params = {*spec.args, *spec.kwonlyargs}
self.varkw = spec.varkw is not None
def _prepare_kwargs(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
if self.varkw:
return kwargs
return {k: kwargs[k] for k in self.params if k in kwargs}
async def call(self, *args: Any, **kwargs: Any) -> Any:
wrapped = partial(self.callback, *args, **self._prepare_kwargs(kwargs))
if self.awaitable:
return await wrapped()
return await asyncio.to_thread(wrapped)
@dataclass
class FilterObject(CallableObject):
magic: Optional[MagicFilter] = None
def __post_init__(self) -> None:
if isinstance(self.callback, OriginalMagicFilter):
# MagicFilter instance is callable but generates
# only "CallOperation" instead of applying the filter
self.magic = self.callback
self.callback = self.callback.resolve
if not isinstance(self.magic, MagicFilter):
# Issue: https://github.com/aiogram/aiogram/issues/990
warnings.warn(
category=Recommendation,
message="You are using F provided by magic_filter package directly, "
"but it lacks `.as_()` extension."
"\n Please change the import statement: from `from magic_filter import F` "
"to `from aiogram import F` to silence this warning.",
stacklevel=6,
)
super(FilterObject, self).__post_init__()
if isinstance(self.callback, Filter):
self.awaitable = True
@dataclass
class HandlerObject(CallableObject):
filters: Optional[List[FilterObject]] = None
flags: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
super(HandlerObject, self).__post_init__()
callback = inspect.unwrap(self.callback)
if inspect.isclass(callback) and issubclass(callback, BaseHandler):
self.awaitable = True
self.flags.update(extract_flags_from_object(callback))
async def check(self, *args: Any, **kwargs: Any) -> Tuple[bool, Dict[str, Any]]:
if not self.filters:
return True, kwargs
for event_filter in self.filters:
check = await event_filter.call(*args, **kwargs)
if not check:
return False, kwargs
if isinstance(check, dict):
kwargs.update(check)
return True, kwargs

View File

@@ -0,0 +1,141 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
from aiogram.dispatcher.middlewares.manager import MiddlewareManager
from ...exceptions import UnsupportedKeywordArgument
from ...filters.base import Filter
from ...types import TelegramObject
from .bases import UNHANDLED, MiddlewareType, SkipHandler
from .handler import CallbackType, FilterObject, HandlerObject
if TYPE_CHECKING:
from aiogram.dispatcher.router import Router
class TelegramEventObserver:
"""
Event observer for Telegram events
Here you can register handler with filter.
This observer will stop event propagation when first handler is pass.
"""
def __init__(self, router: Router, event_name: str) -> None:
self.router: Router = router
self.event_name: str = event_name
self.handlers: List[HandlerObject] = []
self.middleware = MiddlewareManager()
self.outer_middleware = MiddlewareManager()
# Re-used filters check method from already implemented handler object
# with dummy callback which never will be used
self._handler = HandlerObject(callback=lambda: True, filters=[])
def filter(self, *filters: CallbackType) -> None:
"""
Register filter for all handlers of this event observer
:param filters: positional filters
"""
if self._handler.filters is None:
self._handler.filters = []
self._handler.filters.extend([FilterObject(filter_) for filter_ in filters])
def _resolve_middlewares(self) -> List[MiddlewareType[TelegramObject]]:
middlewares: List[MiddlewareType[TelegramObject]] = []
for router in reversed(tuple(self.router.chain_head)):
observer = router.observers.get(self.event_name)
if observer:
middlewares.extend(observer.middleware)
return middlewares
def register(
self,
callback: CallbackType,
*filters: CallbackType,
flags: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> CallbackType:
"""
Register event handler
"""
if kwargs:
raise UnsupportedKeywordArgument(
"Passing any additional keyword arguments to the registrar method "
"is not supported.\n"
"This error may be caused when you are trying to register filters like in 2.x "
"version of this framework, if it's true just look at correspoding "
"documentation pages.\n"
f"Please remove the {set(kwargs.keys())} arguments from this call.\n"
)
if flags is None:
flags = {}
for item in filters:
if isinstance(item, Filter):
item.update_handler_flags(flags=flags)
self.handlers.append(
HandlerObject(
callback=callback,
filters=[FilterObject(filter_) for filter_ in filters],
flags=flags,
)
)
return callback
def wrap_outer_middleware(
self, callback: Any, event: TelegramObject, data: Dict[str, Any]
) -> Any:
wrapped_outer = self.middleware.wrap_middlewares(
self.outer_middleware,
callback,
)
return wrapped_outer(event, data)
def check_root_filters(self, event: TelegramObject, **kwargs: Any) -> Any:
return self._handler.check(event, **kwargs)
async def trigger(self, event: TelegramObject, **kwargs: Any) -> Any:
"""
Propagate event to handlers and stops propagation on first match.
Handler will be called when all its filters are pass.
"""
for handler in self.handlers:
kwargs["handler"] = handler
result, data = await handler.check(event, **kwargs)
if result:
kwargs.update(data)
try:
wrapped_inner = self.outer_middleware.wrap_middlewares(
self._resolve_middlewares(),
handler.call,
)
return await wrapped_inner(event, kwargs)
except SkipHandler:
continue
return UNHANDLED
def __call__(
self,
*filters: CallbackType,
flags: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Callable[[CallbackType], CallbackType]:
"""
Decorator for registering event handlers
"""
def wrapper(callback: CallbackType) -> CallbackType:
self.register(callback, *filters, flags=flags, **kwargs)
return callback
return wrapper

View File

@@ -0,0 +1,127 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Union, cast, overload
from magic_filter import AttrDict, MagicFilter
if TYPE_CHECKING:
from aiogram.dispatcher.event.handler import HandlerObject
@dataclass(frozen=True)
class Flag:
name: str
value: Any
@dataclass(frozen=True)
class FlagDecorator:
flag: Flag
@classmethod
def _with_flag(cls, flag: Flag) -> "FlagDecorator":
return cls(flag)
def _with_value(self, value: Any) -> "FlagDecorator":
new_flag = Flag(self.flag.name, value)
return self._with_flag(new_flag)
@overload
def __call__(self, value: Callable[..., Any], /) -> Callable[..., Any]: # type: ignore
pass
@overload
def __call__(self, value: Any, /) -> "FlagDecorator":
pass
@overload
def __call__(self, **kwargs: Any) -> "FlagDecorator":
pass
def __call__(
self,
value: Optional[Any] = None,
**kwargs: Any,
) -> Union[Callable[..., Any], "FlagDecorator"]:
if value and kwargs:
raise ValueError("The arguments `value` and **kwargs can not be used together")
if value is not None and callable(value):
value.aiogram_flag = {
**extract_flags_from_object(value),
self.flag.name: self.flag.value,
}
return cast(Callable[..., Any], value)
return self._with_value(AttrDict(kwargs) if value is None else value)
if TYPE_CHECKING:
class _ChatActionFlagProtocol(FlagDecorator):
def __call__( # type: ignore[override]
self,
action: str = ...,
interval: float = ...,
initial_sleep: float = ...,
**kwargs: Any,
) -> FlagDecorator:
pass
class FlagGenerator:
def __getattr__(self, name: str) -> FlagDecorator:
if name[0] == "_":
raise AttributeError("Flag name must NOT start with underscore")
return FlagDecorator(Flag(name, True))
if TYPE_CHECKING:
chat_action: _ChatActionFlagProtocol
def extract_flags_from_object(obj: Any) -> Dict[str, Any]:
if not hasattr(obj, "aiogram_flag"):
return {}
return cast(Dict[str, Any], obj.aiogram_flag)
def extract_flags(handler: Union["HandlerObject", Dict[str, Any]]) -> Dict[str, Any]:
"""
Extract flags from handler or middleware context data
:param handler: handler object or data
:return: dictionary with all handler flags
"""
if isinstance(handler, dict) and "handler" in handler:
handler = handler["handler"]
if hasattr(handler, "flags"):
return handler.flags
return {}
def get_flag(
handler: Union["HandlerObject", Dict[str, Any]],
name: str,
*,
default: Optional[Any] = None,
) -> Any:
"""
Get flag by name
:param handler: handler object or data
:param name: name of the flag
:param default: default value (None)
:return: value of the flag or default
"""
flags = extract_flags(handler)
return flags.get(name, default)
def check_flags(handler: Union["HandlerObject", Dict[str, Any]], magic: MagicFilter) -> Any:
"""
Check flags via magic filter
:param handler: handler object or data
:param magic: instance of the magic
:return: the result of magic filter check
"""
flags = extract_flags(handler)
return magic.resolve(AttrDict(flags))

View File

@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, Dict, TypeVar
from aiogram.types import TelegramObject
T = TypeVar("T")
class BaseMiddleware(ABC):
"""
Generic middleware class
"""
@abstractmethod
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
) -> Any: # pragma: no cover
"""
Execute middleware
:param handler: Wrapped handler in middlewares chain
:param event: Incoming event (Subclass of :class:`aiogram.types.base.TelegramObject`)
:param data: Contextual data. Will be mapped to handler arguments
:return: :class:`Any`
"""
pass

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
from typing import TYPE_CHECKING, TypedDict
from typing_extensions import NotRequired
if TYPE_CHECKING:
from aiogram import Bot, Dispatcher, Router
from aiogram.dispatcher.event.handler import HandlerObject
from aiogram.dispatcher.middlewares.user_context import EventContext
from aiogram.fsm.context import FSMContext
from aiogram.fsm.storage.base import BaseStorage
from aiogram.types import Chat, Update, User
from aiogram.utils.i18n import I18n, I18nMiddleware
class DispatcherData(TypedDict, total=False):
"""
Dispatcher and bot related data.
"""
dispatcher: Dispatcher
"""Instance of the Dispatcher from which the handler was called."""
bot: Bot
"""Bot that received the update."""
bots: NotRequired[list[Bot]]
"""List of all bots in the Dispatcher. Used only in polling mode."""
event_update: Update
"""Update object that triggered the handler."""
event_router: Router
"""Router that was used to find the handler."""
handler: NotRequired[HandlerObject]
"""Handler object that was called.
Available only in the handler itself and inner middlewares."""
class UserContextData(TypedDict, total=False):
"""
Event context related data about user and chat.
"""
event_context: EventContext
"""Event context object that contains user and chat data."""
event_from_user: NotRequired[User]
"""User object that triggered the handler."""
event_chat: NotRequired[Chat]
"""Chat object that triggered the handler.
.. deprecated:: 3.5.0
Use :attr:`event_context.chat` instead."""
event_thread_id: NotRequired[int]
"""Thread ID of the chat that triggered the handler.
.. deprecated:: 3.5.0
Use :attr:`event_context.chat` instead."""
event_business_connection_id: NotRequired[str]
"""Business connection ID of the chat that triggered the handler.
.. deprecated:: 3.5.0
Use :attr:`event_context.business_connection_id` instead."""
class FSMData(TypedDict, total=False):
"""
FSM related data.
"""
fsm_storage: BaseStorage
"""Storage used for FSM."""
state: NotRequired[FSMContext]
"""Current state of the FSM."""
raw_state: NotRequired[str | None]
"""Raw state of the FSM."""
class I18nData(TypedDict, total=False):
"""
I18n related data.
Is not included by default, you need to add it to your own Data class if you need it.
"""
i18n: I18n
"""I18n object."""
i18n_middleware: I18nMiddleware
"""I18n middleware."""
class MiddlewareData(
DispatcherData,
UserContextData,
FSMData,
# I18nData, # Disabled by default, add it if you need it to your own Data class.
total=False,
):
"""
Data passed to the handler by the middlewares.
You can add your own data by extending this class.
"""

View File

@@ -0,0 +1,36 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, cast
from ...types import TelegramObject, Update
from ...types.error_event import ErrorEvent
from ..event.bases import UNHANDLED, CancelHandler, SkipHandler
from .base import BaseMiddleware
if TYPE_CHECKING:
from ..router import Router
class ErrorsMiddleware(BaseMiddleware):
def __init__(self, router: Router):
self.router = router
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
) -> Any:
try:
return await handler(event, data)
except (SkipHandler, CancelHandler): # pragma: no cover
raise
except Exception as e:
response = await self.router.propagate_event(
update_type="error",
event=ErrorEvent(update=cast(Update, event), exception=e),
**data,
)
if response is not UNHANDLED:
return response
raise

View File

@@ -0,0 +1,65 @@
import functools
from typing import Any, Callable, Dict, List, Optional, Sequence, Union, overload
from aiogram.dispatcher.event.bases import (
MiddlewareEventType,
MiddlewareType,
NextMiddlewareType,
)
from aiogram.dispatcher.event.handler import CallbackType
from aiogram.types import TelegramObject
class MiddlewareManager(Sequence[MiddlewareType[TelegramObject]]):
def __init__(self) -> None:
self._middlewares: List[MiddlewareType[TelegramObject]] = []
def register(
self,
middleware: MiddlewareType[TelegramObject],
) -> MiddlewareType[TelegramObject]:
self._middlewares.append(middleware)
return middleware
def unregister(self, middleware: MiddlewareType[TelegramObject]) -> None:
self._middlewares.remove(middleware)
def __call__(
self,
middleware: Optional[MiddlewareType[TelegramObject]] = None,
) -> Union[
Callable[[MiddlewareType[TelegramObject]], MiddlewareType[TelegramObject]],
MiddlewareType[TelegramObject],
]:
if middleware is None:
return self.register
return self.register(middleware)
@overload
def __getitem__(self, item: int) -> MiddlewareType[TelegramObject]:
pass
@overload
def __getitem__(self, item: slice) -> Sequence[MiddlewareType[TelegramObject]]:
pass
def __getitem__(
self, item: Union[int, slice]
) -> Union[MiddlewareType[TelegramObject], Sequence[MiddlewareType[TelegramObject]]]:
return self._middlewares[item]
def __len__(self) -> int:
return len(self._middlewares)
@staticmethod
def wrap_middlewares(
middlewares: Sequence[MiddlewareType[MiddlewareEventType]], handler: CallbackType
) -> NextMiddlewareType[MiddlewareEventType]:
@functools.wraps(handler)
def handler_wrapper(event: TelegramObject, kwargs: Dict[str, Any]) -> Any:
return handler(event, **kwargs)
middleware = handler_wrapper
for m in reversed(middlewares):
middleware = functools.partial(m, middleware) # type: ignore[assignment]
return middleware

View File

@@ -0,0 +1,182 @@
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Optional
from aiogram.dispatcher.middlewares.base import BaseMiddleware
from aiogram.types import (
Chat,
ChatBoostSourcePremium,
InaccessibleMessage,
TelegramObject,
Update,
User,
)
EVENT_CONTEXT_KEY = "event_context"
EVENT_FROM_USER_KEY = "event_from_user"
EVENT_CHAT_KEY = "event_chat"
EVENT_THREAD_ID_KEY = "event_thread_id"
@dataclass(frozen=True)
class EventContext:
chat: Optional[Chat] = None
user: Optional[User] = None
thread_id: Optional[int] = None
business_connection_id: Optional[str] = None
@property
def user_id(self) -> Optional[int]:
return self.user.id if self.user else None
@property
def chat_id(self) -> Optional[int]:
return self.chat.id if self.chat else None
class UserContextMiddleware(BaseMiddleware):
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
) -> Any:
if not isinstance(event, Update):
raise RuntimeError("UserContextMiddleware got an unexpected event type!")
event_context = data[EVENT_CONTEXT_KEY] = self.resolve_event_context(event=event)
# Backward compatibility
if event_context.user is not None:
data[EVENT_FROM_USER_KEY] = event_context.user
if event_context.chat is not None:
data[EVENT_CHAT_KEY] = event_context.chat
if event_context.thread_id is not None:
data[EVENT_THREAD_ID_KEY] = event_context.thread_id
return await handler(event, data)
@classmethod
def resolve_event_context(cls, event: Update) -> EventContext:
"""
Resolve chat and user instance from Update object
"""
if event.message:
return EventContext(
chat=event.message.chat,
user=event.message.from_user,
thread_id=(
event.message.message_thread_id if event.message.is_topic_message else None
),
)
if event.edited_message:
return EventContext(
chat=event.edited_message.chat,
user=event.edited_message.from_user,
thread_id=(
event.edited_message.message_thread_id
if event.edited_message.is_topic_message
else None
),
)
if event.channel_post:
return EventContext(chat=event.channel_post.chat)
if event.edited_channel_post:
return EventContext(chat=event.edited_channel_post.chat)
if event.inline_query:
return EventContext(user=event.inline_query.from_user)
if event.chosen_inline_result:
return EventContext(user=event.chosen_inline_result.from_user)
if event.callback_query:
callback_query_message = event.callback_query.message
if callback_query_message:
return EventContext(
chat=callback_query_message.chat,
user=event.callback_query.from_user,
thread_id=(
callback_query_message.message_thread_id
if not isinstance(callback_query_message, InaccessibleMessage)
and callback_query_message.is_topic_message
else None
),
business_connection_id=(
callback_query_message.business_connection_id
if not isinstance(callback_query_message, InaccessibleMessage)
else None
),
)
return EventContext(user=event.callback_query.from_user)
if event.shipping_query:
return EventContext(user=event.shipping_query.from_user)
if event.pre_checkout_query:
return EventContext(user=event.pre_checkout_query.from_user)
if event.poll_answer:
return EventContext(
chat=event.poll_answer.voter_chat,
user=event.poll_answer.user,
)
if event.my_chat_member:
return EventContext(
chat=event.my_chat_member.chat, user=event.my_chat_member.from_user
)
if event.chat_member:
return EventContext(chat=event.chat_member.chat, user=event.chat_member.from_user)
if event.chat_join_request:
return EventContext(
chat=event.chat_join_request.chat, user=event.chat_join_request.from_user
)
if event.message_reaction:
return EventContext(
chat=event.message_reaction.chat,
user=event.message_reaction.user,
)
if event.message_reaction_count:
return EventContext(chat=event.message_reaction_count.chat)
if event.chat_boost:
# We only check the premium source, because only it has a sender user,
# other sources have a user, but it is not the sender, but the recipient
if isinstance(event.chat_boost.boost.source, ChatBoostSourcePremium):
return EventContext(
chat=event.chat_boost.chat,
user=event.chat_boost.boost.source.user,
)
return EventContext(chat=event.chat_boost.chat)
if event.removed_chat_boost:
return EventContext(chat=event.removed_chat_boost.chat)
if event.deleted_business_messages:
return EventContext(
chat=event.deleted_business_messages.chat,
business_connection_id=event.deleted_business_messages.business_connection_id,
)
if event.business_connection:
return EventContext(
user=event.business_connection.user,
business_connection_id=event.business_connection.id,
)
if event.business_message:
return EventContext(
chat=event.business_message.chat,
user=event.business_message.from_user,
thread_id=(
event.business_message.message_thread_id
if event.business_message.is_topic_message
else None
),
business_connection_id=event.business_message.business_connection_id,
)
if event.edited_business_message:
return EventContext(
chat=event.edited_business_message.chat,
user=event.edited_business_message.from_user,
thread_id=(
event.edited_business_message.message_thread_id
if event.edited_business_message.is_topic_message
else None
),
business_connection_id=event.edited_business_message.business_connection_id,
)
if event.purchased_paid_media:
return EventContext(
user=event.purchased_paid_media.from_user,
)
return EventContext()

View File

@@ -0,0 +1,275 @@
from __future__ import annotations
from typing import Any, Dict, Final, Generator, List, Optional, Set
from ..types import TelegramObject
from .event.bases import REJECTED, UNHANDLED
from .event.event import EventObserver
from .event.telegram import TelegramEventObserver
INTERNAL_UPDATE_TYPES: Final[frozenset[str]] = frozenset({"update", "error"})
class Router:
"""
Router can route update, and it nested update types like messages, callback query,
polls and all other event types.
Event handlers can be registered in observer by two ways:
- By observer method - :obj:`router.<event_type>.register(handler, <filters, ...>)`
- By decorator - :obj:`@router.<event_type>(<filters, ...>)`
"""
def __init__(self, *, name: Optional[str] = None) -> None:
"""
:param name: Optional router name, can be useful for debugging
"""
self.name = name or hex(id(self))
self._parent_router: Optional[Router] = None
self.sub_routers: List[Router] = []
# Observers
self.message = TelegramEventObserver(router=self, event_name="message")
self.edited_message = TelegramEventObserver(router=self, event_name="edited_message")
self.channel_post = TelegramEventObserver(router=self, event_name="channel_post")
self.edited_channel_post = TelegramEventObserver(
router=self, event_name="edited_channel_post"
)
self.inline_query = TelegramEventObserver(router=self, event_name="inline_query")
self.chosen_inline_result = TelegramEventObserver(
router=self, event_name="chosen_inline_result"
)
self.callback_query = TelegramEventObserver(router=self, event_name="callback_query")
self.shipping_query = TelegramEventObserver(router=self, event_name="shipping_query")
self.pre_checkout_query = TelegramEventObserver(
router=self, event_name="pre_checkout_query"
)
self.poll = TelegramEventObserver(router=self, event_name="poll")
self.poll_answer = TelegramEventObserver(router=self, event_name="poll_answer")
self.my_chat_member = TelegramEventObserver(router=self, event_name="my_chat_member")
self.chat_member = TelegramEventObserver(router=self, event_name="chat_member")
self.chat_join_request = TelegramEventObserver(router=self, event_name="chat_join_request")
self.message_reaction = TelegramEventObserver(router=self, event_name="message_reaction")
self.message_reaction_count = TelegramEventObserver(
router=self, event_name="message_reaction_count"
)
self.chat_boost = TelegramEventObserver(router=self, event_name="chat_boost")
self.removed_chat_boost = TelegramEventObserver(
router=self, event_name="removed_chat_boost"
)
self.deleted_business_messages = TelegramEventObserver(
router=self, event_name="deleted_business_messages"
)
self.business_connection = TelegramEventObserver(
router=self, event_name="business_connection"
)
self.edited_business_message = TelegramEventObserver(
router=self, event_name="edited_business_message"
)
self.business_message = TelegramEventObserver(router=self, event_name="business_message")
self.purchased_paid_media = TelegramEventObserver(
router=self, event_name="purchased_paid_media"
)
self.errors = self.error = TelegramEventObserver(router=self, event_name="error")
self.startup = EventObserver()
self.shutdown = EventObserver()
self.observers: Dict[str, TelegramEventObserver] = {
"message": self.message,
"edited_message": self.edited_message,
"channel_post": self.channel_post,
"edited_channel_post": self.edited_channel_post,
"inline_query": self.inline_query,
"chosen_inline_result": self.chosen_inline_result,
"callback_query": self.callback_query,
"shipping_query": self.shipping_query,
"pre_checkout_query": self.pre_checkout_query,
"poll": self.poll,
"poll_answer": self.poll_answer,
"my_chat_member": self.my_chat_member,
"chat_member": self.chat_member,
"chat_join_request": self.chat_join_request,
"message_reaction": self.message_reaction,
"message_reaction_count": self.message_reaction_count,
"chat_boost": self.chat_boost,
"removed_chat_boost": self.removed_chat_boost,
"deleted_business_messages": self.deleted_business_messages,
"business_connection": self.business_connection,
"edited_business_message": self.edited_business_message,
"business_message": self.business_message,
"purchased_paid_media": self.purchased_paid_media,
"error": self.errors,
}
def __str__(self) -> str:
return f"{type(self).__name__} {self.name!r}"
def __repr__(self) -> str:
return f"<{self}>"
def resolve_used_update_types(self, skip_events: Optional[Set[str]] = None) -> List[str]:
"""
Resolve registered event names
Is useful for getting updates only for registered event types.
:param skip_events: skip specified event names
:return: set of registered names
"""
handlers_in_use: Set[str] = set()
if skip_events is None:
skip_events = set()
skip_events = {*skip_events, *INTERNAL_UPDATE_TYPES}
for router in self.chain_tail:
for update_name, observer in router.observers.items():
if observer.handlers and update_name not in skip_events:
handlers_in_use.add(update_name)
return list(sorted(handlers_in_use)) # NOQA: C413
async def propagate_event(self, update_type: str, event: TelegramObject, **kwargs: Any) -> Any:
kwargs.update(event_router=self)
observer = self.observers.get(update_type)
async def _wrapped(telegram_event: TelegramObject, **data: Any) -> Any:
return await self._propagate_event(
observer=observer, update_type=update_type, event=telegram_event, **data
)
if observer:
return await observer.wrap_outer_middleware(_wrapped, event=event, data=kwargs)
return await _wrapped(event, **kwargs)
async def _propagate_event(
self,
observer: Optional[TelegramEventObserver],
update_type: str,
event: TelegramObject,
**kwargs: Any,
) -> Any:
response = UNHANDLED
if observer:
# Check globally defined filters before any other handler will be checked.
# This check is placed here instead of `trigger` method to add possibility
# to pass context to handlers from global filters.
result, data = await observer.check_root_filters(event, **kwargs)
if not result:
return UNHANDLED
kwargs.update(data)
response = await observer.trigger(event, **kwargs)
if response is REJECTED: # pragma: no cover
# Possible only if some handler returns REJECTED
return UNHANDLED
if response is not UNHANDLED:
return response
for router in self.sub_routers:
response = await router.propagate_event(update_type=update_type, event=event, **kwargs)
if response is not UNHANDLED:
break
return response
@property
def chain_head(self) -> Generator[Router, None, None]:
router: Optional[Router] = self
while router:
yield router
router = router.parent_router
@property
def chain_tail(self) -> Generator[Router, None, None]:
yield self
for router in self.sub_routers:
yield from router.chain_tail
@property
def parent_router(self) -> Optional[Router]:
return self._parent_router
@parent_router.setter
def parent_router(self, router: Router) -> None:
"""
Internal property setter of parent router fot this router.
Do not use this method in own code.
All routers should be included via `include_router` method.
Self- and circular- referencing are not allowed here
:param router:
"""
if not isinstance(router, Router):
raise ValueError(f"router should be instance of Router not {type(router).__name__!r}")
if self._parent_router:
raise RuntimeError(f"Router is already attached to {self._parent_router!r}")
if self == router:
raise RuntimeError("Self-referencing routers is not allowed")
parent: Optional[Router] = router
while parent is not None:
if parent == self:
raise RuntimeError("Circular referencing of Router is not allowed")
parent = parent.parent_router
self._parent_router = router
router.sub_routers.append(self)
def include_routers(self, *routers: Router) -> None:
"""
Attach multiple routers.
:param routers:
:return:
"""
if not routers:
raise ValueError("At least one router must be provided")
for router in routers:
self.include_router(router)
def include_router(self, router: Router) -> Router:
"""
Attach another router.
:param router:
:return:
"""
if not isinstance(router, Router):
raise ValueError(
f"router should be instance of Router not {type(router).__class__.__name__}"
)
router.parent_router = self
return router
async def emit_startup(self, *args: Any, **kwargs: Any) -> None:
"""
Recursively call startup callbacks
:param args:
:param kwargs:
:return:
"""
kwargs.update(router=self)
await self.startup.trigger(*args, **kwargs)
for router in self.sub_routers:
await router.emit_startup(*args, **kwargs)
async def emit_shutdown(self, *args: Any, **kwargs: Any) -> None:
"""
Recursively call shutdown callbacks to graceful shutdown
:param args:
:param kwargs:
:return:
"""
kwargs.update(router=self)
await self.shutdown.trigger(*args, **kwargs)
for router in self.sub_routers:
await router.emit_shutdown(*args, **kwargs)

View File

@@ -0,0 +1,71 @@
from .bot_command_scope_type import BotCommandScopeType
from .chat_action import ChatAction
from .chat_boost_source_type import ChatBoostSourceType
from .chat_member_status import ChatMemberStatus
from .chat_type import ChatType
from .content_type import ContentType
from .currency import Currency
from .dice_emoji import DiceEmoji
from .encrypted_passport_element import EncryptedPassportElement
from .inline_query_result_type import InlineQueryResultType
from .input_media_type import InputMediaType
from .input_paid_media_type import InputPaidMediaType
from .input_profile_photo_type import InputProfilePhotoType
from .input_story_content_type import InputStoryContentType
from .keyboard_button_poll_type_type import KeyboardButtonPollTypeType
from .mask_position_point import MaskPositionPoint
from .menu_button_type import MenuButtonType
from .message_entity_type import MessageEntityType
from .message_origin_type import MessageOriginType
from .owned_gift_type import OwnedGiftType
from .paid_media_type import PaidMediaType
from .parse_mode import ParseMode
from .passport_element_error_type import PassportElementErrorType
from .poll_type import PollType
from .reaction_type_type import ReactionTypeType
from .revenue_withdrawal_state_type import RevenueWithdrawalStateType
from .sticker_format import StickerFormat
from .sticker_type import StickerType
from .story_area_type_type import StoryAreaTypeType
from .topic_icon_color import TopicIconColor
from .transaction_partner_type import TransactionPartnerType
from .transaction_partner_user_transaction_type_enum import (
TransactionPartnerUserTransactionTypeEnum,
)
from .update_type import UpdateType
__all__ = (
"BotCommandScopeType",
"ChatAction",
"ChatBoostSourceType",
"ChatMemberStatus",
"ChatType",
"ContentType",
"Currency",
"DiceEmoji",
"EncryptedPassportElement",
"InlineQueryResultType",
"InputMediaType",
"InputPaidMediaType",
"InputProfilePhotoType",
"InputStoryContentType",
"KeyboardButtonPollTypeType",
"MaskPositionPoint",
"MenuButtonType",
"MessageEntityType",
"MessageOriginType",
"OwnedGiftType",
"PaidMediaType",
"ParseMode",
"PassportElementErrorType",
"PollType",
"ReactionTypeType",
"RevenueWithdrawalStateType",
"StickerFormat",
"StickerType",
"StoryAreaTypeType",
"TopicIconColor",
"TransactionPartnerType",
"TransactionPartnerUserTransactionTypeEnum",
"UpdateType",
)

View File

@@ -0,0 +1,17 @@
from enum import Enum
class BotCommandScopeType(str, Enum):
"""
This object represents the scope to which bot commands are applied.
Source: https://core.telegram.org/bots/api#botcommandscope
"""
DEFAULT = "default"
ALL_PRIVATE_CHATS = "all_private_chats"
ALL_GROUP_CHATS = "all_group_chats"
ALL_CHAT_ADMINISTRATORS = "all_chat_administrators"
CHAT = "chat"
CHAT_ADMINISTRATORS = "chat_administrators"
CHAT_MEMBER = "chat_member"

View File

@@ -0,0 +1,32 @@
from enum import Enum
class ChatAction(str, Enum):
"""
This object represents bot actions.
Choose one, depending on what the user is about to receive:
- typing for text messages,
- upload_photo for photos,
- record_video or upload_video for videos,
- record_voice or upload_voice for voice notes,
- upload_document for general files,
- choose_sticker for stickers,
- find_location for location data,
- record_video_note or upload_video_note for video notes.
Source: https://core.telegram.org/bots/api#sendchataction
"""
TYPING = "typing"
UPLOAD_PHOTO = "upload_photo"
RECORD_VIDEO = "record_video"
UPLOAD_VIDEO = "upload_video"
RECORD_VOICE = "record_voice"
UPLOAD_VOICE = "upload_voice"
UPLOAD_DOCUMENT = "upload_document"
CHOOSE_STICKER = "choose_sticker"
FIND_LOCATION = "find_location"
RECORD_VIDEO_NOTE = "record_video_note"
UPLOAD_VIDEO_NOTE = "upload_video_note"

View File

@@ -0,0 +1,13 @@
from enum import Enum
class ChatBoostSourceType(str, Enum):
"""
This object represents a type of chat boost source.
Source: https://core.telegram.org/bots/api#chatboostsource
"""
PREMIUM = "premium"
GIFT_CODE = "gift_code"
GIVEAWAY = "giveaway"

View File

@@ -0,0 +1,16 @@
from enum import Enum
class ChatMemberStatus(str, Enum):
"""
This object represents chat member status.
Source: https://core.telegram.org/bots/api#chatmember
"""
CREATOR = "creator"
ADMINISTRATOR = "administrator"
MEMBER = "member"
RESTRICTED = "restricted"
LEFT = "left"
KICKED = "kicked"

View File

@@ -0,0 +1,15 @@
from enum import Enum
class ChatType(str, Enum):
"""
This object represents a chat type
Source: https://core.telegram.org/bots/api#chat
"""
SENDER = "sender"
PRIVATE = "private"
GROUP = "group"
SUPERGROUP = "supergroup"
CHANNEL = "channel"

Some files were not shown because too many files have changed in this diff Show More