brief-extractor/backend/venv/lib/python3.10/site-packages/quart/helpers.py
2026-03-06 18:42:46 +00:00

403 lines
13 KiB
Python
Executable file

from __future__ import annotations
import mimetypes
import os
import pkgutil
import sys
from collections.abc import Iterable
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from functools import cache
from functools import wraps
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import Callable
from typing import cast
from typing import NoReturn
from zlib import adler32
from flask.helpers import get_root_path as get_root_path # noqa: F401
from werkzeug.exceptions import abort as werkzeug_abort
from werkzeug.exceptions import NotFound
from werkzeug.utils import redirect as werkzeug_redirect
from werkzeug.utils import safe_join
from werkzeug.wrappers import Response as WerkzeugResponse
from .globals import _cv_request
from .globals import current_app
from .globals import request
from .globals import request_ctx
from .globals import session
from .signals import message_flashed
from .typing import FilePath
from .typing import ResponseReturnValue
from .typing import ResponseTypes
from .utils import file_path_to_path
from .wrappers import Response
from .wrappers.response import ResponseBody
# Fallback mimetype used by send_file when the type cannot be guessed from
# the attachment filename.
DEFAULT_MIMETYPE = "application/octet-stream"
# Plain alias of the builtin ``property``.
# NOTE(review): presumably retained so that code importing
# ``locked_cached_property`` from this module keeps working -- confirm.
locked_cached_property = property
def get_debug_flag() -> bool:
    """Report whether the ``QUART_DEBUG`` environment variable enables debug.

    An unset or empty variable yields ``False``. Any other value
    enables debug mode unless, compared case-insensitively, it is one
    of ``0``, ``false`` or ``no``.
    """
    raw = os.getenv("QUART_DEBUG", None)
    if not raw:
        return False
    return raw.lower() not in {"0", "false", "no"}
def get_load_dotenv(default: bool = True) -> bool:
    """Decide whether the default dotenv files should be loaded.

    The decision is driven by :envvar:`QUART_SKIP_DOTENV`. When the
    variable is unset or empty the ``default`` is returned. Otherwise
    loading stays enabled only if the value is, case-insensitively,
    one of ``0``, ``false`` or ``no`` (i.e. "do not skip").

    :param default: What to return if the env var isn't set.
    """
    skip_setting = os.environ.get("QUART_SKIP_DOTENV")
    return skip_setting.lower() in ("0", "false", "no") if skip_setting else default
async def make_response(*args: Any) -> ResponseTypes:
    """Build a response object from the given value(s).

    This is a thin wrapper around the current app's ``make_response``
    and is handy when a response needs altering before it is
    returned, for example

    .. code-block:: python

        response = await make_response(await render_template('index.html'))
        response.headers['X-Header'] = 'Something'

    Called without arguments an empty response (body ``""``) is
    created via the app's ``response_class``. A single argument is
    passed on as is, several arguments are passed on as one tuple.
    """
    if len(args) == 0:
        return current_app.response_class("")
    # A lone positional argument is unwrapped from the args tuple.
    result = args[0] if len(args) == 1 else args
    return await current_app.make_response(cast(ResponseReturnValue, result))
async def make_push_promise(path: str) -> None:
    """Ask the current request to send a push promise for ``path``.

    A simple wrapper that delegates to ``request.send_push_promise``;
    the path is one that should be pushed to the client if the
    protocol is HTTP/2.
    """
    outcome = await request.send_push_promise(path)
    return outcome
async def flash(message: str, category: str = "message") -> None:
    """Store a message (with an optional category) in the session.

    Flashing is typically used to show a message to the user during
    some later request, for example

    .. code-block:: python

        @app.route('/login', methods=['POST'])
        async def login():
            ...
            await flash('Login successful')
            return redirect(url_for('index'))

    lets the index route display the message without it being passed
    as an argument. See :func:`~quart.helpers.get_flashed_messages`
    for message retrieval.

    The ``message_flashed`` signal is sent once the message has been
    stored.

    Arguments:
        message: The text to store.
        category: A label stored alongside the message.
    """
    pending = session.get("_flashes", [])
    pending.append((category, message))
    # Assign back so the list is stored even when the key was absent.
    session["_flashes"] = pending
    app = current_app._get_current_object()  # type: ignore
    await message_flashed.send_async(
        app,
        _sync_wrapper=app.ensure_async,
        message=message,
        category=category,
    )
