# Copyright 2015 Oliver Cope
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from functools import partial
from functools import wraps
from typing import Callable
from typing import Dict
from typing import List
from typing import Tuple
from typing import Type
from typing import Set
from typing import Union
import typing as t
import contextlib
import logging
import sys
import types
from fresco.request import Request
from fresco.response import Response
from fresco.util.http import encode_multipart
from fresco.util.urls import normpath, make_query
from fresco.util.common import fq_path
from fresco.util.wsgi import make_environ
from fresco.typing import WSGICallable
from fresco.exceptions import ResponseException
from fresco.requestcontext import context
from fresco.routing import (
GET,
ExtensiblePattern,
RouteCollection,
RouteNotFound,
TraversedCollection,
)
from fresco.options import Options
__all__ = ("FrescoApp", "urlfor")
logger = logging.getLogger(__name__)
[docs]class FrescoApp(RouteCollection):
"""\
Fresco application class.
"""
#: The default class to use for URL pattern matching
pattern_class = ExtensiblePattern
#: A stdlib logger object, or None. When this is unset, ``log_exception``
#: falls back to this module's own logger.
logger = None
#: Class to use to instantiate request objects. It is called with the
#: WSGI environ as its only argument.
request_class = Request
def __init__(self, *args, **kwargs):
    """
    Create the application and initialise its middleware and hook
    registries.

    :param views: (keyword, optional) if given, passed to
                  ``self.include(path, views)`` once the superclass has
                  been initialised
    :param path: (keyword, optional) the path under which ``views`` is
                 included; defaults to ``"/"`` when ``views`` is given

    All remaining positional and keyword arguments are forwarded to the
    superclass constructor.
    """
    views = kwargs.pop("views", None)
    path = kwargs.pop("path", None)
    super(FrescoApp, self).__init__(*args, **kwargs)

    if views is not None:
        if path is None:
            path = "/"
        self.include(path, views)

    #: A list of (middleware, args, kwargs) tuples
    self._middleware: List[Tuple[Callable, Tuple, Dict]] = []

    #: The WSGI application. This is generated when the first request is
    #: made.
    self._wsgi_app = None

    #: An options dictionary, for arbitrary application variables or
    #: configuration
    self.options = Options()

    #: Functions to be called before the routing has been traversed.
    #: Each function will be passed the request object.
    #: If a function returns a value (other than ``None``)
    #: this will be returned and the normal routing system bypassed.
    self.process_request_handlers: List[Callable] = []

    #: Functions to be called after routing, but before any view is invoked
    #: Each function will be passed
    #: ``request, view, view_args, view_kwargs``.
    #: If a function returns a Response instance
    #: this will be returned as the response
    #: instead of calling the scheduled view.
    #: If a function returns any other (non-None) value this will be used
    #: to replace the scheduled view function.
    self.process_view_handlers: List[Callable] = []

    #: Functions to be called after a response object has been generated
    #: Each function will be passed ``request, response``.
    #: If a function returns a value (other than ``None``),
    #: this value replaces the response.
    self.process_response_handlers: List[Callable] = []

    #: ``(status, function)`` pairs, called if the response has an HTTP
    #: error status code (400 <= x <= 599). A ``status`` of ``None``
    #: matches every error status.
    #: Each function will be passed ``request, response``.
    #: If a function returns a value (other than ``None``),
    #: this value replaces the response.
    self.process_http_error_response_handlers: List[
        Tuple[t.Optional[int], Callable]
    ] = []

    #: ``(exception class, function)`` pairs, called if an exception is
    #: raised during a view.
    #: Each function will be passed ``request, exc_info``.
    #: If a function returns a value (other than ``None``),
    #: this value will be
    #: returned as the response and the error will not be propagated.
    #: If every handler returns None, ``handle_exception`` falls back to a
    #: 500 Internal Server Error response.
    self.process_exception_handlers: List[Tuple[Type[Exception], Callable]] = []

    #: Functions to be called at the end of request processing,
    #: after all content has been output.
    #: Each function will be passed ``request`` and should not
    #: return any value.
    self.process_teardown_handlers: List[Callable] = []
