brief-extractor/backend/venv/lib/python3.10/site-packages/quart/utils.py
2026-03-06 18:42:46 +00:00

186 lines
5.7 KiB
Python
Executable file

from __future__ import annotations
import asyncio
import inspect
import os
import platform
import sys
from collections.abc import AsyncIterator
from collections.abc import Awaitable
from collections.abc import Coroutine
from collections.abc import Iterable
from collections.abc import Iterator
from contextvars import copy_context
from functools import partial
from functools import wraps
from pathlib import Path
from typing import Any
from typing import Callable
from typing import TYPE_CHECKING
from typing import TypeVar
from werkzeug.datastructures import Headers
from .typing import Event
from .typing import FilePath
if TYPE_CHECKING:
from .wrappers.response import Response # noqa: F401
class MustReloadError(Exception):
    """Signal that a watched source file changed and a reload is needed.

    Raised by ``observe_changes`` when a tracked file has been deleted
    or its modification time has moved forward.
    """
def file_path_to_path(*paths: FilePath) -> Path:
    """Join *paths* into a single ``Path``, accepting ``bytes`` segments.

    ``pathlib.Path`` rejects ``bytes`` arguments, but Flask supports
    bytes paths, so any ``bytes`` segment is converted to ``str`` first.
    All other segments are passed through to ``Path`` unchanged.

    The conversion uses ``os.fsdecode`` (filesystem encoding and error
    handler) rather than a strict UTF-8 ``bytes.decode()``, so a byte
    path that is not valid UTF-8 no longer raises ``UnicodeDecodeError``
    and still round-trips through ``os.fsencode``.

    Called with no arguments this returns ``Path()``.
    """
    safe_paths: list[str | os.PathLike] = [
        os.fsdecode(path) if isinstance(path, bytes) else path for path in paths
    ]
    return Path(*safe_paths)
def run_sync(func: Callable[..., Any]) -> Callable[..., Coroutine[None, None, Any]]:
    """Return a coroutine function that runs the sync *func* off-loop.

    Awaiting the returned wrapper submits the call to the running
    loop's default executor (use ``loop.set_default_executor`` to
    change it), so the synchronous work does not block the event
    loop. The call is executed inside a copy of the current context.

    If *func* returns a generator, the generator is handed to
    ``run_sync_iterable`` and that result is returned instead.
    """

    @wraps(func)
    async def _wrapper(*args: Any, **kwargs: Any) -> Any:
        loop = asyncio.get_running_loop()
        context = copy_context()
        bound_call = partial(func, *args, **kwargs)
        outcome = await loop.run_in_executor(None, context.run, bound_call)
        return run_sync_iterable(outcome) if inspect.isgenerator(outcome) else outcome

    # Marker attribute on the wrapper; presumably checked elsewhere in
    # the package to recognise already-wrapped callables (not shown here).
    _wrapper._quart_async_wrapper = True  # type: ignore
    return _wrapper
T = TypeVar("T")


def run_sync_iterable(iterable: Iterator[T]) -> AsyncIterator[T]:
    """Adapt the synchronous *iterable* into an async iterator.

    Each item is produced by calling ``next(iterable)`` in the running
    loop's default executor, inside a fresh copy of the current
    context, so a slow iterator does not block the event loop.
    The async iterator finishes when *iterable* is exhausted.
    """

    def _advance() -> T:
        # https://bugs.python.org/issue26221
        # A StopIteration raised inside run_in_executor is swallowed,
        # so exhaustion is reported as StopAsyncIteration instead.
        try:
            return next(iterable)
        except StopIteration as exhausted:
            raise StopAsyncIteration() from exhausted

    async def _gen_wrapper() -> AsyncIterator[T]:
        # Run every step in the executor and turn the translated
        # exhaustion error into a normal end of the async generator.
        loop = asyncio.get_running_loop()
        while True:
            try:
                yield await loop.run_in_executor(None, copy_context().run, _advance)
            except StopAsyncIteration:
                return

    return _gen_wrapper()
def encode_headers(headers: Headers) -> list[tuple[bytes, bytes]]:
    """Return *headers* as a list of ``(name, value)`` byte pairs.

    Header names are lower-cased before being encoded; values are
    encoded as they are.
    """
    return [
        (name.lower().encode(), text.encode())
        for name, text in headers.items()
    ]