def get_flashed_messages(
    with_categories: bool = False, category_filter: Iterable[str] = ()
) -> list[str] | list[tuple[str, str]]:
    """Return the flashed messages held for the current request.

    On first use within a request context the messages are popped
    from the session and remembered on the request context, later
    calls reuse the remembered list. This is mostly useful in
    templates where it is exposed as a global function, for example

    .. code-block:: html+jinja

        <ul>
        {% for message in get_flashed_messages() %}
          <li>{{ message }}</li>
        {% endfor %}
        </ul>

    Take care with ``category_filter``: every message is popped from
    the session, yet only those matching the filter are returned. See
    :func:`~quart.helpers.flash` for message creation.

    Arguments:
        with_categories: If true return ``(category, message)``
            tuples rather than just the message.
        category_filter: If non-empty, only messages whose category
            is in this collection are returned.
    """
    messages = request_ctx.flashes
    if messages is None:
        messages = session.pop("_flashes", [])
        request_ctx.flashes = messages  # type: ignore[assignment]
    if category_filter:
        messages = [entry for entry in messages if entry[0] in category_filter]
    if with_categories:
        return messages
    return [entry[1] for entry in messages]
def get_template_attribute(template_name: str, attribute: str) -> Any:
    """Fetch an attribute from a template's module.

    This makes attributes defined in a template usable from Python
    code. The template is obtained from the current app's Jinja
    environment.

    Arguments:
        template_name: To load the attribute from.
        attribute: The attribute name to load
    """
    template = current_app.jinja_env.get_template(template_name)
    return getattr(template.module, attribute)
def url_for(
    endpoint: str,
    *,
    _anchor: str | None = None,
    _external: bool | None = None,
    _method: str | None = None,
    _scheme: str | None = None,
    **values: Any,
) -> str:
    """Build the url for an endpoint by delegating to the current app.

    This is most useful in templates and redirects to create a URL
    that can be used in the browser.

    Arguments:
        endpoint: The endpoint to build a url for, if prefixed with
            ``.`` it targets endpoint's in the current blueprint.
        _anchor: Additional anchor text to append (i.e. #text).
        _external: Return an absolute url for external (to app) usage.
        _method: The method to consider alongside the endpoint.
        _scheme: A specific scheme to use.
        values: The values to build into the URL, as specified in
            the endpoint rule.
    """
    # The underscore options are keyword-only parameters, so they can
    # never also appear in ``values``.
    options = {
        "_anchor": _anchor,
        "_method": _method,
        "_scheme": _scheme,
        "_external": _external,
    }
    return current_app.url_for(endpoint, **options, **values)
def stream_with_context(func: Callable) -> Callable:
    """Make the current request context available inside a generator.

    A copy of the current request context is taken when this
    decorator is applied; the returned async generator function
    enters that copy before iterating over ``func``. For example,

    .. code-block:: python

        @app.route('/')
        def index() -> AsyncGenerator[bytes, None]:
            @stream_with_context
            async def generator() -> bytes:
                yield request.method.encode()
                yield b' '
                yield request.path.encode()

            return generator()

    Arguments:
        func: An async generator function to wrap.
    """
    context_copy = _cv_request.get().copy()

    @wraps(func)
    async def generator(*args: Any, **kwargs: Any) -> Any:
        # Re-enter the copied context for the lifetime of the stream.
        async with context_copy:
            async for chunk in func(*args, **kwargs):
                yield chunk

    return generator