def __call__(self, environ, start_response):
    """
    Handle a request as a WSGI application.

    The underlying WSGI callable is built with ``make_wsgi_app`` the first
    time it is needed and then kept in ``_wsgi_app`` for later calls.
    """
    app = self._wsgi_app
    if app is None:
        app = self._wsgi_app = self.make_wsgi_app()
    return app(environ, start_response)
def __str__(self):
    """
    Return a string showing the class name and each configured route.

    Routes after the first are placed on their own line, indented so that
    they line up with the first route.
    """
    clsname = self.__class__.__name__
    separator = "\n" + " " * (len(clsname) + 2)
    routes = separator.join(map(str, self.__routes__))
    return "<%s %s>" % (clsname, routes)
def get_response(
    self,
    request,
    path,
    method,
    currentcontext=context.currentcontext,
    normpath=normpath,
):
    """
    Route the request and return the resulting response.

    Processing order:

    1. every ``process_request`` hook is called; if any returned a truthy
       value, the last such value is returned and routing is skipped
    2. each matching route traversal is tried in turn: ``process_view``
       hooks run, then the (decorated) view is called
    3. if no route produced a final response: a stored error response is
       returned, else ``HEAD`` is retried as ``GET``, else a
       405 / redirect-to-trailing-slash / 404 response is generated

    :param request: the current request object
    :param path: the path to route. Paths containing ``..``, ``//`` or
                 ``/./`` are passed through ``normpath`` first.
    :param method: the HTTP method to route
    :param currentcontext: callable returning the current context mapping
    :param normpath: function used to normalize ``path``
    """
    ctx = currentcontext()
    ctx["app"] = self
    environ = request.environ
    environ["fresco.app"] = self
    error_response = response = None

    if ".." in path or "//" in path or "/./" in path:
        path = normpath(path)

    # Iterate over a snapshot: hooks registered via ``process_request_once``
    # remove themselves from the list while it is being walked, which would
    # otherwise cause the following hook to be skipped for this request.
    for f in tuple(self.process_request_handlers):
        try:
            r = f(request)
            if r is not None:
                response = r
        except Exception:
            return self.handle_exception(request, allow_reraise=False)

    if response:
        return response

    try:
        for traversal in self.get_route_traversals(path, method, request):
            try:
                route = traversal.route
                environ["wsgiorg.routing_args"] = (
                    traversal.args,
                    traversal.kwargs,
                )
                view = route.getview(method)
                ctx["view_self"] = getattr(view, "__self__", None)
                ctx["route_traversal"] = traversal
                if self.logger:
                    self.logger.info(
                        "matched route: %s %r => %r",
                        request.method,
                        path,
                        fq_path(view),
                    )

                response = None
                for f in self.process_view_handlers:
                    try:
                        r = f(request, view, traversal.args, traversal.kwargs)
                        if r is not None:
                            response = r
                    except Exception:
                        return self.handle_exception(request, allow_reraise=False)
                if response is not None:
                    # A Response short-circuits the view; any other value
                    # replaces the view callable.
                    if isinstance(response, Response):
                        return response
                    else:
                        view = response

                view = route.getdecoratedview(view)
                response = view(*traversal.args, **traversal.kwargs)
                if (
                    route.fallthrough_statuses
                    and response.status_code in route.fallthrough_statuses
                ):
                    # Remember this response but give later routes a chance
                    error_response = response
                    continue

            except ResponseException as e:
                if e.is_final:
                    return e.response
                # Keep the first non-final error seen
                error_response = error_response or e.response
                if (
                    route.fallthrough_statuses
                    and error_response.status_code in route.fallthrough_statuses
                ):
                    continue
            except Exception:
                return self.handle_exception(request)
            else:
                return response
    except ResponseException as e:
        if e.is_final:
            return e.response
        error_response = error_response or e.response
    except Exception:
        return self.handle_exception(request)

    # A route was matched, but an error was returned
    if error_response:
        return error_response

    # Is this a head request?
    if method == "HEAD":
        response = self.get_response(request, path, GET)
        if "200" <= response.status < "300":
            return response.replace(content=[], content_length=0)
        return response

    # Is the URL matched by another HTTP method?
    methods = self.get_methods(request, path)
    if methods:
        return Response.method_not_allowed(methods)

    # Is the URL just missing a trailing '/'?
    if not path or path[-1] != "/":
        for _ in self.get_methods(request, path + "/"):
            return Response.unrestricted_redirect_permanent(path + "/")

    return Response.not_found()
