1828 lines
65 KiB
Python
Executable file
1828 lines
65 KiB
Python
Executable file
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import signal
|
|
import sys
|
|
import warnings
|
|
from collections import defaultdict
|
|
from collections.abc import AsyncGenerator
|
|
from collections.abc import Awaitable
|
|
from collections.abc import Coroutine
|
|
from datetime import timedelta
|
|
from inspect import isasyncgen
|
|
from inspect import iscoroutinefunction as _inspect_iscoroutinefunction
|
|
from inspect import isgenerator
|
|
from types import TracebackType
|
|
from typing import Any
|
|
from typing import AnyStr
|
|
from typing import Callable
|
|
from typing import cast
|
|
from typing import NoReturn
|
|
from typing import Optional
|
|
from typing import overload
|
|
from typing import TypeVar
|
|
from urllib.parse import quote
|
|
|
|
from aiofiles import open as async_open
|
|
from aiofiles.base import AiofilesContextManager
|
|
from flask.sansio.app import App
|
|
from flask.sansio.scaffold import setupmethod
|
|
from hypercorn.asyncio import serve
|
|
from hypercorn.config import Config as HyperConfig
|
|
from hypercorn.typing import ASGIReceiveCallable
|
|
from hypercorn.typing import ASGISendCallable
|
|
from hypercorn.typing import Scope
|
|
from werkzeug.datastructures import Authorization
|
|
from werkzeug.datastructures import Headers
|
|
from werkzeug.datastructures import ImmutableDict
|
|
from werkzeug.exceptions import Aborter
|
|
from werkzeug.exceptions import BadRequestKeyError
|
|
from werkzeug.exceptions import HTTPException
|
|
from werkzeug.exceptions import InternalServerError
|
|
from werkzeug.routing import BuildError
|
|
from werkzeug.routing import MapAdapter
|
|
from werkzeug.routing import RoutingException
|
|
from werkzeug.wrappers import Response as WerkzeugResponse
|
|
|
|
from .asgi import ASGIHTTPConnection
|
|
from .asgi import ASGILifespan
|
|
from .asgi import ASGIWebsocketConnection
|
|
from .cli import AppGroup
|
|
from .config import Config
|
|
from .ctx import _AppCtxGlobals
|
|
from .ctx import AppContext
|
|
from .ctx import has_request_context
|
|
from .ctx import has_websocket_context
|
|
from .ctx import RequestContext
|
|
from .ctx import WebsocketContext
|
|
from .globals import _cv_app
|
|
from .globals import _cv_request
|
|
from .globals import _cv_websocket
|
|
from .globals import g
|
|
from .globals import request
|
|
from .globals import request_ctx
|
|
from .globals import session
|
|
from .globals import websocket
|
|
from .globals import websocket_ctx
|
|
from .helpers import get_debug_flag
|
|
from .helpers import get_flashed_messages
|
|
from .helpers import send_from_directory
|
|
from .routing import QuartMap
|
|
from .routing import QuartRule
|
|
from .sessions import SecureCookieSessionInterface
|
|
from .signals import appcontext_tearing_down
|
|
from .signals import got_background_exception
|
|
from .signals import got_request_exception
|
|
from .signals import got_serving_exception
|
|
from .signals import got_websocket_exception
|
|
from .signals import request_finished
|
|
from .signals import request_started
|
|
from .signals import request_tearing_down
|
|
from .signals import websocket_finished
|
|
from .signals import websocket_started
|
|
from .signals import websocket_tearing_down
|
|
from .templating import _default_template_ctx_processor
|
|
from .templating import Environment
|
|
from .testing import make_test_body_with_headers
|
|
from .testing import make_test_headers_path_and_query_string
|
|
from .testing import make_test_scope
|
|
from .testing import no_op_push
|
|
from .testing import QuartClient
|
|
from .testing import QuartCliRunner
|
|
from .testing import sentinel
|
|
from .testing import TestApp
|
|
from .typing import AfterServingCallable
|
|
from .typing import AfterWebsocketCallable
|
|
from .typing import ASGIHTTPProtocol
|
|
from .typing import ASGILifespanProtocol
|
|
from .typing import ASGIWebsocketProtocol
|
|
from .typing import BeforeServingCallable
|
|
from .typing import BeforeWebsocketCallable
|
|
from .typing import Event
|
|
from .typing import FilePath
|
|
from .typing import HeadersValue
|
|
from .typing import ResponseReturnValue
|
|
from .typing import ResponseTypes
|
|
from .typing import ShellContextProcessorCallable
|
|
from .typing import StatusCode
|
|
from .typing import TeardownCallable
|
|
from .typing import TemplateFilterCallable
|
|
from .typing import TemplateGlobalCallable
|
|
from .typing import TemplateTestCallable
|
|
from .typing import TestAppProtocol
|
|
from .typing import TestClientProtocol
|
|
from .typing import WebsocketCallable
|
|
from .typing import WhileServingCallable
|
|
from .utils import cancel_tasks
|
|
from .utils import file_path_to_path
|
|
from .utils import MustReloadError
|
|
from .utils import observe_changes
|
|
from .utils import restart
|
|
from .utils import run_sync
|
|
from .wrappers import BaseRequestWebsocket
|
|
from .wrappers import Request
|
|
from .wrappers import Response
|
|
from .wrappers import Websocket
|
|
|
|
if sys.version_info >= (3, 10):
|
|
from typing import ParamSpec
|
|
else:
|
|
from typing_extensions import ParamSpec
|
|
|
|
# Python 3.14 deprecated asyncio.iscoroutinefunction, but suggested
|
|
# inspect.iscoroutinefunction does not work correctly in some Python
|
|
# versions before 3.12.
|
|
# See https://github.com/python/cpython/issues/122858#issuecomment-2466239748
|
|
if sys.version_info >= (3, 12):
|
|
iscoroutinefunction = _inspect_iscoroutinefunction
|
|
else:
|
|
iscoroutinefunction = asyncio.iscoroutinefunction
|
|
|
|
AppOrBlueprintKey = Optional[str] # The App key is None, whereas blueprints are named
|
|
T_after_serving = TypeVar("T_after_serving", bound=AfterServingCallable)
|
|
T_after_websocket = TypeVar("T_after_websocket", bound=AfterWebsocketCallable)
|
|
T_before_serving = TypeVar("T_before_serving", bound=BeforeServingCallable)
|
|
T_before_websocket = TypeVar("T_before_websocket", bound=BeforeWebsocketCallable)
|
|
T_shell_context_processor = TypeVar(
|
|
"T_shell_context_processor", bound=ShellContextProcessorCallable
|
|
)
|
|
T_teardown = TypeVar("T_teardown", bound=TeardownCallable)
|
|
T_template_filter = TypeVar("T_template_filter", bound=TemplateFilterCallable)
|
|
T_template_global = TypeVar("T_template_global", bound=TemplateGlobalCallable)
|
|
T_template_test = TypeVar("T_template_test", bound=TemplateTestCallable)
|
|
T_websocket = TypeVar("T_websocket", bound=WebsocketCallable)
|
|
T_while_serving = TypeVar("T_while_serving", bound=WhileServingCallable)
|
|
|
|
T = TypeVar("T", bound=Any)
|
|
P = ParamSpec("P")
|
|
|
|
|
|
def _make_timedelta(value: timedelta | int | None) -> timedelta | None:
|
|
if value is None or isinstance(value, timedelta):
|
|
return value
|
|
|
|
return timedelta(seconds=value)
|
|
|
|
|
|
class Quart(App):
|
|
"""The web framework class, handles requests and returns responses.
|
|
|
|
The primary method from a serving viewpoint is
|
|
:meth:`~quart.app.Quart.handle_request`, from an application
|
|
viewpoint all the other methods are vital.
|
|
|
|
This can be extended in many ways, with most methods designed with
|
|
this in mind. Additionally any of the classes listed as attributes
|
|
can be replaced.
|
|
|
|
Attributes:
|
|
aborter_class: The class to use to raise HTTP error via the abort
|
|
helper function.
|
|
app_ctx_globals_class: The class to use for the ``g`` object
|
|
asgi_http_class: The class to use to handle the ASGI HTTP
|
|
protocol.
|
|
asgi_lifespan_class: The class to use to handle the ASGI
|
|
lifespan protocol.
|
|
asgi_websocket_class: The class to use to handle the ASGI
|
|
websocket protocol.
|
|
config_class: The class to use for the configuration.
|
|
env: The name of the environment the app is running on.
|
|
event_class: The class to use to signal an event in an async
|
|
manner.
|
|
debug: Wrapper around configuration DEBUG value, in many places
|
|
this will result in more output if True. If unset, debug
|
|
mode will be activated if environ is set to 'development'.
|
|
jinja_environment: The class to use for the jinja environment.
|
|
jinja_options: The default options to set when creating the jinja
|
|
environment.
|
|
permanent_session_lifetime: Wrapper around configuration
|
|
PERMANENT_SESSION_LIFETIME value. Specifies how long the session
|
|
data should survive.
|
|
request_class: The class to use for requests.
|
|
response_class: The class to user for responses.
|
|
secret_key: Warpper around configuration SECRET_KEY value. The app
|
|
secret for signing sessions.
|
|
session_interface: The class to use as the session interface.
|
|
shutdown_event: This event is set when the app starts to
|
|
shutdown allowing waiting tasks to know when to stop.
|
|
url_map_class: The class to map rules to endpoints.
|
|
url_rule_class: The class to use for URL rules.
|
|
websocket_class: The class to use for websockets.
|
|
|
|
"""
|
|
|
|
asgi_http_class: type[ASGIHTTPProtocol]
|
|
asgi_lifespan_class: type[ASGILifespanProtocol]
|
|
asgi_websocket_class: type[ASGIWebsocketProtocol]
|
|
shutdown_event: Event
|
|
test_app_class: type[TestAppProtocol]
|
|
test_client_class: type[TestClientProtocol] # type: ignore[assignment]
|
|
|
|
aborter_class = Aborter
|
|
app_ctx_globals_class = _AppCtxGlobals
|
|
asgi_http_class = ASGIHTTPConnection
|
|
asgi_lifespan_class = ASGILifespan
|
|
asgi_websocket_class = ASGIWebsocketConnection
|
|
config_class = Config
|
|
event_class = asyncio.Event
|
|
jinja_environment = Environment # type: ignore[assignment]
|
|
lock_class = asyncio.Lock
|
|
request_class = Request
|
|
response_class = Response
|
|
session_interface = SecureCookieSessionInterface()
|
|
test_app_class = TestApp
|
|
test_client_class = QuartClient # type: ignore[assignment]
|
|
test_cli_runner_class = QuartCliRunner # type: ignore
|
|
url_map_class = QuartMap
|
|
url_rule_class = QuartRule # type: ignore[assignment]
|
|
websocket_class = Websocket
|
|
|
|
default_config = ImmutableDict(
|
|
{
|
|
"APPLICATION_ROOT": "/",
|
|
"BACKGROUND_TASK_SHUTDOWN_TIMEOUT": 5, # Second
|
|
"BODY_TIMEOUT": 60, # Second
|
|
"DEBUG": None,
|
|
"ENV": None,
|
|
"EXPLAIN_TEMPLATE_LOADING": False,
|
|
"MAX_CONTENT_LENGTH": 16 * 1024 * 1024, # 16 MB Limit
|
|
"MAX_COOKIE_SIZE": 4093,
|
|
"MAX_FORM_MEMORY_SIZE": 500_000,
|
|
"MAX_FORM_PARTS": 1_000,
|
|
"PERMANENT_SESSION_LIFETIME": timedelta(days=31),
|
|
# Replaces PREFERRED_URL_SCHEME to allow for WebSocket scheme
|
|
"PREFER_SECURE_URLS": False,
|
|
"PRESERVE_CONTEXT_ON_EXCEPTION": None,
|
|
"PROPAGATE_EXCEPTIONS": None,
|
|
"PROVIDE_AUTOMATIC_OPTIONS": True,
|
|
"RESPONSE_TIMEOUT": 60, # Second
|
|
"SECRET_KEY": None,
|
|
"SEND_FILE_MAX_AGE_DEFAULT": timedelta(hours=12),
|
|
"SERVER_NAME": None,
|
|
"SESSION_COOKIE_DOMAIN": None,
|
|
"SESSION_COOKIE_HTTPONLY": True,
|
|
"SESSION_COOKIE_NAME": "session",
|
|
"SESSION_COOKIE_PATH": None,
|
|
"SESSION_COOKIE_SAMESITE": None,
|
|
"SESSION_COOKIE_SECURE": False,
|
|
"SESSION_REFRESH_EACH_REQUEST": True,
|
|
"TEMPLATES_AUTO_RELOAD": None,
|
|
"TESTING": False,
|
|
"TRAP_BAD_REQUEST_ERRORS": None,
|
|
"TRAP_HTTP_EXCEPTIONS": False,
|
|
}
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
import_name: str,
|
|
static_url_path: str | None = None,
|
|
static_folder: str | None = "static",
|
|
static_host: str | None = None,
|
|
host_matching: bool = False,
|
|
subdomain_matching: bool = False,
|
|
template_folder: str | None = "templates",
|
|
instance_path: str | None = None,
|
|
instance_relative_config: bool = False,
|
|
root_path: str | None = None,
|
|
) -> None:
|
|
"""Construct a Quart web application.
|
|
|
|
Use to create a new web application to which requests should
|
|
be handled, as specified by the various attached url
|
|
rules. See also :class:`~quart.static.PackageStatic` for
|
|
additional constructor arguments.
|
|
|
|
Arguments:
|
|
import_name: The name at import of the application, use
|
|
``__name__`` unless there is a specific issue.
|
|
host_matching: Optionally choose to match the host to the
|
|
configured host on request (404 if no match).
|
|
instance_path: Optional path to an instance folder, for
|
|
deployment specific settings and files.
|
|
instance_relative_config: If True load the config from a
|
|
path relative to the instance path.
|
|
Attributes:
|
|
after_request_funcs: The functions to execute after a
|
|
request has been handled.
|
|
after_websocket_funcs: The functions to execute after a
|
|
websocket has been handled.
|
|
before_request_funcs: The functions to execute before handling
|
|
a request.
|
|
before_websocket_funcs: The functions to execute before handling
|
|
a websocket.
|
|
"""
|
|
super().__init__(
|
|
import_name,
|
|
static_url_path,
|
|
static_folder,
|
|
static_host,
|
|
host_matching,
|
|
subdomain_matching,
|
|
template_folder,
|
|
instance_path,
|
|
instance_relative_config,
|
|
root_path,
|
|
)
|
|
|
|
self.after_serving_funcs: list[Callable[[], Awaitable[None]]] = []
|
|
self.after_websocket_funcs: dict[
|
|
AppOrBlueprintKey, list[AfterWebsocketCallable]
|
|
] = defaultdict(list)
|
|
self.background_tasks: set[asyncio.Task] = set()
|
|
self.before_serving_funcs: list[Callable[[], Awaitable[None]]] = []
|
|
self.before_websocket_funcs: dict[
|
|
AppOrBlueprintKey, list[BeforeWebsocketCallable]
|
|
] = defaultdict(list)
|
|
self.teardown_websocket_funcs: dict[
|
|
AppOrBlueprintKey, list[TeardownCallable]
|
|
] = defaultdict(list)
|
|
self.while_serving_gens: list[AsyncGenerator[None, None]] = []
|
|
|
|
self.template_context_processors[None] = [_default_template_ctx_processor]
|
|
|
|
self.cli = AppGroup()
|
|
self.cli.name = self.name
|
|
|
|
if self.has_static_folder:
|
|
assert (
|
|
bool(static_host) == host_matching
|
|
), "Invalid static_host/host_matching combination"
|
|
|
|
self.add_url_rule(
|
|
f"{self.static_url_path}/<path:filename>",
|
|
"static",
|
|
self.send_static_file,
|
|
host=static_host,
|
|
)
|
|
|
|
def get_send_file_max_age(self, filename: str | None) -> int | None:
|
|
"""Used by :func:`send_file` to determine the ``max_age`` cache
|
|
value for a given file path if it wasn't passed.
|
|
|
|
By default, this returns :data:`SEND_FILE_MAX_AGE_DEFAULT` from
|
|
the configuration of :data:`~flask.current_app`. This defaults
|
|
to ``None``, which tells the browser to use conditional requests
|
|
instead of a timed cache, which is usually preferable.
|
|
|
|
Note this is a duplicate of the same method in the Quart
|
|
class.
|
|
|
|
"""
|
|
value = self.config["SEND_FILE_MAX_AGE_DEFAULT"]
|
|
|
|
if value is None:
|
|
return None
|
|
|
|
if isinstance(value, timedelta):
|
|
return int(value.total_seconds())
|
|
|
|
return value
|
|
return None
|
|
|
|
async def send_static_file(self, filename: str) -> Response:
|
|
if not self.has_static_folder:
|
|
raise RuntimeError("No static folder for this object")
|
|
return await send_from_directory(self.static_folder, filename)
|
|
|
|
async def open_resource(
|
|
self,
|
|
path: FilePath,
|
|
mode: str = "rb",
|
|
) -> AiofilesContextManager:
|
|
"""Open a file for reading.
|
|
|
|
Use as
|
|
|
|
.. code-block:: python
|
|
|
|
async with await app.open_resource(path) as file_:
|
|
await file_.read()
|
|
"""
|
|
if mode not in {"r", "rb", "rt"}:
|
|
raise ValueError("Files can only be opened for reading")
|
|
|
|
return async_open(os.path.join(self.root_path, path), mode) # type: ignore
|
|
|
|
async def open_instance_resource(
|
|
self, path: FilePath, mode: str = "rb"
|
|
) -> AiofilesContextManager:
|
|
"""Open a file for reading.
|
|
|
|
Use as
|
|
|
|
.. code-block:: python
|
|
|
|
async with await app.open_instance_resource(path) as file_:
|
|
await file_.read()
|
|
"""
|
|
return async_open(self.instance_path / file_path_to_path(path), mode) # type: ignore
|
|
|
|
def create_jinja_environment(self) -> Environment: # type: ignore
|
|
"""Create and return the jinja environment.
|
|
|
|
This will create the environment based on the
|
|
:attr:`jinja_options` and configuration settings. The
|
|
environment will include the Quart globals by default.
|
|
"""
|
|
options = dict(self.jinja_options)
|
|
if "autoescape" not in options:
|
|
options["autoescape"] = self.select_jinja_autoescape
|
|
if "auto_reload" not in options:
|
|
options["auto_reload"] = self.config["TEMPLATES_AUTO_RELOAD"]
|
|
jinja_env = self.jinja_environment(self, **options) # type: ignore
|
|
jinja_env.globals.update(
|
|
{
|
|
"config": self.config,
|
|
"g": g,
|
|
"get_flashed_messages": get_flashed_messages,
|
|
"request": request,
|
|
"session": session,
|
|
"url_for": self.url_for,
|
|
}
|
|
)
|
|
jinja_env.policies["json.dumps_function"] = self.json.dumps
|
|
return jinja_env
|
|
|
|
async def update_template_context(self, context: dict) -> None:
|
|
"""Update the provided template context.
|
|
|
|
This adds additional context from the various template context
|
|
processors.
|
|
|
|
Arguments:
|
|
context: The context to update (mutate).
|
|
"""
|
|
names = [None]
|
|
if has_request_context():
|
|
names.extend(reversed(request_ctx.request.blueprints)) # type: ignore
|
|
elif has_websocket_context():
|
|
names.extend(reversed(websocket_ctx.websocket.blueprints)) # type: ignore
|
|
|
|
extra_context: dict[str, Any] = {}
|
|
for name in names:
|
|
for processor in self.template_context_processors[name]:
|
|
extra_context.update(await self.ensure_async(processor)()) # type: ignore[call-overload]
|
|
|
|
original = context.copy()
|
|
context.update(extra_context)
|
|
context.update(original)
|
|
|
|
@setupmethod
|
|
def before_serving(
|
|
self,
|
|
func: T_before_serving,
|
|
) -> T_before_serving:
|
|
"""Add a before serving function.
|
|
|
|
This will allow the function provided to be called once before
|
|
anything is served (before any byte is received).
|
|
|
|
This is designed to be used as a decorator, if used to
|
|
decorate a synchronous function, the function will be wrapped
|
|
in :func:`~quart.utils.run_sync` and run in a thread executor
|
|
(with the wrapped function returned). An example usage,
|
|
|
|
.. code-block:: python
|
|
|
|
@app.before_serving
|
|
async def func():
|
|
...
|
|
|
|
Arguments:
|
|
func: The function itself.
|
|
"""
|
|
self.before_serving_funcs.append(func)
|
|
return func
|
|
|
|
@setupmethod
|
|
def while_serving(
|
|
self,
|
|
func: T_while_serving,
|
|
) -> T_while_serving:
|
|
"""Add a while serving generator function.
|
|
|
|
This will allow the generator provided to be invoked at
|
|
startup and then again at shutdown.
|
|
|
|
This is designed to be used as a decorator. An example usage,
|
|
|
|
.. code-block:: python
|
|
|
|
@app.while_serving
|
|
async def func():
|
|
... # Startup
|
|
yield
|
|
... # Shutdown
|
|
|
|
Arguments:
|
|
func: The function itself.
|
|
|
|
"""
|
|
self.while_serving_gens.append(func())
|
|
return func
|
|
|
|
@setupmethod
|
|
def after_serving(
|
|
self,
|
|
func: T_after_serving,
|
|
) -> T_after_serving:
|
|
"""Add a after serving function.
|
|
|
|
This will allow the function provided to be called once after
|
|
anything is served (after last byte is sent).
|
|
|
|
This is designed to be used as a decorator, if used to
|
|
decorate a synchronous function, the function will be wrapped
|
|
in :func:`~quart.utils.run_sync` and run in a thread executor
|
|
(with the wrapped function returned). An example usage,
|
|
|
|
.. code-block:: python
|
|
|
|
@app.after_serving
|
|
async def func():
|
|
...
|
|
|
|
Arguments:
|
|
func: The function itself.
|
|
"""
|
|
self.after_serving_funcs.append(func)
|
|
return func
|
|
|
|
def create_url_adapter(
|
|
self, request: BaseRequestWebsocket | None
|
|
) -> MapAdapter | None:
|
|
"""Create and return a URL adapter.
|
|
|
|
This will create the adapter based on the request if present
|
|
otherwise the app configuration.
|
|
"""
|
|
if request is not None:
|
|
subdomain = (
|
|
(self.url_map.default_subdomain or None)
|
|
if not self.subdomain_matching
|
|
else None
|
|
)
|
|
|
|
return self.url_map.bind_to_request( # type: ignore[attr-defined]
|
|
request, subdomain, self.config["SERVER_NAME"]
|
|
)
|
|
|
|
if self.config["SERVER_NAME"] is not None:
|
|
scheme = "https" if self.config["PREFER_SECURE_URLS"] else "http"
|
|
return self.url_map.bind(self.config["SERVER_NAME"], url_scheme=scheme)
|
|
return None
|
|
|
|
def websocket(
|
|
self,
|
|
rule: str,
|
|
**options: Any,
|
|
) -> Callable[[T_websocket], T_websocket]:
|
|
"""Add a websocket to the application.
|
|
|
|
This is designed to be used as a decorator, if used to
|
|
decorate a synchronous function, the function will be wrapped
|
|
in :func:`~quart.utils.run_sync` and run in a thread executor
|
|
(with the wrapped function returned). An example usage,
|
|
|
|
.. code-block:: python
|
|
|
|
@app.websocket('/')
|
|
async def websocket_route():
|
|
...
|
|
|
|
Arguments:
|
|
rule: The path to route on, should start with a ``/``.
|
|
endpoint: Optional endpoint name, if not present the
|
|
function name is used.
|
|
defaults: A dictionary of variables to provide automatically, use
|
|
to provide a simpler default path for a route, e.g. to allow
|
|
for ``/book`` rather than ``/book/0``,
|
|
|
|
.. code-block:: python
|
|
|
|
@app.websocket('/book', defaults={'page': 0})
|
|
@app.websocket('/book/<int:page>')
|
|
def book(page):
|
|
...
|
|
|
|
host: The full host name for this route (should include subdomain
|
|
if needed) - cannot be used with subdomain.
|
|
subdomain: A subdomain for this specific route.
|
|
strict_slashes: Strictly match the trailing slash present in the
|
|
path. Will redirect a leaf (no slash) to a branch (with slash).
|
|
"""
|
|
|
|
def decorator(func: T_websocket) -> T_websocket:
|
|
endpoint = options.pop("endpoint", None)
|
|
self.add_websocket(
|
|
rule,
|
|
endpoint,
|
|
func,
|
|
**options,
|
|
)
|
|
return func
|
|
|
|
return decorator
|
|
|
|
def add_websocket(
|
|
self,
|
|
rule: str,
|
|
endpoint: str | None = None,
|
|
view_func: WebsocketCallable | None = None,
|
|
**options: Any,
|
|
) -> None:
|
|
"""Add a websocket url rule to the application.
|
|
|
|
This is designed to be used on the application directly. An
|
|
example usage,
|
|
|
|
.. code-block:: python
|
|
|
|
def websocket_route():
|
|
...
|
|
|
|
app.add_websocket('/', websocket_route)
|
|
|
|
Arguments:
|
|
rule: The path to route on, should start with a ``/``.
|
|
endpoint: Optional endpoint name, if not present the
|
|
function name is used.
|
|
view_func: Callable that returns a response.
|
|
defaults: A dictionary of variables to provide automatically, use
|
|
to provide a simpler default path for a route, e.g. to allow
|
|
for ``/book`` rather than ``/book/0``,
|
|
|
|
.. code-block:: python
|
|
|
|
@app.websocket('/book', defaults={'page': 0})
|
|
@app.websocket('/book/<int:page>')
|
|
def book(page):
|
|
...
|
|
|
|
host: The full host name for this route (should include subdomain
|
|
if needed) - cannot be used with subdomain.
|
|
subdomain: A subdomain for this specific route.
|
|
strict_slashes: Strictly match the trailing slash present in the
|
|
path. Will redirect a leaf (no slash) to a branch (with slash).
|
|
"""
|
|
return self.add_url_rule(
|
|
rule,
|
|
endpoint,
|
|
view_func,
|
|
methods={"GET"},
|
|
websocket=True,
|
|
**options,
|
|
)
|
|
|
|
def url_for(
|
|
self,
|
|
endpoint: str,
|
|
*,
|
|
_anchor: str | None = None,
|
|
_external: bool | None = None,
|
|
_method: str | None = None,
|
|
_scheme: str | None = None,
|
|
**values: Any,
|
|
) -> str:
|
|
"""Return the url for a specific endpoint.
|
|
|
|
This is most useful in templates and redirects to create a URL
|
|
that can be used in the browser.
|
|
|
|
Arguments:
|
|
endpoint: The endpoint to build a url for, if prefixed with
|
|
``.`` it targets endpoint's in the current blueprint.
|
|
_anchor: Additional anchor text to append (i.e. #text).
|
|
_external: Return an absolute url for external (to app) usage.
|
|
_method: The method to consider alongside the endpoint.
|
|
_scheme: A specific scheme to use.
|
|
values: The values to build into the URL, as specified in
|
|
the endpoint rule.
|
|
"""
|
|
|
|
app_context = _cv_app.get(None)
|
|
request_context = _cv_request.get(None)
|
|
websocket_context = _cv_websocket.get(None)
|
|
|
|
if request_context is not None:
|
|
url_adapter = request_context.url_adapter
|
|
if endpoint.startswith("."):
|
|
if request.blueprint is not None:
|
|
endpoint = request.blueprint + endpoint
|
|
else:
|
|
endpoint = endpoint[1:]
|
|
if _external is None:
|
|
_external = _scheme is not None
|
|
elif websocket_context is not None:
|
|
url_adapter = websocket_context.url_adapter
|
|
if endpoint.startswith("."):
|
|
if websocket.blueprint is not None:
|
|
endpoint = websocket.blueprint + endpoint
|
|
else:
|
|
endpoint = endpoint[1:]
|
|
if _external is None:
|
|
_external = _scheme is not None
|
|
elif app_context is not None:
|
|
url_adapter = app_context.url_adapter
|
|
if _external is None:
|
|
_external = True
|
|
else:
|
|
url_adapter = self.create_url_adapter(None)
|
|
if _external is None:
|
|
_external = True
|
|
|
|
if url_adapter is None:
|
|
raise RuntimeError(
|
|
"Unable to create a url adapter, try setting the SERVER_NAME"
|
|
" config variable."
|
|
)
|
|
if _scheme is not None and not _external:
|
|
raise ValueError("External must be True for scheme usage")
|
|
|
|
self.inject_url_defaults(endpoint, values)
|
|
|
|
old_scheme = None
|
|
if _scheme is not None:
|
|
old_scheme = url_adapter.url_scheme
|
|
url_adapter.url_scheme = _scheme
|
|
|
|
try:
|
|
url = url_adapter.build(
|
|
endpoint, values, method=_method, force_external=_external
|
|
)
|
|
except BuildError as error:
|
|
return self.handle_url_build_error(error, endpoint, values)
|
|
finally:
|
|
if old_scheme is not None:
|
|
url_adapter.url_scheme = old_scheme
|
|
|
|
if _anchor is not None:
|
|
quoted_anchor = quote(_anchor, safe="%!#$&'()*+,/:;=?@")
|
|
url = f"{url}#{quoted_anchor}"
|
|
return url
|
|
|
|
def make_shell_context(self) -> dict:
|
|
"""Create a context for interactive shell usage.
|
|
|
|
The :attr:`shell_context_processors` can be used to add
|
|
additional context.
|
|
"""
|
|
context = {"app": self, "g": g}
|
|
for processor in self.shell_context_processors:
|
|
context.update(processor())
|
|
return context
|
|
|
|
def run(
|
|
self,
|
|
host: str | None = None,
|
|
port: int | None = None,
|
|
debug: bool | None = None,
|
|
use_reloader: bool = True,
|
|
loop: asyncio.AbstractEventLoop | None = None,
|
|
ca_certs: str | None = None,
|
|
certfile: str | None = None,
|
|
keyfile: str | None = None,
|
|
**kwargs: Any,
|
|
) -> None:
|
|
"""Run this application.
|
|
|
|
This is best used for development only, see Hypercorn for
|
|
production servers.
|
|
|
|
Arguments:
|
|
host: Hostname to listen on. By default this is loopback
|
|
only, use 0.0.0.0 to have the server listen externally.
|
|
port: Port number to listen on.
|
|
debug: If set enable (or disable) debug mode and debug output.
|
|
use_reloader: Automatically reload on code changes.
|
|
loop: Asyncio loop to create the server in, if None, take default one.
|
|
If specified it is the caller's responsibility to close and cleanup the
|
|
loop.
|
|
ca_certs: Path to the SSL CA certificate file.
|
|
certfile: Path to the SSL certificate file.
|
|
keyfile: Path to the SSL key file.
|
|
"""
|
|
if kwargs:
|
|
warnings.warn(
|
|
f"Additional arguments, {','.join(kwargs.keys())}, are not supported.\n"
|
|
"They may be supported by Hypercorn, which is the ASGI server Quart "
|
|
"uses by default. This method is meant for development and debugging.",
|
|
stacklevel=2,
|
|
)
|
|
|
|
if loop is None:
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
if "QUART_DEBUG" in os.environ:
|
|
self.debug = get_debug_flag()
|
|
|
|
if debug is not None:
|
|
self.debug = debug
|
|
|
|
loop.set_debug(self.debug)
|
|
|
|
shutdown_event = asyncio.Event()
|
|
|
|
def _signal_handler(*_: Any) -> None:
|
|
shutdown_event.set()
|
|
|
|
for signal_name in {"SIGINT", "SIGTERM", "SIGBREAK"}:
|
|
if hasattr(signal, signal_name):
|
|
try:
|
|
loop.add_signal_handler(
|
|
getattr(signal, signal_name), _signal_handler
|
|
)
|
|
except NotImplementedError:
|
|
# Add signal handler may not be implemented on Windows
|
|
signal.signal(getattr(signal, signal_name), _signal_handler)
|
|
|
|
server_name = self.config.get("SERVER_NAME")
|
|
sn_host = None
|
|
sn_port = None
|
|
if server_name is not None:
|
|
sn_host, _, sn_port = server_name.partition(":")
|
|
|
|
if host is None:
|
|
host = sn_host or "127.0.0.1"
|
|
|
|
if port is None:
|
|
port = int(sn_port or "5000")
|
|
|
|
task = self.run_task(
|
|
host,
|
|
port,
|
|
debug,
|
|
ca_certs,
|
|
certfile,
|
|
keyfile,
|
|
shutdown_trigger=shutdown_event.wait, # type: ignore
|
|
)
|
|
print(f" * Serving Quart app '{self.name}'") # noqa: T201
|
|
print(f" * Debug mode: {self.debug or False}") # noqa: T201
|
|
print(" * Please use an ASGI server (e.g. Hypercorn) directly in production") # noqa: T201
|
|
scheme = "https" if certfile is not None and keyfile is not None else "http"
|
|
print(f" * Running on {scheme}://{host}:{port} (CTRL + C to quit)") # noqa: T201
|
|
|
|
tasks = [loop.create_task(task)]
|
|
|
|
if use_reloader:
|
|
tasks.append(
|
|
loop.create_task(observe_changes(asyncio.sleep, shutdown_event))
|
|
)
|
|
|
|
reload_ = False
|
|
try:
|
|
loop.run_until_complete(asyncio.gather(*tasks))
|
|
except MustReloadError:
|
|
reload_ = True
|
|
finally:
|
|
try:
|
|
_cancel_all_tasks(loop)
|
|
loop.run_until_complete(loop.shutdown_asyncgens())
|
|
finally:
|
|
asyncio.set_event_loop(None)
|
|
loop.close()
|
|
|
|
if reload_:
|
|
restart()
|
|
|
|
def run_task(
|
|
self,
|
|
host: str = "127.0.0.1",
|
|
port: int = 5000,
|
|
debug: bool | None = None,
|
|
ca_certs: str | None = None,
|
|
certfile: str | None = None,
|
|
keyfile: str | None = None,
|
|
shutdown_trigger: Callable[..., Awaitable[None]] | None = None,
|
|
) -> Coroutine[None, None, None]:
|
|
"""Return a task that when awaited runs this application.
|
|
|
|
This is best used for development only, see Hypercorn for
|
|
production servers.
|
|
|
|
Arguments:
|
|
host: Hostname to listen on. By default this is loopback
|
|
only, use 0.0.0.0 to have the server listen externally.
|
|
port: Port number to listen on.
|
|
debug: If set enable (or disable) debug mode and debug output.
|
|
ca_certs: Path to the SSL CA certificate file.
|
|
certfile: Path to the SSL certificate file.
|
|
keyfile: Path to the SSL key file.
|
|
|
|
"""
|
|
config = HyperConfig()
|
|
config.access_log_format = "%(h)s %(r)s %(s)s %(b)s %(D)s"
|
|
config.accesslog = "-"
|
|
config.bind = [f"{host}:{port}"]
|
|
config.ca_certs = ca_certs
|
|
config.certfile = certfile
|
|
if debug is not None:
|
|
self.debug = debug
|
|
config.errorlog = config.accesslog
|
|
config.keyfile = keyfile
|
|
|
|
return serve(self, config, shutdown_trigger=shutdown_trigger)
|
|
|
|
def test_client(
|
|
self, use_cookies: bool = True, **kwargs: Any
|
|
) -> TestClientProtocol:
|
|
"""Creates and returns a test client."""
|
|
return self.test_client_class(self, use_cookies=use_cookies, **kwargs)
|
|
|
|
def test_cli_runner(self, **kwargs: Any) -> QuartCliRunner:
|
|
"""Creates and returns a CLI test runner."""
|
|
return self.test_cli_runner_class(self, **kwargs) # type: ignore
|
|
|
|
@setupmethod
|
|
def before_websocket(
|
|
self,
|
|
func: T_before_websocket,
|
|
) -> T_before_websocket:
|
|
"""Add a before websocket function.
|
|
|
|
This is designed to be used as a decorator, if used to
|
|
decorate a synchronous function, the function will be wrapped
|
|
in :func:`~quart.utils.run_sync` and run in a thread executor
|
|
(with the wrapped function returned). An example usage,
|
|
|
|
.. code-block:: python
|
|
|
|
@app.before_websocket
|
|
async def func():
|
|
...
|
|
|
|
Arguments:
|
|
func: The before websocket function itself.
|
|
"""
|
|
self.before_websocket_funcs[None].append(func)
|
|
return func
|
|
|
|
@setupmethod
|
|
def after_websocket(
|
|
self,
|
|
func: T_after_websocket,
|
|
) -> T_after_websocket:
|
|
"""Add an after websocket function.
|
|
|
|
This is designed to be used as a decorator, if used to
|
|
decorate a synchronous function, the function will be wrapped
|
|
in :func:`~quart.utils.run_sync` and run in a thread executor
|
|
(with the wrapped function returned). An example usage,
|
|
|
|
.. code-block:: python
|
|
|
|
@app.after_websocket
|
|
async def func(response):
|
|
return response
|
|
|
|
Arguments:
|
|
func: The after websocket function itself.
|
|
"""
|
|
self.after_websocket_funcs[None].append(func)
|
|
return func
|
|
|
|
@setupmethod
|
|
def teardown_websocket(
|
|
self,
|
|
func: T_teardown,
|
|
) -> T_teardown:
|
|
"""Add a teardown websocket function.
|
|
This is designed to be used as a decorator, if used to
|
|
decorate a synchronous function, the function will be wrapped
|
|
in :func:`~quart.utils.run_sync` and run in a thread executor
|
|
(with the wrapped function returned). An example usage,
|
|
.. code-block:: python
|
|
@app.teardown_websocket
|
|
async def func():
|
|
...
|
|
Arguments:
|
|
func: The teardown websocket function itself.
|
|
"""
|
|
self.teardown_websocket_funcs[None].append(func)
|
|
return func
|
|
|
|
async def handle_http_exception(
|
|
self, error: HTTPException
|
|
) -> HTTPException | ResponseReturnValue:
|
|
"""Handle a HTTPException subclass error.
|
|
|
|
This will attempt to find a handler for the error and if fails
|
|
will fall back to the error response.
|
|
"""
|
|
if error.code is None:
|
|
return error
|
|
|
|
if isinstance(error, RoutingException):
|
|
return error
|
|
|
|
blueprints = []
|
|
if has_request_context():
|
|
blueprints = request.blueprints
|
|
elif has_websocket_context():
|
|
blueprints = websocket.blueprints
|
|
|
|
handler = self._find_error_handler(error, blueprints)
|
|
if handler is None:
|
|
return error
|
|
else:
|
|
return await self.ensure_async(handler)(error) # type: ignore[return-value]
|
|
|
|
async def handle_user_exception(
|
|
self, error: Exception
|
|
) -> HTTPException | ResponseReturnValue:
|
|
"""Handle an exception that has been raised.
|
|
|
|
This should forward :class:`~quart.exception.HTTPException` to
|
|
:meth:`handle_http_exception`, then attempt to handle the
|
|
error. If it cannot it should reraise the error.
|
|
"""
|
|
if isinstance(error, BadRequestKeyError) and (
|
|
self.debug or self.config["TRAP_BAD_REQUEST_ERRORS"]
|
|
):
|
|
error.show_exception = True
|
|
|
|
if isinstance(error, HTTPException) and not self.trap_http_exception(error):
|
|
return await self.handle_http_exception(error)
|
|
|
|
blueprints = []
|
|
if has_request_context():
|
|
blueprints = request.blueprints
|
|
elif has_websocket_context():
|
|
blueprints = websocket.blueprints
|
|
|
|
handler = self._find_error_handler(error, blueprints)
|
|
if handler is None:
|
|
raise error
|
|
return await self.ensure_async(handler)(error) # type: ignore[return-value]
|
|
|
|
async def handle_exception(self, error: Exception) -> ResponseTypes:
|
|
"""Handle an uncaught exception.
|
|
|
|
By default this switches the error response to a 500 internal
|
|
server error.
|
|
"""
|
|
exc_info = sys.exc_info()
|
|
await got_request_exception.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore[arg-type]
|
|
exception=error,
|
|
)
|
|
propagate = self.config["PROPAGATE_EXCEPTIONS"]
|
|
|
|
if propagate is None:
|
|
propagate = self.testing or self.debug
|
|
|
|
if propagate:
|
|
# Re-raise if called with an active exception, otherwise
|
|
# raise the passed in exception.
|
|
if exc_info[1] is error:
|
|
raise
|
|
|
|
raise error
|
|
|
|
self.log_exception(exc_info)
|
|
server_error: InternalServerError | ResponseReturnValue
|
|
server_error = InternalServerError(original_exception=error)
|
|
handler = self._find_error_handler(server_error, request.blueprints)
|
|
|
|
if handler is not None:
|
|
server_error = await self.ensure_async(handler)(server_error) # type: ignore[assignment]
|
|
|
|
return await self.finalize_request(server_error, from_error_handler=True)
|
|
|
|
async def handle_websocket_exception(
|
|
self, error: Exception
|
|
) -> ResponseTypes | None:
|
|
"""Handle an uncaught exception.
|
|
|
|
By default this logs the exception and then re-raises it.
|
|
"""
|
|
exc_info = sys.exc_info()
|
|
await got_websocket_exception.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore[arg-type]
|
|
exception=error,
|
|
)
|
|
propagate = self.config["PROPAGATE_EXCEPTIONS"]
|
|
|
|
if propagate is None:
|
|
propagate = self.testing or self.debug
|
|
|
|
if propagate:
|
|
# Re-raise if called with an active exception, otherwise
|
|
# raise the passed in exception.
|
|
if exc_info[1] is error:
|
|
raise
|
|
|
|
raise error
|
|
|
|
self.log_exception(exc_info)
|
|
server_error: InternalServerError | ResponseReturnValue
|
|
server_error = InternalServerError(original_exception=error)
|
|
handler = self._find_error_handler(server_error, websocket.blueprints)
|
|
|
|
if handler is not None:
|
|
server_error = await self.ensure_async(handler)(server_error) # type: ignore[assignment]
|
|
|
|
return await self.finalize_websocket(server_error, from_error_handler=True)
|
|
|
|
def log_exception(
|
|
self,
|
|
exception_info: (
|
|
tuple[type, BaseException, TracebackType] | tuple[None, None, None]
|
|
),
|
|
) -> None:
|
|
"""Log a exception to the :attr:`logger`.
|
|
|
|
By default this is only invoked for unhandled exceptions.
|
|
"""
|
|
if has_request_context():
|
|
request_ = request_ctx.request
|
|
self.logger.error(
|
|
f"Exception on request {request_.method} {request_.path}",
|
|
exc_info=exception_info,
|
|
)
|
|
elif has_websocket_context():
|
|
websocket_ = websocket_ctx.websocket
|
|
self.logger.error(
|
|
f"Exception on websocket {websocket_.path}", exc_info=exception_info
|
|
)
|
|
else:
|
|
self.logger.error("Exception", exc_info=exception_info)
|
|
|
|
@overload
|
|
def ensure_async(
|
|
self, func: Callable[P, Awaitable[T]]
|
|
) -> Callable[P, Awaitable[T]]: ...
|
|
|
|
@overload
|
|
def ensure_async(self, func: Callable[P, T]) -> Callable[P, Awaitable[T]]: ...
|
|
|
|
def ensure_async(
|
|
self, func: Callable[P, Awaitable[T]] | Callable[P, T]
|
|
) -> Callable[P, Awaitable[T]]:
|
|
"""Ensure that the returned func is async and calls the func.
|
|
|
|
.. versionadded:: 0.11
|
|
|
|
Override if you wish to change how synchronous functions are
|
|
run. Before Quart 0.11 this did not run the synchronous code
|
|
in an executor.
|
|
"""
|
|
if iscoroutinefunction(func):
|
|
return func
|
|
else:
|
|
return self.sync_to_async(cast(Callable[P, T], func))
|
|
|
|
def sync_to_async(self, func: Callable[P, T]) -> Callable[P, Awaitable[T]]:
|
|
"""Return a async function that will run the synchronous function *func*.
|
|
|
|
This can be used as so,::
|
|
|
|
result = await app.sync_to_async(func)(*args, **kwargs)
|
|
|
|
Override this method to change how the app converts sync code
|
|
to be asynchronously callable.
|
|
"""
|
|
return run_sync(func)
|
|
|
|
async def do_teardown_request(
|
|
self, exc: BaseException | None, request_context: RequestContext | None = None
|
|
) -> None:
|
|
"""Teardown the request, calling the teardown functions.
|
|
|
|
Arguments:
|
|
exc: Any exception not handled that has caused the request
|
|
to teardown.
|
|
request_context: The request context, optional as Flask
|
|
omits this argument.
|
|
"""
|
|
names = [*(request_context or request_ctx).request.blueprints, None]
|
|
for name in names:
|
|
for function in reversed(self.teardown_request_funcs[name]):
|
|
await self.ensure_async(function)(exc)
|
|
|
|
await request_tearing_down.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore[arg-type]
|
|
exc=exc,
|
|
)
|
|
|
|
async def do_teardown_websocket(
|
|
self,
|
|
exc: BaseException | None,
|
|
websocket_context: WebsocketContext | None = None,
|
|
) -> None:
|
|
"""Teardown the websocket, calling the teardown functions.
|
|
|
|
Arguments:
|
|
exc: Any exception not handled that has caused the websocket
|
|
to teardown.
|
|
websocket_context: The websocket context, optional as Flask
|
|
omits this argument.
|
|
"""
|
|
names = [*(websocket_context or websocket_ctx).websocket.blueprints, None]
|
|
for name in names:
|
|
for function in reversed(self.teardown_websocket_funcs[name]):
|
|
await self.ensure_async(function)(exc)
|
|
|
|
await websocket_tearing_down.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore[arg-type]
|
|
exc=exc,
|
|
)
|
|
|
|
async def do_teardown_appcontext(self, exc: BaseException | None) -> None:
|
|
"""Teardown the app (context), calling the teardown functions."""
|
|
for function in self.teardown_appcontext_funcs:
|
|
await self.ensure_async(function)(exc)
|
|
await appcontext_tearing_down.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore[arg-type]
|
|
exc=exc,
|
|
)
|
|
|
|
def app_context(self) -> AppContext:
|
|
"""Create and return an app context.
|
|
|
|
This is best used within a context, i.e.
|
|
|
|
.. code-block:: python
|
|
|
|
async with app.app_context():
|
|
...
|
|
"""
|
|
return AppContext(self)
|
|
|
|
def request_context(self, request: Request) -> RequestContext:
|
|
"""Create and return a request context.
|
|
|
|
Use the :meth:`test_request_context` whilst testing. This is
|
|
best used within a context, i.e.
|
|
|
|
.. code-block:: python
|
|
|
|
async with app.request_context(request):
|
|
...
|
|
|
|
Arguments:
|
|
request: A request to build a context around.
|
|
"""
|
|
return RequestContext(self, request)
|
|
|
|
def websocket_context(self, websocket: Websocket) -> WebsocketContext:
|
|
"""Create and return a websocket context.
|
|
|
|
Use the :meth:`test_websocket_context` whilst testing. This is
|
|
best used within a context, i.e.
|
|
|
|
.. code-block:: python
|
|
|
|
async with app.websocket_context(websocket):
|
|
...
|
|
|
|
Arguments:
|
|
websocket: A websocket to build a context around.
|
|
"""
|
|
return WebsocketContext(self, websocket)
|
|
|
|
def test_app(self) -> TestAppProtocol:
|
|
return self.test_app_class(self)
|
|
|
|
def test_request_context(
|
|
self,
|
|
path: str,
|
|
*,
|
|
method: str = "GET",
|
|
headers: dict | Headers | None = None,
|
|
query_string: dict | None = None,
|
|
scheme: str = "http",
|
|
send_push_promise: Callable[[str, Headers], Awaitable[None]] = no_op_push,
|
|
data: AnyStr | None = None,
|
|
form: dict | None = None,
|
|
json: Any = sentinel,
|
|
root_path: str = "",
|
|
http_version: str = "1.1",
|
|
scope_base: dict | None = None,
|
|
auth: Authorization | tuple[str, str] | None = None,
|
|
subdomain: str | None = None,
|
|
) -> RequestContext:
|
|
"""Create a request context for testing purposes.
|
|
|
|
This is best used for testing code within request contexts. It
|
|
is a simplified wrapper of :meth:`request_context`. It is best
|
|
used in a with block, i.e.
|
|
|
|
.. code-block:: python
|
|
|
|
async with app.test_request_context("/", method="GET"):
|
|
...
|
|
|
|
Arguments:
|
|
path: Request path.
|
|
method: HTTP verb
|
|
headers: Headers to include in the request.
|
|
query_string: To send as a dictionary, alternatively the
|
|
query_string can be determined from the path.
|
|
scheme: Scheme for the request, default http.
|
|
"""
|
|
headers, path, query_string_bytes = make_test_headers_path_and_query_string(
|
|
self,
|
|
path,
|
|
headers,
|
|
query_string,
|
|
auth,
|
|
subdomain,
|
|
)
|
|
request_body, body_headers = make_test_body_with_headers(
|
|
data=data, form=form, json=json
|
|
)
|
|
headers.update(**body_headers)
|
|
scope = make_test_scope(
|
|
"http",
|
|
path,
|
|
method,
|
|
headers,
|
|
query_string_bytes,
|
|
scheme,
|
|
root_path,
|
|
http_version,
|
|
scope_base,
|
|
)
|
|
request = self.request_class(
|
|
method,
|
|
scheme,
|
|
path,
|
|
query_string_bytes,
|
|
headers,
|
|
root_path,
|
|
http_version,
|
|
send_push_promise=send_push_promise,
|
|
scope=scope,
|
|
)
|
|
request.body.set_result(request_body)
|
|
return self.request_context(request)
|
|
|
|
def add_background_task(self, func: Callable, *args: Any, **kwargs: Any) -> None:
|
|
async def _wrapper() -> None:
|
|
try:
|
|
async with self.app_context():
|
|
await self.ensure_async(func)(*args, **kwargs)
|
|
except Exception as error:
|
|
await self.handle_background_exception(error)
|
|
|
|
task = asyncio.get_event_loop().create_task(_wrapper())
|
|
self.background_tasks.add(task)
|
|
task.add_done_callback(self.background_tasks.discard)
|
|
|
|
async def handle_background_exception(self, error: Exception) -> None:
|
|
await got_background_exception.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore[arg-type]
|
|
exception=error,
|
|
)
|
|
|
|
self.log_exception(sys.exc_info())
|
|
|
|
async def make_default_options_response(self) -> Response:
|
|
"""This is the default route function for OPTIONS requests."""
|
|
methods = request_ctx.url_adapter.allowed_methods()
|
|
return self.response_class("", headers={"Allow": ", ".join(methods)})
|
|
|
|
async def make_response(
|
|
self, result: ResponseReturnValue | HTTPException
|
|
) -> ResponseTypes:
|
|
"""Make a Response from the result of the route handler.
|
|
|
|
The result itself can either be:
|
|
- A Response object (or subclass).
|
|
- A tuple of a ResponseValue and a header dictionary.
|
|
- A tuple of a ResponseValue, status code and a header dictionary.
|
|
|
|
A ResponseValue is either a Response object (or subclass) or a str.
|
|
"""
|
|
headers: HeadersValue | None = None
|
|
status: StatusCode | None = None
|
|
if isinstance(result, tuple):
|
|
if len(result) == 3:
|
|
value, status, headers = result
|
|
elif len(result) == 2:
|
|
value, status_or_headers = result
|
|
|
|
if isinstance(status_or_headers, (Headers, dict, list)):
|
|
headers = status_or_headers
|
|
status = None
|
|
elif status_or_headers is not None:
|
|
status = status_or_headers # type: ignore[assignment]
|
|
else:
|
|
raise TypeError(
|
|
"""The response value returned must be either (body, status), (body,
|
|
headers), or (body, status, headers)"""
|
|
)
|
|
else:
|
|
value = result # type: ignore[assignment]
|
|
|
|
if value is None:
|
|
raise TypeError(
|
|
"The response value returned by the view function cannot be None"
|
|
)
|
|
|
|
response: ResponseTypes
|
|
if isinstance(value, HTTPException):
|
|
response = value.get_response() # type: ignore
|
|
elif not isinstance(value, (Response, WerkzeugResponse)):
|
|
if (
|
|
isinstance(value, (str, bytes, bytearray))
|
|
or isgenerator(value)
|
|
or isasyncgen(value)
|
|
):
|
|
response = self.response_class(value)
|
|
elif isinstance(value, (list, dict)):
|
|
response = self.json.response(value) # type: ignore[assignment]
|
|
else:
|
|
raise TypeError(
|
|
f"The response value type ({type(value).__name__}) is not valid"
|
|
)
|
|
else:
|
|
response = value
|
|
|
|
if status is not None:
|
|
response.status_code = int(status)
|
|
|
|
if headers is not None:
|
|
response.headers.update(headers)
|
|
|
|
return response
|
|
|
|
async def handle_request(self, request: Request) -> ResponseTypes:
|
|
async with self.request_context(request) as request_context:
|
|
try:
|
|
return await self.full_dispatch_request(request_context)
|
|
except asyncio.CancelledError:
|
|
raise # CancelledErrors should be handled by serving code.
|
|
except Exception as error:
|
|
return await self.handle_exception(error)
|
|
finally:
|
|
if request.scope.get("_quart._preserve_context", False):
|
|
self._preserved_context = request_context.copy()
|
|
|
|
async def handle_websocket(self, websocket: Websocket) -> ResponseTypes | None:
|
|
async with self.websocket_context(websocket) as websocket_context:
|
|
try:
|
|
return await self.full_dispatch_websocket(websocket_context)
|
|
except asyncio.CancelledError:
|
|
raise # CancelledErrors should be handled by serving code.
|
|
except Exception as error:
|
|
return await self.handle_websocket_exception(error)
|
|
finally:
|
|
if websocket.scope.get("_quart._preserve_context", False):
|
|
self._preserved_context = websocket_context.copy()
|
|
|
|
async def full_dispatch_request(
|
|
self, request_context: RequestContext | None = None
|
|
) -> ResponseTypes:
|
|
"""Adds pre and post processing to the request dispatching.
|
|
|
|
Arguments:
|
|
request_context: The request context, optional as Flask
|
|
omits this argument.
|
|
"""
|
|
try:
|
|
await request_started.send_async(self, _sync_wrapper=self.ensure_async) # type: ignore
|
|
|
|
result: ResponseReturnValue | HTTPException | None
|
|
result = await self.preprocess_request(request_context)
|
|
if result is None:
|
|
result = await self.dispatch_request(request_context)
|
|
except Exception as error:
|
|
result = await self.handle_user_exception(error)
|
|
return await self.finalize_request(result, request_context)
|
|
|
|
async def full_dispatch_websocket(
|
|
self, websocket_context: WebsocketContext | None = None
|
|
) -> ResponseTypes | None:
|
|
"""Adds pre and post processing to the websocket dispatching.
|
|
|
|
Arguments:
|
|
websocket_context: The websocket context, optional to match
|
|
the Flask convention.
|
|
"""
|
|
try:
|
|
await websocket_started.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore
|
|
)
|
|
|
|
result: ResponseReturnValue | HTTPException | None
|
|
result = await self.preprocess_websocket(websocket_context)
|
|
if result is None:
|
|
result = await self.dispatch_websocket(websocket_context)
|
|
except Exception as error:
|
|
result = await self.handle_user_exception(error)
|
|
return await self.finalize_websocket(result, websocket_context)
|
|
|
|
async def preprocess_request(
|
|
self, request_context: RequestContext | None = None
|
|
) -> ResponseReturnValue | None:
|
|
"""Preprocess the request i.e. call before_request functions.
|
|
|
|
Arguments:
|
|
request_context: The request context, optional as Flask
|
|
omits this argument.
|
|
"""
|
|
names = [None, *reversed((request_context or request_ctx).request.blueprints)]
|
|
|
|
for name in names:
|
|
for processor in self.url_value_preprocessors[name]:
|
|
processor(request.endpoint, request.view_args)
|
|
|
|
for name in names:
|
|
for function in self.before_request_funcs[name]:
|
|
result = await self.ensure_async(function)()
|
|
if result is not None:
|
|
return result # type: ignore[return-value]
|
|
|
|
return None
|
|
|
|
async def preprocess_websocket(
|
|
self, websocket_context: WebsocketContext | None = None
|
|
) -> ResponseReturnValue | None:
|
|
"""Preprocess the websocket i.e. call before_websocket functions.
|
|
|
|
Arguments:
|
|
websocket_context: The websocket context, optional as Flask
|
|
omits this argument.
|
|
"""
|
|
names = [
|
|
None,
|
|
*reversed((websocket_context or websocket_ctx).websocket.blueprints),
|
|
]
|
|
|
|
for name in names:
|
|
for processor in self.url_value_preprocessors[name]:
|
|
processor(request.endpoint, request.view_args)
|
|
|
|
for name in names:
|
|
for function in self.before_websocket_funcs[name]:
|
|
result = await self.ensure_async(function)()
|
|
if result is not None:
|
|
return result # type: ignore[return-value]
|
|
|
|
return None
|
|
|
|
def raise_routing_exception(self, request: BaseRequestWebsocket) -> NoReturn:
|
|
raise request.routing_exception
|
|
|
|
async def dispatch_request(
|
|
self, request_context: RequestContext | None = None
|
|
) -> ResponseReturnValue:
|
|
"""Dispatch the request to the view function.
|
|
|
|
Arguments:
|
|
request_context: The request context, optional as Flask
|
|
omits this argument.
|
|
"""
|
|
request_ = (request_context or request_ctx).request
|
|
if request_.routing_exception is not None:
|
|
self.raise_routing_exception(request_)
|
|
|
|
if request_.method == "OPTIONS" and request_.url_rule.provide_automatic_options:
|
|
return await self.make_default_options_response()
|
|
|
|
handler = self.view_functions[request_.url_rule.endpoint]
|
|
return await self.ensure_async(handler)(**request_.view_args) # type: ignore[return-value]
|
|
|
|
async def dispatch_websocket(
|
|
self, websocket_context: WebsocketContext | None = None
|
|
) -> ResponseReturnValue | None:
|
|
"""Dispatch the websocket to the view function.
|
|
|
|
Arguments:
|
|
websocket_context: The websocket context, optional to match
|
|
the Flask convention.
|
|
"""
|
|
websocket_ = (websocket_context or websocket_ctx).websocket
|
|
if websocket_.routing_exception is not None:
|
|
self.raise_routing_exception(websocket_)
|
|
|
|
handler = self.view_functions[websocket_.url_rule.endpoint]
|
|
return await self.ensure_async(handler)(**websocket_.view_args) # type: ignore[return-value]
|
|
|
|
async def finalize_request(
|
|
self,
|
|
result: ResponseReturnValue | HTTPException,
|
|
request_context: RequestContext | None = None,
|
|
from_error_handler: bool = False,
|
|
) -> ResponseTypes:
|
|
"""Turns the view response return value into a response.
|
|
|
|
Arguments:
|
|
result: The result of the request to finalize into a response.
|
|
request_context: The request context, optional as Flask
|
|
omits this argument.
|
|
"""
|
|
response = await self.make_response(result)
|
|
try:
|
|
response = await self.process_response(response, request_context)
|
|
await request_finished.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore[arg-type]
|
|
response=response,
|
|
)
|
|
except Exception:
|
|
if not from_error_handler:
|
|
raise
|
|
self.logger.exception("Request finalizing errored")
|
|
return response
|
|
|
|
async def finalize_websocket(
|
|
self,
|
|
result: ResponseReturnValue | HTTPException,
|
|
websocket_context: WebsocketContext | None = None,
|
|
from_error_handler: bool = False,
|
|
) -> ResponseTypes | None:
|
|
"""Turns the view response return value into a response.
|
|
|
|
Arguments:
|
|
result: The result of the websocket to finalize into a response.
|
|
websocket_context: The websocket context, optional as Flask
|
|
omits this argument.
|
|
"""
|
|
if result is not None:
|
|
response = await self.make_response(result)
|
|
else:
|
|
response = None
|
|
try:
|
|
response = await self.postprocess_websocket(response, websocket_context)
|
|
await websocket_finished.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore[arg-type]
|
|
response=response,
|
|
)
|
|
except Exception:
|
|
if not from_error_handler:
|
|
raise
|
|
self.logger.exception("Request finalizing errored")
|
|
return response
|
|
|
|
async def process_response(
|
|
self,
|
|
response: ResponseTypes,
|
|
request_context: RequestContext | None = None,
|
|
) -> ResponseTypes:
|
|
"""Postprocess the request acting on the response.
|
|
|
|
Arguments:
|
|
response: The response after the request is finalized.
|
|
request_context: The request context, optional as Flask
|
|
omits this argument.
|
|
"""
|
|
names = [*(request_context or request_ctx).request.blueprints, None]
|
|
|
|
for function in (request_context or request_ctx)._after_request_functions:
|
|
response = await self.ensure_async(function)(response) # type: ignore[assignment]
|
|
|
|
for name in names:
|
|
for function in reversed(self.after_request_funcs[name]):
|
|
response = await self.ensure_async(function)(response)
|
|
|
|
session_ = (request_context or request_ctx).session
|
|
if not self.session_interface.is_null_session(session_):
|
|
await self.ensure_async(self.session_interface.save_session)(
|
|
self, session_, response
|
|
)
|
|
return response
|
|
|
|
async def postprocess_websocket(
|
|
self,
|
|
response: ResponseTypes | None,
|
|
websocket_context: WebsocketContext | None = None,
|
|
) -> ResponseTypes:
|
|
"""Postprocess the websocket acting on the response.
|
|
|
|
Arguments:
|
|
response: The response after the websocket is finalized.
|
|
websocket_context: The websocket context, optional as Flask
|
|
omits this argument.
|
|
"""
|
|
names = [*(websocket_context or websocket_ctx).websocket.blueprints, None]
|
|
|
|
for function in (websocket_context or websocket_ctx)._after_websocket_functions:
|
|
response = await self.ensure_async(function)(response) # type: ignore[assignment]
|
|
|
|
for name in names:
|
|
for function in reversed(self.after_websocket_funcs[name]):
|
|
response = await self.ensure_async(function)(response) # type: ignore[assignment]
|
|
|
|
session_ = (websocket_context or websocket_ctx).session
|
|
if not self.session_interface.is_null_session(session_):
|
|
await self.session_interface.save_session(self, session_, response)
|
|
return response
|
|
|
|
async def __call__(
|
|
self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable
|
|
) -> None:
|
|
"""Called by ASGI servers.
|
|
|
|
The related :meth:`~quart.app.Quart.asgi_app` is called,
|
|
allowing for middleware usage whilst keeping the top level app
|
|
a :class:`~quart.app.Quart` instance.
|
|
"""
|
|
await self.asgi_app(scope, receive, send)
|
|
|
|
async def asgi_app(
|
|
self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable
|
|
) -> None:
|
|
"""This handles ASGI calls, it can be wrapped in middleware.
|
|
|
|
When using middleware with Quart it is preferable to wrap this
|
|
method rather than the app itself. This is to ensure that the
|
|
app is an instance of this class - which allows the quart cli
|
|
to work correctly. To use this feature simply do,
|
|
|
|
.. code-block:: python
|
|
|
|
app.asgi_app = middleware(app.asgi_app)
|
|
|
|
"""
|
|
asgi_handler: ASGIHTTPProtocol | ASGILifespanProtocol | ASGIWebsocketProtocol
|
|
if scope["type"] == "http":
|
|
asgi_handler = self.asgi_http_class(self, scope)
|
|
elif scope["type"] == "websocket":
|
|
asgi_handler = self.asgi_websocket_class(self, scope)
|
|
elif scope["type"] == "lifespan":
|
|
asgi_handler = self.asgi_lifespan_class(self, scope)
|
|
else:
|
|
raise RuntimeError("ASGI Scope type is unknown")
|
|
await asgi_handler(receive, send)
|
|
|
|
async def startup(self) -> None:
|
|
self.shutdown_event = self.event_class()
|
|
try:
|
|
async with self.app_context():
|
|
for func in self.before_serving_funcs:
|
|
await self.ensure_async(func)()
|
|
for gen in self.while_serving_gens:
|
|
await gen.__anext__()
|
|
except Exception as error:
|
|
await got_serving_exception.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore[arg-type]
|
|
exception=error,
|
|
)
|
|
self.log_exception(sys.exc_info())
|
|
raise
|
|
|
|
async def shutdown(self) -> None:
|
|
self.shutdown_event.set()
|
|
try:
|
|
await asyncio.wait_for(
|
|
asyncio.gather(*self.background_tasks),
|
|
timeout=self.config["BACKGROUND_TASK_SHUTDOWN_TIMEOUT"],
|
|
)
|
|
except asyncio.TimeoutError:
|
|
await cancel_tasks(self.background_tasks)
|
|
|
|
try:
|
|
async with self.app_context():
|
|
for func in self.after_serving_funcs:
|
|
await self.ensure_async(func)()
|
|
for gen in self.while_serving_gens:
|
|
try:
|
|
await gen.__anext__()
|
|
except StopAsyncIteration:
|
|
pass
|
|
else:
|
|
raise RuntimeError("While serving generator didn't terminate")
|
|
except Exception as error:
|
|
await got_serving_exception.send_async(
|
|
self,
|
|
_sync_wrapper=self.ensure_async, # type: ignore[arg-type]
|
|
exception=error,
|
|
)
|
|
self.log_exception(sys.exc_info())
|
|
raise
|
|
|
|
|
|
def _cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None:
|
|
tasks = [task for task in asyncio.all_tasks(loop) if not task.done()]
|
|
if not tasks:
|
|
return
|
|
|
|
for task in tasks:
|
|
task.cancel()
|
|
loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
|
|
|
|
for task in tasks:
|
|
if not task.cancelled() and task.exception() is not None:
|
|
loop.call_exception_handler(
|
|
{
|
|
"message": "unhandled exception during shutdown",
|
|
"exception": task.exception(),
|
|
"task": task,
|
|
}
|
|
)
|