def _get_module_loader(module_name: str) -> Any:
    """Return the loader for a top-level module, or None if there is none.

    Stands in for ``pkgutil.get_loader`` (deprecated in Python 3.12,
    removed in 3.14): an already imported module's ``__loader__`` is
    preferred, otherwise the import machinery is asked for a spec.
    """
    # Local import, ``import importlib`` alone does not guarantee that
    # the ``util`` submodule is loaded.
    import importlib.util

    imported = sys.modules.get(module_name)
    if imported is not None:
        loader = getattr(imported, "__loader__", None)
        if loader is not None:
            return loader
    try:
        spec = importlib.util.find_spec(module_name)
    except ValueError:
        # Raised for an already imported module without a usable
        # ``__spec__``; treated as "no loader available".
        return None
    return None if spec is None else spec.loader


def find_package(name: str) -> tuple[Path | None, Path]:
    """Find a package's install prefix (or None) and its containing folder.

    Arguments:
        name: Import name of the module or package; only the
            top-level part (before the first ``.``) is used to locate
            the loader.

    Returns:
        A ``(prefix, folder)`` tuple. ``folder`` is the directory
        containing the module file, or containing the package
        directory when the top-level module is a package; it is the
        current working directory when ``name`` is ``"__main__"`` or
        no loader can be found. ``prefix`` is the resolved
        ``sys.prefix`` when ``folder`` lies inside it, otherwise
        ``None``.
    """
    module = name.split(".")[0]
    loader = _get_module_loader(module)
    if name == "__main__" or loader is None:
        package_path = Path.cwd()
    else:
        if hasattr(loader, "get_filename"):
            filename = loader.get_filename(module)
        else:
            # Without ``get_filename`` the module has to be imported
            # in order to read its ``__file__``.
            __import__(name)
            filename = sys.modules[name].__file__
        package_path = Path(filename).resolve().parent
        if hasattr(loader, "is_package") and loader.is_package(module):
            # For a package the file is ``<pkg>/__init__.py``, step up
            # once more to reach the folder containing the package.
            package_path = Path(package_path).resolve().parent
    sys_prefix = Path(sys.prefix).resolve()
    try:
        package_path.relative_to(sys_prefix)
    except ValueError:
        # Not located under the interpreter prefix.
        return None, package_path
    else:
        return sys_prefix, package_path
async def send_from_directory(
    directory: FilePath,
    file_name: str,
    *,
    mimetype: str | None = None,
    as_attachment: bool = False,
    attachment_filename: str | None = None,
    add_etags: bool = True,
    cache_timeout: int | None = None,
    conditional: bool = True,
    last_modified: datetime | None = None,
) -> Response:
    """Send a file located within a given directory.

    The directory and file name are combined with ``safe_join``; a
    ``NotFound`` error is raised if that join is rejected or if the
    resulting path is not an existing file.

    Arguments:
        directory: Directory that when combined with file_name gives
            the file path.
        file_name: File name that when combined with directory gives
            the file path.

    See :func:`send_file` for the other arguments.
    """
    joined = safe_join(str(directory), file_name)
    if joined is None or not Path(joined).is_file():
        raise NotFound()
    send_options = {
        "mimetype": mimetype,
        "as_attachment": as_attachment,
        "attachment_filename": attachment_filename,
        "add_etags": add_etags,
        "cache_timeout": cache_timeout,
        "conditional": conditional,
        "last_modified": last_modified,
    }
    return await send_file(Path(joined), **send_options)