def view(self, request=None) -> Response:
    """
    Return the response for ``request``, after applying the
    ``process_response`` and ``process_http_error_response`` hooks.

    :param request: the request to handle; defaults to ``context.request``
    :returns: the response produced by routing, possibly replaced by a
              hook's return value

    Exceptions raised by ``process_response`` hooks are logged and
    otherwise ignored.
    """
    request = request or context.request
    try:
        path = request.path_info
    except ResponseException as e:
        response = e.response
    else:
        response = self.get_response(request, path, request.method)

    for f in self.process_response_handlers:
        try:
            r = f(request, response)
            if r is not None:
                response = r
        except Exception:
            self.log_exception(request)

    # Compare the numeric status code: comparing the status line as a
    # string ("599 Reason" <= "599" is False) excluded 599 responses.
    if (
        self.process_http_error_response_handlers
        and 400 <= response.status_code <= 599
    ):
        response = self.handle_http_error_response(request, response)
    return response
def handle_http_error_response(self, request, response):
    """
    Run the ``process_http_error_response`` hooks over ``response``.

    A hook registered with a status of ``None`` is always called; any other
    hook is called only when its status equals the status code of the
    response as it stands at that point. A non-None return value replaces
    the response for the remaining hooks. Exceptions raised along the way
    are logged and the next hook is tried.

    :returns: the (potentially replaced) response object
    """
    for wanted_status, handler in self.process_http_error_response_handlers:
        try:
            if wanted_status is None or wanted_status == response.status_code:
                replacement = handler(request, response)
                if replacement is not None:
                    response = replacement
        except Exception:
            self.log_exception(request)
    return response
def get_methods(self, request, path):
    """
    Collect the HTTP methods accepted by routes matching ``path``.

    Routes that have a predicate which rejects ``request`` are left out.

    :returns: a set of method names (empty if nothing matches)
    """
    allowed: Set[str] = set()
    for traversal in self.get_route_traversals(path, None):
        candidate = traversal.route
        if not candidate.predicate or candidate.predicate(request):
            allowed.update(candidate.methods)
    return allowed
def log_exception(self, request, exc_info=None):
    """
    Log an error for ``request`` including exception information.

    :param request: the request being handled; its method and URL appear
                    in the log message
    :param exc_info: exception info to attach; when not given, the
                     result of ``sys.exc_info()`` is used

    The message goes to ``self.logger`` if set, else to the module logger.
    """
    if not exc_info:
        exc_info = sys.exc_info()
    target = self.logger or logger
    message = "Exception in {0} {1}".format(request.method, request.url)
    target.error(message, exc_info=exc_info)
def handle_exception(
    self, request, allow_reraise=True
) -> Union[
    Response,
    Tuple[Type[BaseException], BaseException, types.TracebackType],
]:
    """
    Turn the exception currently being handled into a response.

    Must be called from inside an ``except`` block.

    :param request: the request being processed
    :param allow_reraise: if true and neither an exception handler nor an
                          http error handler for status ``None``/``500`` is
                          registered, the exception is re-raised
    :returns: the first non-None value returned by a matching
              ``process_exception`` hook, or else a 500 response
    :raises AssertionError: if no exception is being handled
    """
    current = sys.exc_info()
    if current[0] is None:
        raise AssertionError(
            "handle_exception called when no exception is being handled"
        )

    exception_handlers = self.process_exception_handlers
    have_error_handlers = exception_handlers or any(
        st in (None, 500) for st, fn in self.process_http_error_response_handlers
    )

    # Backwards compatibility: if no exception or 500 http error
    # handlers have been installed we default to the old behavior
    # of raising the exception and letting the upstream
    # server handle it
    if allow_reraise and not have_error_handlers:
        raise current[1].with_traceback(current[2])  # type: ignore

    result: Union[
        Response,
        Tuple[Type[BaseException], BaseException, types.TracebackType],
    ] = Response.internal_server_error()

    # With no exception hooks installed nothing else would record the error
    if not self.process_exception_handlers:
        self.log_exception(request, current)

    for handled_type, hook in self.process_exception_handlers:
        try:
            if issubclass(current[0], handled_type):  # type: ignore
                outcome = hook(request, current)
                if outcome is not None:
                    result = outcome
                    break
        except Exception:
            self.log_exception(request)

    # A hook that returns a 3-tuple is asking for the error to be re-raised
    if isinstance(result, tuple) and len(result) == 3:
        raise current[1].with_traceback(current[2])  # type: ignore
    return result