def decode_headers(headers: Iterable[tuple[bytes, bytes]]) -> Headers:
    """Build a ``Headers`` object from ``(name, value)`` byte pairs.

    Both elements of every pair are decoded to ``str``; the pairs keep
    their original order.
    """
    decoded = [(name.decode(), text.decode()) for name, text in headers]
    return Headers(decoded)
async def observe_changes(
    sleep: Callable[[float], Awaitable[Any]], shutdown_event: Event
) -> None:
    """Poll the files of the loaded modules until one of them changes.

    A snapshot of modification times is taken for every module in
    ``sys.modules`` that has a ``__file__``. While *shutdown_event* is
    not set, the snapshot is re-checked after each ``sleep(1)``.

    Returns ``None`` once *shutdown_event* is set.

    Raises:
        MustReloadError: a tracked file was deleted, or its
            modification time is newer than the recorded one.
    """
    known_mtimes: dict[Path, float] = {}
    # Iterate over a copy of the module list taken up front.
    for loaded in list(sys.modules.values()):
        source = getattr(loaded, "__file__", None)
        if source is not None:
            watched = Path(source)
            try:
                known_mtimes[watched] = watched.stat().st_mtime
            except (FileNotFoundError, NotADirectoryError):
                # Files that cannot be stat-ed at startup are not tracked.
                continue

    while not shutdown_event.is_set():
        await sleep(1)

        for position, (watched, previous) in enumerate(known_mtimes.items()):
            if position % 10 == 0:
                # Yield to the event loop
                await sleep(0)

            try:
                current = watched.stat().st_mtime
            except FileNotFoundError as missing:
                # File deleted
                raise MustReloadError() from missing

            if current > previous:
                raise MustReloadError()
            known_mtimes[watched] = current
def restart() -> None:
    """Replace the current process with a fresh copy of itself.

    Only safe for dev/debug use. The command line is rebuilt from
    ``sys.executable``, ``sys.argv`` and the ``__package__`` of the
    ``__main__`` module, then handed to ``os.execv``, which does not
    return on success.
    """
    executable = sys.executable
    script_path = Path(sys.argv[0]).resolve()
    args = sys.argv[1:]
    main_package = sys.modules["__main__"].__package__

    if main_package is not None:
        # Executed as a module e.g. python -m run
        module = script_path.stem
        if module == "__main__":
            import_name = main_package
        else:
            import_name = f"{main_package}.{module}"
        args = ["-m", import_name.lstrip("."), *args]
    elif platform.system() == "Windows":
        # Executed by filename on Windows
        if not script_path.exists() and script_path.with_suffix(".exe").exists():
            # quart run
            executable = str(script_path.with_suffix(".exe"))
        else:
            # python run.py
            args = [str(script_path), *args]
    elif script_path.is_file() and os.access(script_path, os.X_OK):
        # Executed by filename, script itself is executable
        # hypercorn run:app --reload
        executable = str(script_path)
    else:
        # python run.py
        args = [str(script_path), *args]

    os.execv(executable, [executable, *args])
async def cancel_tasks(tasks: set[asyncio.Task]) -> None:
    """Cancel every task in *tasks* and wait until all have finished.

    After the tasks have settled, ``raise_task_exceptions`` is called
    on the same set, so an exception stored on a task that was not
    cancelled is re-raised from here.
    """
    for pending in tasks:
        pending.cancel()

    # return_exceptions=True makes gather wait for every task rather
    # than propagate the first error, letting remaining work finish.
    await asyncio.gather(*tasks, return_exceptions=True)
    raise_task_exceptions(tasks)
def raise_task_exceptions(tasks: set[asyncio.Task]) -> None:
    """Re-raise the first exception stored on a non-cancelled task.

    Cancelled tasks are skipped. Tasks that finished without an
    exception are ignored. Returns ``None`` if nothing is raised.
    """
    for finished in tasks:
        if finished.cancelled():
            continue
        error = finished.exception()
        if error is not None:
            # Raise any unexpected exceptions
            raise error