async def send_file(
    filename_or_io: FilePath | BytesIO,
    mimetype: str | None = None,
    as_attachment: bool = False,
    attachment_filename: str | None = None,
    add_etags: bool = True,
    cache_timeout: int | None = None,
    conditional: bool = False,
    last_modified: datetime | None = None,
) -> Response:
    """Return a Response to send the filename given.

    Arguments:
        filename_or_io: The filename (path) to send, remember to use
            :func:`safe_join`. A ``BytesIO`` may be given instead, in
            which case no etag, last modified value or cache timeout
            is derived automatically.
        mimetype: Mimetype to use, by default it will be guessed from
            the attachment filename or revert to the
            DEFAULT_MIMETYPE.
        as_attachment: If true use the attachment filename in a
            Content-Disposition attachment header.
        attachment_filename: Name for the filename, if it differs;
            defaults to the name of the file when a path is given.
        add_etags: Set etags based on the filename, size and
            modification time.
        cache_timeout: Time in seconds for the response to be cached.
            For a path this defaults to the app's
            ``get_send_file_max_age`` value.
        conditional: If true make the response conditional on the
            current request, with range requests accepted.
        last_modified: Used to override the last modified value.

    Raises:
        ValueError: If no mimetype is given and there is no
            attachment filename to guess it from.
    """
    file_body: ResponseBody
    file_size: int | None = None
    etag: str | None = None
    if isinstance(filename_or_io, BytesIO):
        file_body = current_app.response_class.io_body_class(filename_or_io)
        file_size = filename_or_io.getbuffer().nbytes
    else:
        file_path = file_path_to_path(filename_or_io)
        # Stat once and reuse the result, so that the content length,
        # last modified value and etag all describe the same snapshot
        # of the file (and the blocking call is not repeated).
        file_stat = file_path.stat()
        file_size = file_stat.st_size
        if attachment_filename is None:
            attachment_filename = file_path.name
        file_body = current_app.response_class.file_body_class(file_path)
        if last_modified is None:
            last_modified = file_stat.st_mtime  # type: ignore
        if cache_timeout is None:
            cache_timeout = current_app.get_send_file_max_age(str(file_path))
        # The checksum covers the path bytes, not the file contents.
        etag = (
            f"{file_stat.st_mtime}-{file_stat.st_size}"
            f"-{adler32(bytes(file_path))}"
        )

    if mimetype is None and attachment_filename is not None:
        mimetype = mimetypes.guess_type(attachment_filename)[0] or DEFAULT_MIMETYPE
    if mimetype is None:
        raise ValueError(
            "The mime type cannot be inferred, please set it manually via the"
            " mimetype argument."
        )

    response = current_app.response_class(file_body, mimetype=mimetype)
    response.content_length = file_size

    if as_attachment:
        response.headers.add(
            "Content-Disposition", "attachment", filename=attachment_filename
        )

    if last_modified is not None:
        response.last_modified = last_modified

    response.cache_control.public = True
    if cache_timeout is not None:
        response.cache_control.max_age = cache_timeout
        response.expires = datetime.now(timezone.utc) + timedelta(seconds=cache_timeout)

    if add_etags and etag is not None:
        response.set_etag(etag)

    if conditional:
        await response.make_conditional(
            request, accept_ranges=True, complete_length=file_size
        )
    return response
@cache
def _split_blueprint_path(name: str) -> list[str]:
    """List a dotted blueprint name followed by each of its parent names.

    The full name comes first and the outermost name last, e.g.
    ``"a.b.c"`` gives ``["a.b.c", "a.b", "a"]``. Results are cached
    per name, so the same list object is returned on repeat calls.
    """
    segments = name.split(".")
    return [".".join(segments[:end]) for end in range(len(segments), 0, -1)]
def abort(code: int | Response, *args: Any, **kwargs: Any) -> NoReturn:
    """Raise an HTTPException for the given status code.

    When an app is current its ``aborter`` is called first; Werkzeug's
    ``abort`` is called when no app is current (or should the aborter
    return).

    Arguments:
        code: The status code, or a response, to abort with.
        args: Passed on to the aborter.
        kwargs: Passed on to the aborter.
    """
    if current_app:
        app_aborter = current_app.aborter
        app_aborter(code, *args, **kwargs)
    werkzeug_abort(code, *args, **kwargs)
def redirect(location: str, code: int = 302) -> WerkzeugResponse:
    """Redirect to the location with the status code.

    The current app's ``redirect`` is used when an app is current,
    otherwise Werkzeug's ``redirect`` is used directly.

    Arguments:
        location: The location to redirect to.
        code: The status code of the redirect response.
    """
    if not current_app:
        return werkzeug_redirect(location, code=code)
    return current_app.redirect(location, code=code)