from itertools import chain
from typing import Any
from typing import Dict
from typing import Tuple
from fresco.core import context
from fresco.request import Request
from fresco.response import Response
from fresco.routing import Route
from fresco.routing import GET
from fresco.routing import RouteArg
from fresco.util.http import parse_header
from fresco.util.wsgi import make_environ
# Names exported by ``from <module> import *``. Note that ``subrequest_str``
# is defined in this module but is not part of this list.
__all__ = ["subrequest", "subrequest_raw", "subrequest_bytes"]
class Markup(str):
    """
    A ``str`` subclass that exposes an ``__html__`` method.

    :func:`response_to_str` wraps HTML and XML response bodies in this
    class before returning them.
    """

    def __html__(self):
        """Return the string itself as its HTML representation."""
        return self
def subrequest(view, *args, **kwargs):
    """
    Perform a subrequest to ``view`` and return the result.

    ``_mode`` is removed from ``kwargs`` and selects the return format; every
    other argument is forwarded unchanged to :func:`subrequest_str`,
    :func:`subrequest_bytes` or :func:`subrequest_raw`.

    ``view`` is interpreted as follows:

    - **A string containing a ``/``** is used as a request path.
      A full request is emulated for that path, invoking all middleware
      layers and application hooks.

    - **A callable** is called as ``view(*args, **kwargs)``.
      Middleware and application hooks are not called, and
      :class:`~fresco.routeargs.RouteArg` instances are not resolved
      unless ``_resolve=True`` is given.

    - **Any other value** is looked up as a route, using the same route
      resolution rules as :meth:`~fresco.core.FrescoApp.urlfor`.
      Middleware and hooks are skipped, but
      :class:`~fresco.routeargs.RouteArg` instances are resolved.

    Passing ``_full=True`` forces a full request to be emulated whatever the
    type of ``view``; the request path is then generated by calling
    :meth:`~fresco.core.FrescoApp.urlfor` with ``view``, ``*args`` and
    ``**kwargs``.

    When a full request is emulated, the WSGI environ is built from a copy
    of the current request's environ from which ``PATH_INFO`` and
    ``QUERY_STRING`` are removed before the new path is applied.

    When ``view`` is resolved as a route, ``*args`` and ``**kwargs`` supply
    the values defined in the route's path. Values in ``kwargs`` also
    override any :class:`~fresco.routeargs.RouteArg` defined by the route:
    these are only generated from the request when no corresponding value
    is present in ``kwargs``.

    :param view: The target path, view callable, or viewspec.
    :param _mode: One of ``raw``, ``str`` or ``bytes`` (default ``str``).

                  If ``str``, the content of the subrequest is returned as
                  a string.

                  If ``bytes``, the content is returned as a byte string.

                  If ``raw``, the raw response is returned. In this case
                  the caller is responsible for iterating the content and
                  calling any functions in ``response.onclose``.
    :param _resolve: if ``True``, callable views are resolved via the route
                     traversal mechanism, allowing routed arguments to be
                     set from the current request.
    :param _full: if ``True``, a full new request is emulated, ensuring
                  application hooks and middleware are called for the
                  subrequest.
    :param _env: Additional keys to merge into the subrequest's environ
                 (only valid when a full request is emulated).
    :param args: Additional positional arguments
    :param kwargs: Additional keyword arguments
    :raises ValueError: if ``_mode`` is not ``str``, ``bytes`` or ``raw``.
    """
    mode = kwargs.pop("_mode", "str")

    if mode == "str":
        return subrequest_str(view, *args, **kwargs)
    if mode == "bytes":
        return subrequest_bytes(view, *args, **kwargs)
    if mode == "raw":
        return subrequest_raw(view, *args, **kwargs)

    raise ValueError("Mode must be one of 'str', 'bytes', or 'raw'")
def subrequest_str(view, *args, **kwargs) -> str:
    """
    Run a subrequest and return the content of its response as a string.

    All arguments are handed to :func:`subrequest_raw`; the resulting
    response is then converted with :func:`response_to_str`.

    See :func:`~fresco.subrequests.subrequest` for details
    """
    raw_response = subrequest_raw(view, *args, **kwargs)
    return response_to_str(raw_response)