def add_middleware(self, middleware, *args, **kwargs):
    """
    Append a WSGI middleware layer to the stack.

    Middleware is applied from the inside out when the WSGI app is built:
    the first middleware added occupies the innermost layer and is called
    last in each request cycle.

    :param middleware: a middleware callable
    :param args: extra positional args passed to the middleware
    :param kwargs: extra keyword args passed to the middleware
    """
    self.reset_wsgi_app()
    layer = (middleware, args, kwargs)
    self._middleware.append(layer)
def remove_middleware(self, middleware):
    """
    Remove the given WSGI middleware layer.

    :param middleware: A middleware object. This must be the same object
                       that was previously passed to
                       :meth:`add_middleware`; layers are matched by
                       identity.
    """
    self.reset_wsgi_app()
    remaining = []
    for layer in self._middleware:
        if layer[0] is not middleware:
            remaining.append(layer)
    self._middleware = remaining
def insert_middleware(self, position, middleware, *args, **kwargs):
    """
    Insert a middleware layer at the given position in the stack.

    :param position: The index of the element before which to insert.
                     Layers earlier in the list are wrapped by later
                     ones, so middleware inserted at position zero
                     will be called last.
    :param middleware: A middleware callable.
    :param args: extra positional args to be passed to the middleware for
                 initialization
    :param kwargs: extra keyword args to be passed to the middleware for
                   initialization
    """
    self.reset_wsgi_app()
    layer = (middleware, args, kwargs)
    # Slice assignment at [i:i] is the documented equivalent of insert(i, x)
    self._middleware[position:position] = [layer]
def make_wsgi_app(self, wsgi_app=None, use_middleware=True) -> WSGICallable:
    """
    Return a WSGI (PEP-3333) compliant application that drives this
    FrescoApp object.

    :param wsgi_app: if given, will be called in place of
                     :meth:`~fresco.core.FrescoApp.view` (the returned
                     app still pushes and pops the request context
                     around it)
    :param use_middleware: if True, the app's middleware stack will be
                           applied to the resulting WSGI app.
    :returns: a generator-based WSGI callable
    """
    if wsgi_app is None:

        def wsgi_app(
            environ,
            start_response,
            view=self.view,
            request_class=self.request_class,
        ):
            request = request_class(environ)
            return view(request)(environ, start_response)

    if use_middleware:
        # Each layer wraps the previous one, so the first entry in the
        # list ends up innermost.
        for m, m_args, m_kwargs in self._middleware:
            wsgi_app = m(wsgi_app, *m_args, **m_kwargs)

    def fresco_wsgi_app(
        environ,
        start_response,
        frescoapp=self,
        wsgi_app=wsgi_app,
        request_class=self.request_class,
        process_teardown_handlers=self.process_teardown_handlers,
        call_process_teardown_handlers=self.call_process_teardown_handlers,
        context_push=context.push,
        context_pop=context.pop,
    ):
        request = request_class(environ)
        context_push(request=request, app=frescoapp)
        iterator = None
        try:
            iterator = wsgi_app(environ, start_response)
            yield from iterator
        except Exception:
            exc_info = sys.exc_info()
            try:
                response = frescoapp.handle_exception(request)
                # Compare the numeric status code: a string comparison of
                # the status line ("599 Reason" <= "599" is False) would
                # exclude 599 responses.
                if 400 <= response.status_code <= 599:
                    response = frescoapp.handle_http_error_response(
                        request, response
                    )

                # Pass exc_info on so the server knows this start_response
                # call replaces an earlier one (PEP 3333).
                def exc_start_response(s, h, exc_info=exc_info):
                    return start_response(s, h, exc_info)

                yield from response(environ, exc_start_response)
            finally:
                # Drop the traceback reference held by this frame
                del exc_info
        finally:
            # Cleanup is nested so that each stage runs even if the
            # previous one raises: teardown hooks, then closing the
            # response iterator, then popping the request context.
            try:
                if process_teardown_handlers:
                    call_process_teardown_handlers(request)
                for item in request.teardown_handlers:
                    item()
            finally:
                try:
                    close = getattr(iterator, "close", None)
                    if close is not None:
                        close()
                finally:
                    context_pop()

    return fresco_wsgi_app
