from __future__ import annotations import hashlib from datetime import datetime from datetime import timezone from typing import TYPE_CHECKING from flask.sessions import NullSession as NullSession # noqa: F401 from flask.sessions import SecureCookieSession as SecureCookieSession # noqa: F401 from flask.sessions import ( # noqa: F401 session_json_serializer as session_json_serializer, ) from flask.sessions import SessionMixin as SessionMixin # noqa: F401 from itsdangerous import BadSignature from itsdangerous import URLSafeTimedSerializer from werkzeug.wrappers import Response as WerkzeugResponse from .wrappers import BaseRequestWebsocket from .wrappers import Response if TYPE_CHECKING: from .app import Quart # noqa class SessionInterface: """Base class for session interfaces. Attributes: null_session_class: Storage class for null (no storage) sessions. pickle_based: Indicates if pickling is used for the session. """ null_session_class = NullSession pickle_based = False async def make_null_session(self, app: Quart) -> NullSession: """Create a Null session object. This is used in replacement of an actual session if sessions are not configured or active. """ return self.null_session_class() def is_null_session(self, instance: object) -> bool: """Returns True is the instance is a null session.""" return isinstance(instance, self.null_session_class) def get_cookie_name(self, app: Quart) -> str: """Helper method to return the Cookie Name for the App.""" return app.config["SESSION_COOKIE_NAME"] def get_cookie_domain(self, app: Quart) -> str | None: """Helper method to return the Cookie Domain for the App.""" rv = app.config["SESSION_COOKIE_DOMAIN"] return rv if rv else None def get_cookie_path(self, app: Quart) -> str: """Helper method to return the Cookie path for the App.""" return app.config["SESSION_COOKIE_PATH"] or app.config["APPLICATION_ROOT"] def get_cookie_httponly(self, app: Quart) -> bool: """Helper method to return if the Cookie should be HTTPOnly for the App.""" return app.config["SESSION_COOKIE_HTTPONLY"] def get_cookie_secure(self, app: Quart) -> bool: """Helper method to return if the Cookie should be Secure for the App.""" return app.config["SESSION_COOKIE_SECURE"] def get_cookie_samesite(self, app: Quart) -> str: """Helper method to return the Cookie Samesite configuration for the App.""" return app.config["SESSION_COOKIE_SAMESITE"] def get_expiration_time(self, app: Quart, session: SessionMixin) -> datetime | None: """Helper method to return the Session expiration time. If the session is not 'permanent' it will expire as and when the browser stops accessing the app. """ if session.permanent: return datetime.now(timezone.utc) + app.permanent_session_lifetime else: return None def should_set_cookie(self, app: Quart, session: SessionMixin) -> bool: """Helper method to return if the Set Cookie header should be present. This triggers if the session is marked as modified or the app is configured to always refresh the cookie. """ if session.modified: return True save_each = app.config["SESSION_REFRESH_EACH_REQUEST"] return save_each and session.permanent async def open_session( self, app: Quart, request: BaseRequestWebsocket ) -> SessionMixin | None: """Open an existing session from the request or create one. Returns: The Session object or None if no session can be created, in which case the :attr:`null_session_class` is expected to be used. """ raise NotImplementedError() async def save_session( self, app: Quart, session: SessionMixin, response: Response | WerkzeugResponse | None, ) -> None: """Save the session argument to the response. Arguments: response: Can be None if the session is being saved after a websocket connection closes. Returns: The modified response, with the session stored. """ raise NotImplementedError() class SecureCookieSessionInterface(SessionInterface): """A Session interface that uses cookies as storage. This will store the data on the cookie in plain text, but with a signature to prevent modification. """ digest_method = staticmethod(hashlib.sha1) key_derivation = "hmac" salt = "cookie-session" serializer = session_json_serializer session_class = SecureCookieSession def get_signing_serializer(self, app: Quart) -> URLSafeTimedSerializer | None: """Return a serializer for the session that also signs data. This will return None if the app is not configured for secrets. """ if not app.secret_key: return None options = { "key_derivation": self.key_derivation, "digest_method": self.digest_method, } return URLSafeTimedSerializer( app.secret_key, salt=self.salt, serializer=self.serializer, signer_kwargs=options, ) async def open_session( self, app: Quart, request: BaseRequestWebsocket ) -> SecureCookieSession | None: """Open a secure cookie based session. This will return None if a signing serializer is not available, usually if the config SECRET_KEY is not set. """ signer = self.get_signing_serializer(app) if signer is None: return None cookie = request.cookies.get(self.get_cookie_name(app)) if cookie is None: return self.session_class() max_age = int(app.permanent_session_lifetime.total_seconds()) try: data = signer.loads(cookie, max_age=max_age) return self.session_class(data) except BadSignature: return self.session_class() async def save_session( self, app: Quart, session: SessionMixin, response: Response | WerkzeugResponse | None, ) -> None: """Saves the session to the response in a secure cookie.""" if response is None: if session.modified: app.logger.exception( "Secure Cookie Session modified during websocket handling. " "These modifications will be lost as a cookie cannot be set." ) return name = self.get_cookie_name(app) domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) secure = self.get_cookie_secure(app) samesite = self.get_cookie_samesite(app) httponly = self.get_cookie_httponly(app) # Add a "Vary: Cookie" header if the session was accessed at all. if session.accessed: response.vary.add("Cookie") # If the session is modified to be empty, remove the cookie. # If the session is empty, return without setting the cookie. if not session: if session.modified: response.delete_cookie( name, domain=domain, path=path, secure=secure, samesite=samesite, httponly=httponly, ) response.vary.add("Cookie") return if not self.should_set_cookie(app, session): return expires = self.get_expiration_time(app, session) val = self.get_signing_serializer(app).dumps(dict(session)) response.set_cookie( name, val, expires=expires, httponly=httponly, domain=domain, path=path, secure=secure, samesite=samesite, ) response.vary.add("Cookie")