Source code for fresco.subrequests

from itertools import chain
from typing import Any
from typing import Dict
from typing import Tuple

from fresco.core import context
from fresco.request import Request
from fresco.response import Response
from fresco.routing import Route
from fresco.routing import GET
from fresco.routing import RouteArg
from fresco.util.http import parse_header
from fresco.util.wsgi import make_environ

__all__ = ["subrequest", "subrequest_raw", "subrequest_bytes"]


class Markup(str):
    def __html__(self):
        return self


[docs]def subrequest(view, *args, **kwargs): """ Return the response generated by a subrequest to ``view`` This behaves differently depending on the value of ``view``: - **A string containing a ``/``**: a full request context is constructed to emulate a request to the given path. The subrequest will invoke all middleware layers and application hooks. - **A callable**: a new subrequest context will be created and ``view(*args, **kwargs)`` will be called. Middleware and application hooks will not be called, and any :class:`~fresco.routeargs.RouteArg` instances will not be resolved. - **Any other value**: the view function is looked up using the same route resolution rules as :meth:`~fresco.core.FrescoApp.urlfor`. Middleware and hooks will be skipped, but :class:`~fresco.routeargs.RouteArg` instances will be resolved. If you pass in a view callable you can force RouteArgs to be resolved by specifying ``_resolve=True``. You can force a full request to be emulated by specifying ``_full=True``. Passing additional positional and keyword arguments has a different effect depending on the value of ``view``, ``_resolve`` and ``_full``: - **If view is a path, or if _full is True**, ``*args`` and ``**kwargs`` are passed to :meth:`~fresco.core.FrescoApp.requestcontext`, and may be used to customize the WSGI environment used for the subrequest. A new request context will be created from a copy of the current request's WSGI environ. This will contain the same values except for ``PATH_INFO`` and ``QUERY_STRING``. - **If view is a callable and _resolve is False**, ``*args`` and ``**kwargs`` are passed to the view callable. No new request context is created. - **Otherwise** ``view`` is assumed to be the name of a route. ``*args`` and ``**kwargs`` are passed to the route lookup mechanism and should provide the values defined in the route's path. ``kwargs`` values also override any :class:`~fresco.routeargs.RouteArg`s defined by the route. These will only be generated from the request if a corresponding value is not present in ``kwargs``. No new request context is created. :param view: The target path, view callable, or viewspec. :param _mode: One of ``raw``, ``str`` or ``bytes``. If ``str``, the content of the subrequest will be returned as a string. If ``bytes``, the content will be returned as a byte string. If ``raw``, the raw response will be returned. In this case the caller is responsible for iterating the content and calling any functions in ``response.onclose``. :param _resolve: if ``True``, callable views will be resolved via the route traversal mechanism, allowing routed arguments to be set from the current request. :param _full: if ``True``, a full new request context will be set up ensuring application hooks and middleware are called for the subrequest. The WSGI environ will be a copy of the current request's environ with only ``PATH_INFO`` and ``QUERY_STRING`` set up for the specified view. Additional arguments are passed to :meth:`~fresco.core.FrescoApp.requestcontext` and can be used to override values in the new request environ. :param _env: Additional keys to merge into the subrequest's environ (may only be used when ``_full`` is True) :param args: Additional positional arguments :param kwargs: Additional keyword arguments """ mode = kwargs.pop("_mode", "str") if mode == "str": return subrequest_str(view, *args, **kwargs) elif mode == "bytes": return subrequest_bytes(view, *args, **kwargs) elif mode == "raw": return subrequest_raw(view, *args, **kwargs) else: raise ValueError("Mode must be one of 'str', 'bytes', or 'raw'")
def subrequest_str(view, *args, **kwargs) -> str: """ Perform a subrequest and return the response content as a string. See :func:`~fresco.subrequests.subrequest` for details """ return response_to_str(subrequest_raw(view, *args, **kwargs))
[docs]def subrequest_bytes(view, *args, **kwargs) -> bytes: """ Perform a subrequest and return the response content as a byte string. See :func:`~fresco.subrequests.subrequest` for details """ return response_to_bytes(subrequest_raw(view, *args, **kwargs))
[docs]def subrequest_raw(view, *args, **kwargs) -> Response: """ Perform a subrequest and return the raw response object. See :func:`~fresco.subrequests.subrequest` for details """ current_request = context.request response = None full = kwargs.pop("_full", False) resolve = kwargs.pop("_resolve", False) _env = kwargs.pop("_env", {}) environ = current_request.environ.copy() del environ["PATH_INFO"] if "QUERY_STRING" in environ: del environ["QUERY_STRING"] app = context.app context.push(**dict(context.currentcontext(), is_subrequest=True)) try: if full or (isinstance(view, str) and "/" in view): del environ[current_request.STATE_ENV_KEY] if full: path = app.urlfor(view, *args, **kwargs) else: path = view environ = make_environ(path, environ, b"") if _env: environ.update(_env) request = app.request_class(environ) context["request"] = request response = app.view(request) else: assert not _env, "Use of _env invalid without _full" if not callable(view) or resolve: view, routed_args, routed_kwargs = resolve_viewspec( view, *args, **kwargs ) # Use args/kwargs extracted during route traversal, args, kwargs = routed_args, routed_kwargs response = view(*args, **kwargs) return response finally: context.pop()
def response_to_str(response) -> str: ct, params = parse_header(response.get_header("Content-Type")) encoding = params.get("charset") try: # Check for the common case that response.content is already a # string if isinstance(response.content, str): content = response.content # ...or a list of strings? elif isinstance(response.content, list) and all( isinstance(i, str) for i in response.content ): content = "".join(response.content) # Anything else: decode the content iterator else: if not encoding: raise ValueError("No content encoding specified") content = b"".join(response.content_iterator).decode(encoding) finally: for f in response.onclose: f() if ct.startswith("text/html") or "xml" in ct.split(";")[0]: return Markup(content) return content def response_to_bytes(response): try: return b"".join(response.content_iterator) finally: for f in response.onclose: f() def resolve_viewspec(viewspec, *args, **kwargs): """ :param viewspec: a view name, a reference in the form ``'package.module.viewfunction'``, or the view callable itself. :param args: positional arguments for the route path traversal. :param kwargs: keyword arguments for the route path traversal. :return: A tuple of ``(<view callable>, args, kwargs)``. ```args`` and ``kwargs`` are the arguments to apply to ``view`` as extracted during the route traversal. """ app = kwargs.pop("_routecollection", context.app) request = kwargs.pop("request", None) or context.request method = kwargs.pop("method", GET) if isinstance(viewspec, str) and ":" in viewspec: viewspec, remainder = viewspec.split(":", 1) delegated_route = app.routefor(viewspec) for k in delegated_route.routed_args_default: if k not in kwargs: v = delegated_route.routed_args_default[k] if callable(v): v = v(request) kwargs[k] = v delegate_args, delegate_kwargs = _get_args_for_route( delegated_route, request, args, kwargs ) rc = delegated_route.routecollectionfactory(*delegate_args, **delegate_kwargs) return resolve_viewspec( remainder, _routecollection=rc, request=request, method=method, *args, **kwargs ) route = app.routefor(viewspec) view = route.getview(method) view_args, view_kwargs = _get_args_for_route(route, request, args, kwargs) return view, view_args, view_kwargs def _get_args_for_route( route: Route, request: Request, args: Tuple, kwargs: Dict[str, Any] ) -> Tuple[Tuple, Dict[str, Any]]: """ Return the args/kwargs required to pass to the view callable for ``route``. """ mutable_args = list(args) mutable_kwargs = kwargs.copy() route_kwargs = {} route_args = list( chain( ((a(request) if isinstance(a, RouteArg) else a) for a in route.view_args), ( mutable_args.pop(0) for a in route.pattern.segments if a.converter and not a.name ), ) ) required_kwargs = chain( route.view_kwargs.keys(), (s.name for s in route.pattern.segments if s.converter and s.name), ) _marker = object() for k in required_kwargs: v = mutable_kwargs.pop(k, _marker) if v is not _marker: route_kwargs[k] = v continue v = route.routed_args_default.get(k, _marker) if v is not _marker: if callable(v): v = v(request) route_kwargs[k] = v continue v = route.view_kwargs.get(k, _marker) if v is not _marker: v = v(request) if isinstance(v, RouteArg) else v route_kwargs[k] = v return tuple(route_args), route_kwargs