def reset_wsgi_app(self):
    """
    Discard the cached WSGI application so that it is rebuilt on the next
    call.

    Any ``__call__`` attribute set on the instance itself is removed too;
    it is not an error if there is none.
    """
    with contextlib.suppress(AttributeError):
        del self.__call__
    self._wsgi_app = None
def urlfor(self, viewspec, *args, **kwargs):
    """
    Return the url for the given view name or function spec.

    The route collections traversed to reach the current view are searched
    innermost first; if there are none, this application is searched.

    :param viewspec: a view name, a reference in the form
                     ``'package.module.viewfunction'``, or the view
                     callable itself.
    :param _scheme: the URL scheme to use (eg 'https' or 'http').
    :param _netloc: the network location to use (eg 'localhost:8000').
    :param _script_name: the SCRIPT_NAME path component
    :param _query: any query parameters, as a dict or list of
                   ``(key, value)`` tuples.
    :param _fragment: a URL fragment to append.
    :raises RouteNotFound: if no searched collection can build the path

    All other arguments or keyword args are fed to the ``pathfor`` method
    of the pattern.
    """
    scheme = kwargs.pop("_scheme", None)
    netloc = kwargs.pop("_netloc", None)
    query = kwargs.pop("_query", {})
    script_name = kwargs.pop("_script_name", None)
    fragment = kwargs.pop("_fragment", None)

    ctx = context.currentcontext()
    request = ctx["request"]
    traversal = ctx.get("route_traversal")

    if traversal and traversal.collections_traversed:
        searched = traversal.collections_traversed
    else:
        searched = [TraversedCollection(self, "", None, (), {}, (), {})]

    last_error = None
    for traversed in searched[::-1]:
        try:
            path = traversed.path + traversed.collection.pathfor(
                viewspec, request=request, *args, **kwargs
            )
        except RouteNotFound as e:
            last_error = e
        else:
            return request.make_url(
                scheme=scheme,
                netloc=netloc,
                SCRIPT_NAME=script_name,
                PATH_INFO=path,
                parameters="",
                query=query,
                fragment=fragment,
            )

    raise last_error or RouteNotFound(viewspec)
def view_method_not_found(self, valid_methods):
    """
    Build a ``405 Method Not Allowed`` response.

    Intended for the case where a view matched the pattern but no HTTP
    methods matched.

    :param valid_methods: the methods that are allowed, passed through to
                          ``Response.method_not_allowed``
    """
    not_allowed = Response.method_not_allowed(valid_methods)
    return not_allowed
def call_process_teardown_handlers(self, request):
    """
    Run every registered ``process_teardown`` hook for ``request``.

    Called once the request has been completed and the response content
    output. A hook that raises is logged via ``log_exception`` and does
    not prevent the remaining hooks from running.
    """
    for teardown in self.process_teardown_handlers:
        try:
            teardown(request)
        except Exception:
            self.log_exception(request)