def subrequest_bytes(view, *args, **kwargs) -> bytes:
    """
    Run a subrequest and return the content of its response as a byte string.

    All arguments are handed to :func:`subrequest_raw`; the resulting
    response is then converted with :func:`response_to_bytes`.

    See :func:`~fresco.subrequests.subrequest` for details
    """
    raw_response = subrequest_raw(view, *args, **kwargs)
    return response_to_bytes(raw_response)
def subrequest_raw(view, *args, **kwargs) -> Response:
    """
    Perform a subrequest and return the raw response object.

    The call runs inside a pushed copy of the current context that has
    ``is_subrequest=True`` set; the pushed context is always popped again
    before returning, including when the view raises.

    See :func:`~fresco.subrequests.subrequest` for details

    :param view: A path (a string containing ``/``), a view callable, or a
                 viewspec accepted by :func:`resolve_viewspec`.
    :param _full: if true, emulate a full request; the path is generated
                  with ``app.urlfor(view, *args, **kwargs)``.
    :param _resolve: if true, pass callable views through
                     :func:`resolve_viewspec` as well.
    :param _env: mapping merged into the environ of an emulated request.
    :param args: Additional positional arguments
    :param kwargs: Additional keyword arguments
    :return: whatever ``app.view(request)`` (emulated request) or the view
             callable (direct call) returns.
    :raises AssertionError: if ``_env`` is given but no request is emulated.
    """
    current_request = context.request
    full = kwargs.pop("_full", False)
    resolve = kwargs.pop("_resolve", False)
    _env = kwargs.pop("_env", {})

    environ = current_request.environ.copy()
    # PEP 3333 allows a server to omit PATH_INFO (like QUERY_STRING) when its
    # value would be empty, so neither key can be assumed to exist.
    environ.pop("PATH_INFO", None)
    environ.pop("QUERY_STRING", None)

    app = context.app
    context.push(**dict(context.currentcontext(), is_subrequest=True))
    try:
        if full or (isinstance(view, str) and "/" in view):
            # Remove the entry stored under the request's STATE_ENV_KEY from
            # the copied environ before building the new request from it.
            del environ[current_request.STATE_ENV_KEY]
            path = app.urlfor(view, *args, **kwargs) if full else view
            environ = make_environ(path, environ, b"")
            if _env:
                environ.update(_env)
            request = app.request_class(environ)
            context["request"] = request
            return app.view(request)

        # Raised explicitly rather than via ``assert`` so the check is not
        # stripped when Python runs with -O; the exception type and message
        # are unchanged.
        if _env:
            raise AssertionError("Use of _env invalid without _full")

        if not callable(view) or resolve:
            # Use args/kwargs extracted during route traversal
            view, args, kwargs = resolve_viewspec(view, *args, **kwargs)
        return view(*args, **kwargs)
    finally:
        context.pop()
def response_to_str(response) -> str:
    """
    Return the content of ``response`` as a string.

    String content (a ``str``, or a ``list`` containing only ``str`` items)
    is used directly. Any other content is read from
    ``response.content_iterator`` and decoded using the ``charset``
    parameter of the ``Content-Type`` header.

    Every callable in ``response.onclose`` is called once the content has
    been read, including when reading or decoding fails.

    :param response: the response object to convert.
    :return: the content; wrapped in :class:`Markup` when the content type
             starts with ``text/html`` or contains ``xml``.
    :raises ValueError: if the content must be decoded but the
                        ``Content-Type`` header has no ``charset``.
    """
    media_type, header_params = parse_header(response.get_header("Content-Type"))
    charset = header_params.get("charset")

    try:
        body = response.content
        if isinstance(body, str):
            # Common case: the content is already a string
            text = body
        elif isinstance(body, list) and all(isinstance(part, str) for part in body):
            # A list made up entirely of strings
            text = "".join(body)
        elif not charset:
            raise ValueError("No content encoding specified")
        else:
            # Anything else: decode the content iterator
            text = b"".join(response.content_iterator).decode(charset)
    finally:
        for on_close in response.onclose:
            on_close()

    is_markup = media_type.startswith("text/html") or "xml" in media_type.split(";")[0]
    return Markup(text) if is_markup else text
def response_to_bytes(response):
    """
    Return the content of ``response`` as a single byte string.

    The chunks produced by ``response.content_iterator`` are joined
    together. Every callable in ``response.onclose`` is then called, which
    also happens when reading the content raises an exception.

    :param response: the response object to read.
    :return: the concatenated content.
    """
    try:
        body = b"".join(response.content_iterator)
    finally:
        for on_close in response.onclose:
            on_close()
    return body
def resolve_viewspec(viewspec, *args, **kwargs):
    """
    Resolve ``viewspec`` to a view callable and the arguments to call it with.

    A string viewspec containing ``:`` is split at the first colon. The part
    before it is looked up as a delegated route, whose
    ``routecollectionfactory`` is called to obtain a route collection; the
    part after it is then resolved recursively against that collection.

    Three keyword arguments are consumed here and are not used for route
    traversal: ``_routecollection`` (where to look up routes, defaulting to
    the current app), ``request`` (defaulting to the current request) and
    ``method`` (defaulting to ``GET``).

    :param viewspec: a view name, a reference in the form
                     ``'package.module.viewfunction'``, or the view
                     callable itself.
    :param args: positional arguments for the route path traversal.
    :param kwargs: keyword arguments for the route path traversal.
    :return: A tuple of ``(<view callable>, args, kwargs)``.
             ``args`` and ``kwargs`` are the arguments to apply
             to ``view`` as extracted during the route traversal.
    """
    routes = kwargs.pop("_routecollection", context.app)
    request = kwargs.pop("request", None) or context.request
    method = kwargs.pop("method", GET)

    if not (isinstance(viewspec, str) and ":" in viewspec):
        # Plain viewspec: look it up directly in the route collection
        route = routes.routefor(viewspec)
        view = route.getview(method)
        view_args, view_kwargs = _get_args_for_route(route, request, args, kwargs)
        return view, view_args, view_kwargs

    prefix, remainder = viewspec.split(":", 1)
    delegated = routes.routefor(prefix)

    # Fill in the delegated route's default routed args for any name the
    # caller did not supply, calling callable defaults with the request.
    defaults = delegated.routed_args_default
    for name in defaults:
        if name in kwargs:
            continue
        value = defaults[name]
        kwargs[name] = value(request) if callable(value) else value

    factory_args, factory_kwargs = _get_args_for_route(
        delegated, request, args, kwargs
    )
    subroutes = delegated.routecollectionfactory(*factory_args, **factory_kwargs)
    return resolve_viewspec(
        remainder,
        *args,
        _routecollection=subroutes,
        request=request,
        method=method,
        **kwargs
    )
def _get_args_for_route(
    route: Route, request: Request, args: Tuple, kwargs: Dict[str, Any]
) -> Tuple[Tuple, Dict[str, Any]]:
    """
    Return the args/kwargs required to pass to the view callable for ``route``.

    Positional arguments are the route's ``view_args`` (with any
    :class:`RouteArg` called with ``request``), followed by one value taken
    from the front of ``args`` for each unnamed pattern segment that has a
    converter.

    Keyword arguments are collected for every key of ``route.view_kwargs``
    and every named pattern segment that has a converter. Each value comes
    from the first of these sources that has it:

    1. ``kwargs``;
    2. ``route.routed_args_default`` (callables are called with ``request``);
    3. ``route.view_kwargs`` (:class:`RouteArg` instances are called with
       ``request``).

    Names found in none of them are left out. Neither ``args`` nor
    ``kwargs`` is modified.
    """
    unused_args = list(args)
    unused_kwargs = kwargs.copy()

    positional = [
        item(request) if isinstance(item, RouteArg) else item
        for item in route.view_args
    ]
    for segment in route.pattern.segments:
        if segment.converter and not segment.name:
            positional.append(unused_args.pop(0))

    wanted_names = chain(
        route.view_kwargs.keys(),
        (seg.name for seg in route.pattern.segments if seg.converter and seg.name),
    )

    named = {}
    absent = object()
    for name in wanted_names:
        # A caller-supplied value wins; popping it means a name listed twice
        # falls through to the route's own values the second time round.
        if name in unused_kwargs:
            named[name] = unused_kwargs.pop(name)
            continue

        default = route.routed_args_default.get(name, absent)
        if default is not absent:
            named[name] = default(request) if callable(default) else default
            continue

        declared = route.view_kwargs.get(name, absent)
        if declared is not absent:
            if isinstance(declared, RouteArg):
                declared = declared(request)
            named[name] = declared

    return tuple(positional), named