def process_request_once(
    self, func: Callable[[Request], t.Optional[Response]]
) -> Callable[[Request], t.Optional[Response]]:
    """
    Register a ``process_request`` hook function that is called only once.

    When running fresco with multiple worker threads/processes the hook
    function will be called at most once per worker.

    :param func: the hook; it receives the request object
    :returns: ``func`` itself, so this can be used as a decorator
    """

    def run_once(request: Request) -> t.Optional[Response]:
        handlers = self.process_request_handlers
        try:
            # Unregister first: if the wrapper has already been removed
            # the hook has run (or is running) and must not run again.
            handlers.remove(run_once)
        except ValueError:
            return None
        return func(request)

    self.process_request(wraps(func)(run_once))
    return func
def process_request(self, func):
    """
    Register ``func`` as a ``process_request`` hook.

    :returns: ``func`` unchanged, so this can be used as a decorator
    """
    hooks = self.process_request_handlers
    hooks.append(func)
    return func
def process_view(self, func):
    """
    Register ``func`` as a ``process_view`` hook.

    :returns: ``func`` unchanged, so this can be used as a decorator
    """
    hooks = self.process_view_handlers
    hooks.append(func)
    return func
def process_response(self, func):
    """
    Register ``func`` as a ``process_response`` hook.

    :returns: ``func`` unchanged, so this can be used as a decorator
    """
    hooks = self.process_response_handlers
    hooks.append(func)
    return func
def process_exception(self, func, exc_type=Exception):
    """
    Register a ``process_exception`` hook function.

    Can be used directly (``process_exception(func)``) or with an exception
    class first (``process_exception(SomeError)``), in which case a partial
    is returned that registers its argument for that exception class.

    :param func: the hook function, or an ``Exception`` subclass
    :param exc_type: the exception class the hook applies to
    :returns: ``func``, or a partial when ``func`` is an exception class
    """
    given_exception_class = isinstance(func, type) and issubclass(func, Exception)
    if given_exception_class:
        return partial(self.process_exception, exc_type=func)

    registration = (exc_type, func)
    self.process_exception_handlers.append(registration)
    return func
def process_http_error_response(self, func, status=None):
    """
    Register a ``process_http_error_response`` hook function.

    Can be used directly (``process_http_error_response(func)``) or with a
    status code first (``process_http_error_response(404)``), in which case
    a partial is returned that registers its argument for that status.

    :param func: the hook function, or an integer status code
    :param status: the status code the hook applies to; ``None`` is stored
                   as-is
    :returns: ``func``, or a partial when ``func`` is an integer
    """
    if isinstance(func, int):
        return partial(self.process_http_error_response, status=func)

    registration = (status, func)
    self.process_http_error_response_handlers.append(registration)
    return func
def process_teardown(self, func):
    """
    Register a ``process_teardown`` hook function.

    :param func: the hook; it will be called with the request object
    :returns: ``func`` unchanged, so this can be used as a decorator
              without rebinding the decorated name to ``None``
    :raises AssertionError: if the WSGI application has already been
                            built (``_wsgi_app`` is set)
    """
    if self._wsgi_app:
        raise AssertionError(
            "Cannot add hook: application is now receiving requests"
        )
    self.process_teardown_handlers.append(func)
    return func
@contextlib.contextmanager
def requestcontext(
    self, url="/", environ=None, wsgi_input=b"", middleware=True, **kwargs
):
    """
    Context manager yielding the global
    :class:`fresco.requestcontext.RequestContext` instance, populated
    with a new request object modelling default WSGI environ values.

    Synopsis::

        >>> app = FrescoApp()
        >>> with app.requestcontext('http://www.example.com/view') as c:
        ...     print(c.request.url)
        ...
        http://www.example.com/view

    Note that ``url`` must be properly URL encoded.

    :param url: The URL for the request,
                eg ``/index.html`` or ``/search?q=foo``.
    :param environ: values to pass into the WSGI environ dict
    :param wsgi_input: The input stream to use in the ``wsgi.input``
                       key of the environ dict
    :param middleware: If ``False`` the middleware stack will not be
                       invoked. Disabling the middleware can speed up
                       the execution considerably, but it will no longer
                       give an accurate simulation of a real HTTP request.
    :param kwargs: additional keyword arguments will be passed into the
                   WSGI request environment
    """

    def fake_app(environ, start_response):
        start_response("200 OK", [])
        yield b""

    def fake_start_response(status, headers, exc_info=None):
        def write(s):
            return None

        return write

    wsgi_environ = make_environ(url, environ, wsgi_input, **kwargs)
    wsgi_app = self.make_wsgi_app(wsgi_app=fake_app, use_middleware=middleware)
    result = wsgi_app(wsgi_environ, fake_start_response)
    close = getattr(result, "close", None)
    chunks = iter(result)
    try:
        # Pull the first chunk so the app has started running before the
        # caller's ``with`` body executes.
        next(chunks, None)
        yield context
        # Drain whatever is left so the app runs to completion
        for _ in chunks:
            pass
    finally:
        if close is not None:
            close()
def requestcontext_with_payload(
    self, url="/", data=None, environ=None, files=None, multipart=False, **kwargs
):
    """
    Return a request context whose request carries a body.

    :param url: the URL for the request
    :param data: the payload: a file-like object (anything with a ``read``
                 attribute), ``bytes``, ``None`` for an empty body, or any
                 other value, which is encoded with ``make_query``
    :param environ: values to pass into the WSGI environ dict
    :param files: files to upload; a truthy value forces multipart
                  encoding
    :param multipart: if true, ``data`` and ``files`` are encoded with
                      ``encode_multipart``
    :param kwargs: passed on to :meth:`requestcontext`. ``CONTENT_LENGTH``
                   is filled in from the body length unless already given.
    """
    if files:
        multipart = True

    if multipart:
        wsgi_input, headers = encode_multipart(data, files)
        kwargs.update(headers)
    elif hasattr(data, "read"):
        wsgi_input = data.read()
    elif isinstance(data, bytes):
        wsgi_input = data
    elif data is None:
        # Bytes, like every other branch and like requestcontext's default
        wsgi_input = b""
    else:
        wsgi_input = make_query(data).encode("ascii")

    if "CONTENT_LENGTH" not in kwargs:
        kwargs["CONTENT_LENGTH"] = str(len(wsgi_input))

    return self.requestcontext(url, environ, wsgi_input=wsgi_input, **kwargs)
def requestcontext_post(self, *args, **kwargs):
    """
    Return a request context for a ``POST`` request.

    Arguments are as for :meth:`requestcontext_with_payload`. Any
    ``REQUEST_METHOD`` supplied by the caller is overridden, matching
    the ``put`` and ``patch`` variants.
    """
    kwargs["REQUEST_METHOD"] = "POST"
    return self.requestcontext_with_payload(*args, **kwargs)
def requestcontext_put(self, *args, **kwargs):
    """
    Return a request context for a ``PUT`` request.

    Arguments are as for :meth:`requestcontext_with_payload`; any
    ``REQUEST_METHOD`` supplied by the caller is overridden.
    """
    return self.requestcontext_with_payload(
        *args, **{**kwargs, "REQUEST_METHOD": "PUT"}
    )
def requestcontext_patch(self, *args, **kwargs):
    """
    Return a request context for a ``PATCH`` request.

    Arguments are as for :meth:`requestcontext_with_payload`; any
    ``REQUEST_METHOD`` supplied by the caller is overridden.
    """
    return self.requestcontext_with_payload(
        *args, **{**kwargs, "REQUEST_METHOD": "PATCH"}
    )
def requestcontext_delete(self, *args, **kwargs):
    """
    Return a request context for a ``DELETE`` request.

    Arguments are as for :meth:`requestcontext` (no payload handling); any
    ``REQUEST_METHOD`` supplied by the caller is overridden.
    """
    return self.requestcontext(*args, **{**kwargs, "REQUEST_METHOD": "DELETE"})
def urlfor(*args, **kwargs):
    """
    Convenience wrapper around :meth:`~fresco.core.FrescoApp.urlfor`,
    called on the application found in the current request context.
    """
    current_app = context.app
    return current_app.urlfor(*args, **kwargs)