# Copyright 2015 Oliver Cope
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import re
import sys
import warnings
from copy import copy
from collections import defaultdict
from collections import namedtuple
from collections.abc import MutableSequence
from importlib import import_module
from functools import partial
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Mapping
from typing import Optional
from typing import Union
from typing import Set
from typing import Tuple
from weakref import WeakKeyDictionary
import typing as t
from fresco.exceptions import ResponseException
from fresco.response import Response
from fresco.request import Request
from fresco.requestcontext import context
from fresco.routeargs import RouteArg
from fresco.util.cache import make_cache
from fresco.util.common import fq_path
from fresco.util.urls import join_path
from fresco.util.object import classorinstancemethod
RFC2518_METHODS = PROPFIND, PROPPATCH, MKCOL, COPY, MOVE, LOCK, UNLOCK = (
"PROPFIND",
"PROPPATCH",
"MKCOL",
"COPY",
"MOVE",
"LOCK",
"UNLOCK",
)
RFC2616_METHODS = GET, HEAD, POST, PUT, DELETE, OPTIONS, TRACE, CONNECT = (
"GET",
"HEAD",
"POST",
"PUT",
"DELETE",
"OPTIONS",
"TRACE",
"CONNECT",
)
RFC3253_METHODS = (
VERSION_CONTROL,
REPORT,
CHECKOUT,
CHECKIN,
UNCHECKOUT,
MKWORKSPACE,
UPDATE,
LABEL,
MERGE,
BASELINE_CONTROL,
MKACTIVITY,
) = (
"VERSION-CONTROL",
"REPORT",
"CHECKOUT",
"CHECKIN",
"UNCHECKOUT",
"MKWORKSPACE",
"UPDATE",
"LABEL",
"MERGE",
"BASELINE-CONTROL",
"MKACTIVITY",
)
RFC3648_METHODS = (ORDERPATCH,) = ("ORDERPATCH",)
RFC3744_METHODS = (ACL,) = ("ACL",)
RFC5323_METHODS = (SEARCH,) = ("SEARCH",)
RFC5789_METHODS = (PATCH,) = ("PATCH",)
ALL_METHODS = HTTP_METHODS = set(
RFC2518_METHODS
+ RFC2616_METHODS
+ RFC3253_METHODS
+ RFC3648_METHODS
+ RFC5323_METHODS
+ RFC5789_METHODS
)
__all__ = [
"ALL_METHODS",
"Pattern",
"Route",
"DelegateRoute",
"RouteCollection",
"routefor",
]
__all__ += [method.replace("-", "_") for method in ALL_METHODS]
#: Encapsulate a pattern match on a path
PathMatch = namedtuple(
"PathMatch", ["path_matched", "path_remaining", "args", "kwargs"]
)
class RouteTraversal(
namedtuple("RouteTraversal", "route args kwargs collections_traversed")
):
"""
Encapsulate a route traversal.
Each ``RouteTraversal`` object contains:
- ``route`` the final route traversed, allowing access to the view
associated with the path.
- ``args`` - positional args to be passed to the view. This is a
combination of args extracted from the path and any added when
constructing the route.
- ``kwargs`` - keyword args to be passed to the view. This is a
combination of kwargs extracted from the path and any added when
constructing the route.
- ``collections_traversed`` - a list of ``TraversedCollection``
objects, containing information on which RouteCollections were
encountered during traversal, the route selected at each stage
and any args/kwargs associated with that phase of the traversal.
"""
def replace(self, viewspec, traversal_kwargs=None, **kwargs):
"""
Return a new ``RouteTraversal`` with the traversal_kwargs or other
fields replaced for the route identified by ``viewspec``.
Example:
>>> from fresco import FrescoApp
>>> app = FrescoApp()
>>> @route('/<lang:str>/index.html', GET, homepage, name='home')
... def homepage(lang):
... return Response({'fr': 'Bonjour!', 'en': 'Hello!'}[lang])
...
>>> traversal = next(app.get_route_traversals('/fr/index.html'))
>>> en_traversal = traversal.replace('home', {'lang': 'en'})
>>> en_traversal.build_path()
'/en/index.html'
"""
if traversal_kwargs is not None:
kwargs["traversal_kwargs"] = traversal_kwargs
if isinstance(viewspec, str):
viewspecs = viewspec.split(":")
else:
viewspecs = [viewspec]
collections_traversed_iter = iter(self.collections_traversed)
for item in viewspecs:
while True:
ct = next(collections_traversed_iter, None)
if ct is None:
raise RouteNotFound(viewspec)
try:
route = ct.collection.routefor(item)
break
except RouteNotFound:
continue
new_collections_traversed = [
c._replace(**kwargs) if (c.route is route) else c
for c in self.collections_traversed
]
return self._replace(collections_traversed=new_collections_traversed)
def build_path(self):
"""
Reconstruct the path from a route traversal
"""
paths = []
for c in self.collections_traversed:
path, _, _ = c.route.path(*c.traversal_args, **c.traversal_kwargs)
paths.append(path)
return "".join(paths)
#: An item of RouteTraversal.collections_traversed
TraversedCollection = namedtuple(
"TraversedCollection",
"collection path route args kwargs " "traversal_args traversal_kwargs",
)
_marker = object()
class RouteNotReady(RuntimeError):
"""
getview was called on a route mapping to an instance method, but the
class has yet been instantiated
"""
class URLGenerationError(Exception):
"""\
Was not possible to generate the requested URL.
"""
class RouteNotFound(Exception):
"""\
The named route does not exist in the RouteCollection.
"""
[docs]class Pattern(object):
"""\
Patterns are matchable against URL paths using their ``match`` method. If a
path matches, this should return a tuple of ``(positional_arguments,
keyword_arguments)`` extracted from the URL path. Otherwise this method
should return ``None``.
Pattern objects may also be able to take a tuple of
``(positional_arguments, keyword_arguments)`` and return a corresponding
URL path.
"""
segments: t.List["PatternSegment"]
[docs] def match(self, path):
"""
Should return a tuple of ``(positional_arguments, keyword_arguments)``
if the pattern matches the given URL path, or None if it does not
match.
"""
raise NotImplementedError
[docs] def pathfor(self, *args, **kwargs):
"""
The inverse of ``match``, this should return a URL path
for the given positional and keyword arguments, along with any unused
arguments.
:return: a tuple of ``(path, remaining_args, remaining_kwargs)``
"""
raise NotImplementedError()
[docs] def path_argument_info(self):
"""
Return information about the arguments required for ``pathfor``
"""
raise NotImplementedError()
[docs] def add_prefix(self, prefix):
"""
Return a copy of the pattern with the given string prepended
"""
raise NotImplementedError()
class Converter(object):
"""\
Responsible for converting arguments to and from URL components.
A ``Converter`` class should provide two instance methods:
- ``to_string``: convert from a python object to a string
- ``from_string``: convert from URL-encoded bytestring to the target
python type.
It must also define the regular expression pattern that is used to extract
the string from the URL.
"""
pattern = "[^/]+"
def __init__(self, pattern=None):
"""
Initialize a ``Converter`` instance.
"""
if pattern is not None:
self.pattern = pattern
def to_string(self, ob):
"""
Convert arbitrary argument ``ob`` to a string representation
"""
return str(ob)
def from_string(self, s):
"""
Convert string argument ``s`` to the target object representation,
whatever that may be.
"""
return s
class IntConverter(Converter):
"""\
Match any integer value and convert to an ``int`` value.
"""
pattern = r"[+-]?\d+"
def from_string(self, s):
"""
Return ``s`` converted to an ``int`` value.
"""
return int(s)
class StrConverter(Converter):
"""\
Match any string, not including a forward slash, and return a ``str``
value
"""
pattern = r"[^/]+"
def to_string(self, s):
"""
Return ``s`` converted to an ``str`` object.
"""
return s
def from_string(self, s):
"""
Return ``s`` converted to a (unicode) string type
"""
return s
class AnyConverter(StrConverter):
"""
Match any one of the given string options. Example::
"/<lang:any('fr', 'en', 'de')>"
"""
def __init__(self, *args):
super(AnyConverter, self).__init__(None)
if len(args) == 0:
raise ValueError("Must supply at least one argument to any()")
self.pattern = "|".join(re.escape(arg) for arg in args)
class PathConverter(StrConverter):
"""\
Match any string, possibly including forward slashes, and return a
``str`` object.
"""
pattern = r".+"
class MatchAllURLsPattern(Pattern):
"""\
A pattern matcher that matches all URLs starting with the given prefix. No
arguments are parsed from the URL.
"""
def __init__(self, path):
self.path = path
def match(self, path):
if path.startswith(self.path):
return PathMatch(self.path, path[len(self.path) :], (), {})
return None
def pathfor(self, *args, **kwargs):
assert (
not args and not kwargs
), "MatchAllURLsPattern does not support URL arguments"
return self.path, (), {}
def path_argument_info(self):
return (), {}
def add_prefix(self, prefix):
return self.__class__(join_path(prefix, self.path))
def __str__(self):
return "%s*" % (self.path,)
class ExtensiblePattern(Pattern):
"""\
An extensible URL pattern matcher.
Synopsis::
>>> from pprint import pprint
>>> p = ExtensiblePattern(r"/<:str>/<year:int>/<title:str>")
>>> pprint(p.match('/archive/1999/blah')) # doctest: +ELLIPSIS
PathMatch(...)
Patterns are split on slashes into components. A component can either be a
literal part of the path, or a pattern component in the form::
<identifier>:<converter>
``identifer`` can be any python name, which will be used as the name of a
keyword argument to the matched function. If omitted, the argument will be
passed as a positional arg.
``converter`` must be the name of a pre-registered converter. Converters
must support ``to_string`` and ``from_string`` methods and are used to
convert between URL segments and python objects.
By default, the following converters are configured:
- ``int`` - converts to an integer
- ``path`` - any path (ie can include forward slashes)
- ``str`` - any string (not including forward slashes)
- ``unicode`` - alias for ``str``
- ``any`` - a string matching a list of alternatives
Some examples::
>>> p = ExtensiblePattern(r"/images/<:path>")
>>> p.match('/images/thumbnails/02.jpg') # doctest: +ELLIPSIS
PathMatch(..., args=('thumbnails/02.jpg',), kwargs={})
>>> p = ExtensiblePattern("/<page:any('about', 'help')>.html")
>>> p.match('/about.html') # doctest: +ELLIPSIS
PathMatch(..., args=(), kwargs={'page': 'about'})
>>> p = ExtensiblePattern("/entries/<id:int>")
>>> p.match('/entries/23') # doctest: +ELLIPSIS
PathMatch(..., args=(), kwargs={'id': 23})
Others can be added by calling ``ExtensiblePattern.register_converter``
"""
preset_patterns = {
"int": IntConverter,
"str": StrConverter,
"unicode": StrConverter,
"path": PathConverter,
"any": AnyConverter,
}
pattern_parser = re.compile(
r"""
<
(?P<name>\w[\w\d]*)?
:
(?P<converter>\w[\w\d]*)
(?:
\(
(?P<args>.*?)
\)
)?
>
""",
re.X,
)
def __init__(self, pattern, match_entire_path=True):
"""
Initialize a new ``ExtensiblePattern`` object with pattern ``pattern``
:param pattern: The pattern string, eg ``'/<id:int>/show'``
:param match_entire_path: Boolean. If ``True``, the entire path will be
required to match, otherwise a prefix match
will suffice.
"""
super(ExtensiblePattern, self).__init__()
self.pattern = pattern
self.match_entire_path = match_entire_path
self.segments = list(self._make_segments())
self.args = [item for item in self.segments if item.converter is not None]
regex = "".join(segment.regex for segment in self.segments)
if self.match_entire_path:
regex += "$"
else:
regex += "(?=/|$)"
self.regex = re.compile(regex)
self.regex_match = self.regex.match
def path_argument_info(self):
positional = tuple(a.converter for a in self.args if a.name is None)
keyword = {a.name: a.converter for a in self.args if a.name is not None}
return (positional, keyword)
def _make_segments(self):
r"""
Generate successive PatternSegment objects from the given string.
Each segment object represents a part of the pattern to be matched, and
comprises ``source``, ``regex``, ``name`` (if a named parameter) and
``converter`` (if a parameter)
"""
for item in split_iter(self.pattern_parser, self.pattern):
if isinstance(item, str):
yield PatternSegment(item, re.escape(item), None, None)
continue
groups = item.groupdict()
name, converter, args = (
groups["name"],
groups["converter"],
groups["args"],
)
converter = self.preset_patterns[converter]
if args:
args, kwargs = self.parseargs(args)
converter = converter(*args, **kwargs)
else:
converter = converter()
yield PatternSegment(
item.group(0), "(%s)" % converter.pattern, name, converter
)
def parseargs(self, argstr):
"""
Return a tuple of ``(args, kwargs)`` parsed out of a string in the
format ``arg1, arg2, param=arg3``.
Synopsis::
>>> ep = ExtensiblePattern('')
>>> ep.parseargs("1, 2, 'buckle my shoe'")
((1, 2, 'buckle my shoe'), {})
>>> ep.parseargs("3, four='knock on the door'")
((3,), {'four': 'knock on the door'})
"""
return eval("(lambda *args, **kwargs: (args, kwargs))(%s)" % argstr)
def match(self, path):
"""
Test ``path`` and return a tuple of parsed ``(args, kwargs)``, or
``None`` if there was no match.
"""
mo = self.regex_match(path)
if mo is None:
return None
groups = mo.groups()
assert len(groups) == len(self.args), (
"Number of regex groups does not match expected count. "
"Perhaps you have used capturing parentheses somewhere? "
"The pattern matched was %r." % self.regex.pattern
)
try:
group_items = [
(segment.name, segment.converter.from_string(value))
for value, segment in zip(groups, self.args)
]
except ValueError:
return None
matched = mo.group(0)
args = tuple(value for name, value in group_items if not name)
kwargs = {name: value for name, value in group_items if name}
return PathMatch(matched, path[len(matched) :], args, kwargs)
def pathfor(self, *args, **kwargs) -> Tuple[str, List[Any], Dict[Any, Any]]:
"""
Example usage::
>>> p = ExtensiblePattern("/view/<name:str>/<revision:int>")
>>> p.pathfor(name='important_document.pdf', revision=299)
('/view/important_document.pdf/299', [], {})
>>> p = ExtensiblePattern("/view/<:str>/<:int>")
>>> p.pathfor('important_document.pdf', 299)
('/view/important_document.pdf/299', [], {})
"""
arg_list = list(args)
kwargs = kwargs
result: List[str] = []
result_append = result.append
for seg in self.segments:
if not seg.converter:
result_append(seg.source)
elif seg.name:
try:
value = kwargs.pop(seg.name)
except IndexError:
raise URLGenerationError(
"Argument %r not specified for url %r"
% (seg.name, self.pattern)
)
result_append(seg.converter.to_string(value))
else:
try:
value = arg_list.pop(0)
except IndexError:
raise URLGenerationError(
"Not enough positional arguments for url %r" % (self.pattern,)
)
result_append(seg.converter.to_string(value))
return "".join(result), arg_list, kwargs
def add_prefix(self, prefix):
return self.__class__(join_path(prefix, self.pattern), self.match_entire_path)
@classmethod
def register_converter(cls, name, converter):
r"""
Register a preset pattern for later use in URL patterns.
Example usage::
>>> from datetime import date
>>> from time import strptime
>>> class DateConverter(Converter):
... pattern = r'\d{8}'
... def from_string(self, s):
... return date(*strptime(s, '%d%m%Y')[:3])
...
>>> ExtensiblePattern.register_converter('date', DateConverter)
>>> ExtensiblePattern('/<:date>')\
... .match('/01011970') # doctest:+ELLIPSIS
PathMatch(..., args=(datetime.date(1970, 1, 1),), kwargs={})
"""
cls.preset_patterns[name] = converter
def __repr__(self):
return "<%s %r>" % (self.__class__, self.pattern)
def __str__(self):
return "%s" % (self.pattern,)
class PatternSegment(object):
"""
Represent a single segment of a URL pattern, storing information about the
``source``, ``regex`` used to pattern match the segment, ``name`` for
named parameters and the ``converter`` used to map the value to a URL
parameter if applicable
"""
__slots__ = ["source", "regex", "name", "converter"]
def __init__(self, source, regex, name, converter):
self.source = source
self.regex = regex
self.name = name
self.converter = converter
[docs]class Route(object):
"""\
Represent a URL routing pattern
"""
#: The default class to use for URL pattern matching
pattern_class = ExtensiblePattern
fallthrough_statuses: Optional[Set[int]]
_route_hints: Dict[Callable, Dict[str, List[Callable]]] = defaultdict(
lambda: defaultdict(list)
)
#: Always provide an positional ``request`` argument to views
provide_request = False
def __init__(
self,
pattern: t.Union[str, Pattern],
methods=None,
view=None,
kwargs=None,
args=None,
name=None,
predicate=None,
decorators=None,
filters=None,
fallthrough_on=None,
provide_request: Optional[bool] = None,
**_kwargs,
):
"""
:param pattern: A string that can be compiled into a path pattern
:param methods: The list of HTTP methods the view is bound to
('GET', 'POST', etc)
:param view: The view function.
:param kwargs: A dictionary of default keyword arguments to pass
to the view callable
:param args: Positional arguments to pass to the view callable
:param name: A name that can later be used to retrieve the route
for URL generation
:param predicate: A callable that is used to decide whether to match
this pattern. The callable must take a ``Request``
object as its only parameter and return a boolean.
:param decorators: Decorator functions to apply to the view callable
before invoking it
:param filters: Filter functions to apply to the view's return
value before returning the final response object
:param fallthrough_on: A List of http status codes which, if returned
by a view will cause the current response to be
discarded with routing continuing to the next
available route.
:param provide_request: If True, provide the current request as the
first argument to the view callable.
:param **_kwargs: Keyword arguments matching HTTP method names
(GET, POST etc) can used to specify views
associated with those methods.
Other keyword aruments are passed through to the
view callable.
Naming routes
-------------
Naming routes allows you to reference routes throughout your
application, eg when generating URLs with
:function:`~fresco.core.urlfor`.
If you don't specify a ``name`` argument, the route will available by
its fully qualified function name (eg
``'package.module.view_function'``), or by passing the function object
itself to ``urlfor``
Views may often be assigned to multiple routes, for example::
>>> def display_image(size):
... return Response(['image data...'])
...
>>> from fresco import FrescoApp
>>> app = FrescoApp()
>>> app.route('/image', display_image, size=(1024, 768))
>>> app.route('/thumbnail', display_image, size=(75, 75))
If you generate a URL for the view function ``display_image``, the
the first declared route will always win, in this case::
>>> app.urlfor(display_image)
'http://localhost/image'
To generate URLs for the thumbnail route in this example, you must
explicity assign names::
>>> app.route('/image', display_image, name='image')
>>> app.route('/thumbnail', display_image, name='image-thumbnail')
>>> app.urlfor('image')
'http://localhost/image'
>>> app.urlfor('image-thumbnail')
'http://localhost/thumbnail'
"""
method_view_map: Dict[str, Callable] = {}
if methods:
if isinstance(methods, str):
methods = [methods]
else:
# Catch the common error of not providing a valid method
if not isinstance(methods, t.Iterable):
raise TypeError(
"HTTP methods must be specified as a string or iterable"
)
for m in methods:
if m not in ALL_METHODS:
raise ValueError("{!r} is not a valid HTTP method".format(m))
method_view_map.update((m, view) for m in methods)
for method in ALL_METHODS:
if method in _kwargs:
method_view_map[method] = _kwargs.pop(method)
if not isinstance(pattern, Pattern):
pattern = self.pattern_class(pattern)
if name and ":" in name:
# Colons are reserved to act as separators
raise ValueError("Route names cannot contain ':'")
self.name = name
self.predicate = predicate
self.decorators = decorators or []
self.before_hooks: List[Callable] = []
self.filters = filters or []
if fallthrough_on:
self.fallthrough_statuses = {int(i) for i in fallthrough_on}
else:
self.fallthrough_statuses = None
if provide_request is not None:
self.provide_request = provide_request
#: Default values to use for path generation
self.routed_args_default: Dict[str, Any] = {}
self.pattern = pattern
self.methods = set(method_view_map)
self.instance = None
# Cached references to view functions
self._cached_views: Dict[str, Callable] = {}
# Cached references to decorated view function. We use weakrefs in case
# a process_view hook substitutes the view function used as a key
# for a dynamically generated function.
self._cached_dviews: Mapping[Callable, Callable] = WeakKeyDictionary()
p_args, p_kwargs = pattern.path_argument_info()
for k in p_kwargs:
default = _kwargs.pop(k + "_default", _marker)
if default is not _marker:
self.routed_args_default[k] = default
self.view_args = tuple(args or _kwargs.pop("view_args", tuple()))
self.view_kwargs = dict(kwargs or _kwargs.pop("view_kwargs", {}), **_kwargs)
for arg in self.view_args:
if isinstance(arg, RouteArg):
arg.configure(self, None)
for argname, arg in self.view_kwargs.items():
# Allow 'Route(... x=FormArg)' shortcut
if isinstance(arg, type) and issubclass(arg, RouteArg):
arg = self.view_kwargs[argname] = arg()
if isinstance(arg, RouteArg):
arg.configure(self, argname)
#: A mapping of HTTP methods to view specifiers
self.viewspecs = method_view_map
def __repr__(self):
view_methods_map: Mapping[Callable, Set[str]] = defaultdict(set)
for method, viewspec in self.viewspecs.items():
view_methods_map[viewspec].add(method)
s = []
for viewspec, methods in view_methods_map.items():
if methods == ALL_METHODS:
method_str = "*"
else:
method_str = " ".join(self.methods)
s.append("%s %s => %r" % (method_str, str(self.pattern), fq_path(viewspec)))
return "<%s %s>" % (self.__class__.__name__, "\n ".join(s))
def __getstate__(self):
state = self.__dict__.copy()
state["_cached_views"] = {}
state["_cached_dviews"] = WeakKeyDictionary()
return state
def fallthrough_on(self, status_codes):
newroute = copy(self)
newroute.fallthrough_statuses = {int(s) for s in status_codes}
return newroute
def match(self, path, method):
if method and method not in self.methods:
return None
return self.pattern.match(path)
[docs] def getview(self, method: str) -> Callable:
"""\
Return the raw view callable.
"""
try:
return self._cached_views[method]
except KeyError:
pass
uview = self.viewspecs[method]
if isinstance(uview, str):
if "." in uview:
mod_name, attr_name = uview.rsplit(".", 1)
mod = import_module(mod_name)
uview = getattr(mod, attr_name)
else:
if self.instance is None:
raise RouteNotReady()
uview = getattr(self.instance, uview)
self._cached_views[method] = uview
return uview
@classmethod
def _add_route_hint(cls, viewfunc, hinttype, func):
assert hinttype in {"before_hooks", "decorators", "filters"}
cls._route_hints[viewfunc][hinttype].append(func)
@classmethod
def _get_route_hints(cls, func, hinttype):
# Look up the underlying function in the case that view is a method
viewfunc = getattr(func, "__func__", func)
return cls._route_hints[viewfunc][hinttype]
[docs] def getdecoratedview(self, view: Callable) -> Callable:
"""\
Return the view callable decorated with any decorators defined in the
route
"""
try:
return self._cached_dviews[view]
except KeyError:
pass
hints = self._get_route_hints
dview = view
# Reverse order of before hooks: the last added should be run first.
# This makes sense if extending existing routes with a before hook,
# as generally you want your new hook to take precendence over any
# existing hooks.
before_hooks = hints(view, "before_hooks")[::-1] + self.before_hooks
filters = self.filters + hints(view, "filters")
decorators = self.decorators + hints(view, "decorators")
for d in decorators:
dview = d(dview)
dview = self._add_before_hooks(dview, before_hooks)
dview = self._add_filters(dview, filters)
self._cached_dviews[view] = dview # type: ignore
return dview
def _add_before_hooks(self, view, hooks):
"""
Decorate ``view`` so that any 'before' hook functions are called.
"""
if not hooks:
return view
def view_with_before_hooks(*args, **kwargs):
for f in hooks:
r = f(*args, **kwargs)
if r is not None:
return r
return view(*args, **kwargs)
return view_with_before_hooks
def _add_filters(self, view, filters):
"""
Decorate ``view`` so that the result is passed through any available
filters.
"""
if not filters:
return view
def filtered_view(*args, **kwargs):
result = view(*args, **kwargs)
for f in filters:
result = f(result)
return result
return filtered_view
[docs] def bindto(self, instance):
"""\
Return copy of the route bound to a given instance.
Use this when traversing view classes.
"""
ob = copy(self)
ob._setinstance(instance)
return ob
def _setinstance(self, instance):
self.instance = instance
[docs] def add_prefix(self, path):
"""
Return a copy of the Route object with the given path prepended to the
routing pattern.
"""
newroute = object.__new__(self.__class__)
newroute.__dict__ = dict(self.__dict__, pattern=self.pattern.add_prefix(path))
return newroute
[docs] def path(self, *args, **kwargs):
"""\
Build the path corresponding to this route and return a tuple of:
``(<path>, <remaining args>, <remaining kwargs>)``
``remaining args`` and ``remaining kwargs`` are any values from
``*args`` and ``**kwargs`` not consumed during path construction.
See also :meth:`~fresco.routing.Pattern.pathfor`.
"""
request = kwargs.pop("request", None)
for k in self.routed_args_default:
if k not in kwargs:
v = self.routed_args_default[k]
if callable(v):
v = v(request)
kwargs[k] = v
return self.pattern.pathfor(*args, **kwargs)
[docs] def route_keys(self):
"""\
Generate keys by which the route should be indexed
"""
if self.name:
yield self.name
for method, viewspec in self.viewspecs.items():
yield viewspec
try:
view = self.getview(method)
except RouteNotReady:
continue
yield view
# Also return the underlying function object for python2 unbound
# methods
func_ob = getattr(view, "__func__", None)
if func_ob is not None:
yield func_ob
[docs] @classorinstancemethod
def before(self, func, *args, **kwargs):
"""
Call a function before passing the request to the view.
Route('/view-secret-page', GET, view_secret_page)\
.before(check_logged_in)
:param func: The function. Must accept the same arguments as the view
and may return either ``None`` or a
:class:`~fresco.response.Response` object.
If a response is returned the view will not be
invoked.
:param args: Extra positional args to pass to ``func``
:param kwargs: Extra keyword args to pass to ``func``
"""
if args or kwargs:
func = partial(func, *args, **kwargs)
if isinstance(self, Route):
self.before_hooks.append(func)
return self
else:
def _decorator(view):
self._add_route_hint(view, "before_hooks", func)
return view
return _decorator
[docs] @classorinstancemethod
def wrap(self, decorator, *args, **kwargs):
"""
Wrap the view function in a decorator. Can be chained for a fluid api::
Route('/user/<id:int>', GET, edituser)\
.wrap(require_ssl)
.wrap(logged_in)
"""
if args or kwargs:
decorator = partial(decorator, *args, **kwargs)
if isinstance(self, Route):
self.decorators.append(decorator)
return self
else:
def _decorator(func):
self._add_route_hint(func, "decorators", decorator)
return func
return _decorator
#: ``decorate`` is an alias for :meth:`~fresco.routing.Route.wrap`
decorate = wrap
[docs] @classorinstancemethod
def filter(self, func, *args, **kwargs):
"""
Filter the output of the view function through other functions::
Route('/user/<id:int>', GET, edituser)\
.wrap(require_ssl)
.filter(render, 'user.tmpl')
:param func: The filter function. Must accept the output of the view
and return the filtered response
:param args: Extra positional args to pass to ``func``
:param kwargs: Extra keyword args to pass to ``func``
"""
if args or kwargs:
func = partial(func, *args, **kwargs)
if isinstance(self, Route):
self.filters.append(func)
return self
else:
def _decorator(view):
self._add_route_hint(view, "filters", func)
return view
return _decorator
class RRoute(Route):
"""
A subclass of :class:`fresco.routing.Route` that always provides an initial
`request` argument to the view.
"""
provide_request = True
def split_iter(pattern, string):
"""
Generate alternate strings and match objects for all occurances of
``pattern`` in ``string``.
"""
matcher = pattern.finditer(string)
match = None
pos = 0
for match in matcher:
yield string[pos : match.start()]
yield match
pos = match.end()
yield string[pos:]
[docs]def routefor(viewspec, _app=None):
"""\
Convenience wrapper around :meth:`~fresco.routing.RouteCollection.urlfor`.
"""
return context.app.routefor(viewspec)
[docs]class RouteCollection(MutableSequence):
"""\
A collection of :class:`~fresco.routing.Route` objects, RouteCollections:
- Contain methods to configure routes, including the ability to
delegate URLs to other RouteCollections
- Can map from a request to a view
"""
route_class = Route
#: The minimum size for the ``(method, path) => route`` cache
min_route_cache_size = 20
#: The maximum size for the ``(method, path) => route`` cache
max_route_cache_size = 500
_route_cache = None
def __init__(self, routes=None, route_class=None, cache=True):
self.__routes__: List[Route] = []
self.__routed_views__: Dict[
Union[str, Callable], Union[Route, RouteNotFound]
] = {}
if cache:
self.reinit_route_cache()
self.route_class = route_class or self.route_class
if routes is not None:
for item in routes:
if isinstance(item, Route):
self.add_route(item)
elif hasattr(item, "__routes__"):
for r in item.__routes__:
if r.instance is None:
r = r.bindto(item)
self.add_route(r)
elif isinstance(item, t.Iterable):
for r in item:
self.add_route(r)
else:
raise TypeError(item)
if cache:
self.reinit_route_cache()
def __add__(self, other):
result = copy(self)
if isinstance(other, Route):
result.add_route(other)
else:
for item in other:
result.add_route(item)
return result
def __radd__(self, other):
if isinstance(other, Route):
return RouteCollection([other]) + self
elif isinstance(other, t.Iterable):
return RouteCollection(other) + self
raise TypeError("Cannot add %r to %r" % (other, self))
def __iter__(self):
return iter(self.__routes__)
def __getitem__(self, index):
if isinstance(index, slice):
return self.__class__(self.__routes__[index])
return self.__routes__[index]
def __setitem__(self, index, new):
self.__routes__.__setitem__(index, new)
if self._route_cache is not None:
self.reinit_route_cache()
def __delitem__(self, index):
self.__routes__.__delitem__(index)
if self._route_cache is not None:
self.reinit_route_cache()
def __len__(self):
return len(self.__routes__)
def reinit_route_cache(self):
cache_size = max(
min(self.max_route_cache_size, len(self.__routes__) * 4),
self.min_route_cache_size,
)
self._route_cache = make_cache(self._get_routes, cache_size)
[docs] def insert(self, position, item):
self.__routes__.insert(position, item)
if self._route_cache is not None:
self.reinit_route_cache()
def add_route(self, route):
self.__routes__.append(route)
if self._route_cache is not None:
self.reinit_route_cache()
[docs] def add_prefix(self, prefix):
"""
Return a copy of the RouteCollection with the given path prepended to
all routes.
"""
ob = copy(self)
ob.__routes__ = [route.add_prefix(prefix) for route in self]
return ob
def fallthrough_on(self, *status_codes):
ob = copy(self)
ob.__routes__ = [route.fallthrough_on(*status_codes) for route in self]
return ob
[docs] def pathfor(self, viewspec, *args, **kwargs):
"""\
Return the path component of the url for the given view name/function
spec.
:param viewspec: a view name, a reference in the form
``'package.module.viewfunction'``, or the view
callable itself.
"""
request = kwargs.pop("request", None)
if isinstance(viewspec, str) and ":" in viewspec:
viewspec, remainder = viewspec.split(":", 1)
delegated_route = self.routefor(viewspec)
p1, remainder_args, remainder_kwargs = delegated_route.path(*args, **kwargs)
factory_args = args[: -len(remainder_args)]
factory_kwargs = {
k: v for k, v in kwargs.items() if k not in remainder_kwargs
}
for k in delegated_route.routed_args_default:
if k not in factory_kwargs:
v = delegated_route.routed_args_default[k]
if callable(v):
v = v(request)
factory_kwargs[k] = v
rc = delegated_route.routecollectionfactory( # type: ignore
*factory_args, **factory_kwargs
)
p2 = rc.pathfor(remainder, request=request, *args, **kwargs)
return p1 + p2
return self.routefor(viewspec).path(request=request, *args, **kwargs)[0]
[docs] def routefor(self, viewspec: Union[Callable, str]) -> Route:
"""
Return the :class:`~fresco.routing.Route` instance associated with
``viewspec``.
Views may have multiple routes bound, in this case the first bound
route will always take precedence.
This method does not resolve delegated routes.
:param viewspec: a view callable or a string in the form
'package.module.viewfunction'
"""
try:
route_or_rnf = self.__routed_views__[viewspec]
if isinstance(route_or_rnf, RouteNotFound):
raise route_or_rnf
return route_or_rnf
except KeyError:
pass
for route in self.__routes__:
for k in route.route_keys():
self.__routed_views__.setdefault(k, route)
try:
route_or_rnf = self.__routed_views__[viewspec]
if isinstance(route_or_rnf, RouteNotFound):
raise route_or_rnf
return route_or_rnf
except KeyError:
pass
if not isinstance(viewspec, str):
exc = self.__routed_views__[viewspec] = RouteNotFound(viewspec)
raise exc
modname, symbols = viewspec, []
while True:
try:
modname, sym = modname.rsplit(".", 1)
except ValueError:
exc = self.__routed_views__[viewspec] = RouteNotFound(viewspec)
raise exc
symbols.append(sym)
module = sys.modules.get(modname, None)
if module:
ob = module
for s in reversed(symbols):
ob = getattr(ob, s)
if callable(ob):
route_or_rnf = self.__routed_views__[ob]
self.__routed_views__[viewspec] = route_or_rnf
if isinstance(route_or_rnf, RouteNotFound):
raise route_or_rnf
return route_or_rnf
else:
exc = self.__routed_views__[viewspec] = RouteNotFound(viewspec)
raise exc
def _get_routes(self, key):
method, path = key
routes = ((r, r.match(path, method)) for r in self.__routes__)
return [(r, t) for (r, t) in routes if t is not None]
[docs] def get_route_traversals(
self, path: str, method: Optional[str], request: Optional[Request] = None
) -> t.Iterator[RouteTraversal]:
"""
Generate RouteTraversals for routes matching the given path and
method::
for rt in routecollection.get_route_traversals('/foo/bar', GET):
print("Route is", rt.route)
print("Arguments extracted from the path:", rt.args, rt.kwargs)
print("RouteCollections are:", rt.collections_traversed)
:param path: the URL path to match (usually this is ``PATH_INFO``)
:param method: the HTTP method to match (usually this is
``REQUEST_METHOD``). May be ``None``, in which case
matching will be performed on the ``path`` argument
only.
:param request: a :class:`~fresco.request.Request` object
:return: A generator yielding ``RouteTraversal`` objects
"""
if self._route_cache:
routes, exc_info = self._route_cache((method, path))
if exc_info is not None:
raise exc_info[1].with_traceback(exc_info[2])
else:
routes = self._get_routes((method, path))
for route, result in routes:
if request and route.predicate and not route.predicate(request):
continue
# View function arguments extracted while traversing the path
traversal_args, traversal_kwargs = result.args, result.kwargs
# Process any args/kwargs defined in the Route declaration.
if request:
route_args = tuple(
(item(request) if isinstance(item, RouteArg) else item)
for item in route.view_args
)
route_kwargs = {
k: v(request) if isinstance(v, RouteArg) else v
for k, v in route.view_kwargs.items()
}
else:
route_args = route.view_args
route_kwargs = route.view_kwargs
args = route_args + traversal_args
if route.provide_request:
args = (request,) + args
kwargs = dict(traversal_kwargs, **route_kwargs)
if isinstance(route, DelegateRoute):
try:
sub_routes = route.routecollectionfactory(*args, **kwargs)
except ResponseException as e:
exc = e
def raiser(*args, **kwargs):
raise exc
r = self.route_class("/", ALL_METHODS, raiser)
yield RouteTraversal(r, (), {}, [(self, "", r)])
continue
for sub in sub_routes.get_route_traversals(
result.path_remaining, method, request
):
traversed = [
TraversedCollection(
self,
"",
route,
args,
kwargs,
traversal_args,
traversal_kwargs,
)
]
traversed.extend(
TraversedCollection(c, result.path_matched + p, r, a, k, ta, tk)
for c, p, r, a, k, ta, tk in sub.collections_traversed
)
# Dynamic routes consume their arguments when creating the
# sub RouteCollection.
if route.dynamic:
yield RouteTraversal(sub.route, sub.args, sub.kwargs, traversed)
else:
yield RouteTraversal(
sub.route,
args + sub.args,
dict(kwargs, **sub.kwargs),
traversed,
)
else:
yield RouteTraversal(
route,
args,
kwargs,
[
TraversedCollection(
self,
"",
route,
args,
kwargs,
traversal_args,
traversal_kwargs,
)
],
)
def get_routes(
self, path: str, method: str, request: Optional[Request] = None
) -> t.Iterable[RouteTraversal]:
warnings.warn(
"RouteCollection.get_routes is deprecated and will be removed "
"in a future version. Use get_route_traversals instead",
DeprecationWarning,
)
return self.get_route_traversals(path, method, request)
[docs] def route(
self,
pattern: t.Union[str, Pattern],
methods: t.Optional[t.Union[str, t.Iterable[str]]] = None,
view: t.Optional[t.Union[str, t.Callable]] = None,
*args,
route_class: t.Optional[t.Type[Route]] = None,
**kwargs,
):
"""
Match a URL pattern to a view function. Can be used as a function
decorator, in which case the ``view`` parameter should not be passed.
:param pattern: A string that can be compiled into a path pattern
:param methods: A list of HTTP methods the view is bound to
:param view: The view function. If not specified a function
decorator will be returned.
Other parameters are as for the :class:`Route` constructor.
"""
# Catch the common error of not providing a valid method
if methods and not isinstance(methods, t.Iterable):
raise TypeError("HTTP methods must be specified as a string or iterable")
# Called as a decorator?
if methods is not None and view is None:
def route_decorator(func):
self.add_route(
self.route_class(pattern, methods, func, *args, **kwargs)
)
return func
return route_decorator
cls = route_class if route_class is not None else self.route_class
route = cls(pattern, methods, view, *args, **kwargs)
self.add_route(route)
return route
[docs] def route_wsgi(
self,
path: str,
wsgiapp: t.Union[t.Callable, str],
rewrite_script_name: bool = True,
*args,
**kwargs,
):
"""
Route requests to ``path`` to the given WSGI application
:param path: the mount point for the app
:param wsgiapp: the WSGI callable, or the path to a callable in the
format ``myapp.module.do_wsgi``.
:param rewrite_script_name: if ``True`` (the default),
the mount point specified by ``path`` will
be shifted off ``PATH_INFO`` and into the
``SCRIPT_NAME`` environ key.
"""
def fake_start_response(status, headers, exc_info=None):
return None
if path.encode("ascii", "ignore").decode("ascii") == path:
ws_path = path
else:
ws_path = None
resolved_wsgi_app = None
def fresco_wsgi_view(
request,
path=path,
ws_path=ws_path,
rewrite_script_name=rewrite_script_name,
fake_start_response=fake_start_response,
make_response=Response.from_wsgi,
):
nonlocal resolved_wsgi_app
environ = request.environ
ws_SCRIPT_NAME = environ["SCRIPT_NAME"]
shift_script_name = rewrite_script_name and not (
path == "/" and ws_SCRIPT_NAME == ""
)
if shift_script_name:
if ws_path is None:
ws_path = path.encode(request.charset).decode("ISO-8859-1")
environ = environ.copy()
environ["SCRIPT_NAME"] = ws_SCRIPT_NAME + ws_path
environ["PATH_INFO"] = environ["PATH_INFO"][len(ws_path) :]
if resolved_wsgi_app is None:
if isinstance(wsgiapp, str):
if "." in wsgiapp:
mod_name, attr_name = wsgiapp.rsplit(".", 1)
mod = import_module(mod_name)
resolved_wsgi_app = getattr(mod, attr_name)
else:
resolved_wsgi_app = wsgiapp
return make_response(resolved_wsgi_app, environ, fake_start_response)
return self.route_all(
path, ALL_METHODS, fresco_wsgi_view, route_class=RRoute, *args, **kwargs
)
[docs] def route_all(
self, path, methods=None, view=None, *args, route_class=None, **kwargs
):
"""
Expose a view for all URLs starting with ``path``.
:param path: the path prefix at which the view will be routed
"""
return self.route(
MatchAllURLsPattern(path),
methods,
view,
route_class=route_class,
*args,
**kwargs,
)
[docs] def include(self, path, views, fallthrough_on=None):
"""
Include a view collection at the given path.
The included view collection's url properties will be modified to
generate the prefixed URLs.
:param path: Path at which to include the views
:param views: Any collection of views (must expose a ``__routes___``
attribute)
:param fallthrough_on: a fallthrough_on argument,
passed to :class:~`fresco.routing.Route`
"""
routes = list(r.add_prefix(path) for r in views.__routes__)
for r in routes:
if r.instance is None:
r = r.bindto(views)
if fallthrough_on:
r = r.fallthrough_on(fallthrough_on)
self.add_route(r)
[docs] def delegate(self, path, app, dynamic=False, *args, **kwargs):
"""\
Delegate all requests under ``path`` to ``app``
:param path: the path prefix at which the app will be available
:param app: a FrescoApp instance
"""
route = DelegateRoute(path, app, dynamic, *args, **kwargs)
self.add_route(route)
return route
[docs] def replace(self, viewspec, newroute):
"""
Replace the route(s) identified by ``viewspec`` with a new
Route.
:param viewspec: The routed view callable, or its fully qualified name
('package.module.view_function'), or the name of a
named route
:param newroute: The replacement route. This may be None, in which
case the route will be removed without replacement
"""
route = self.routefor(viewspec)
if newroute:
position = self.__routes__.index(route)
self.__routes__[position] = newroute
else:
self.__routes__.remove(route)
for k, r in list(self.__routed_views__.items()):
if r is route:
del self.__routed_views__[k]
[docs] def remove(self, viewspec):
"""
Remove the route(s) identified by ``viewspec``
:param viewspec: The routed view callable, or its fully qualified name
('package.module.view_function'), or the name of a
named route
"""
self.replace(viewspec, None)
[docs]class DelegateRoute(Route):
"""\
A specialisation of Route that is used to delegate a path prefix to another
route collection.
"""
def __init__(self, prefix, view, dynamic=False, *args, **kwargs):
pattern = ExtensiblePattern(prefix, match_entire_path=False)
self.dynamic = dynamic
if self.dynamic:
self.routecollectionfactory = self._dynamic_routecollectionfactory
else:
self.routecollectionfactory = self._static_routecollectionfactory
if not isinstance(view, RouteCollection):
routes = [
r.bindto(view) if r.instance is None else r for r in view.__routes__
]
view = RouteCollection(routes)
super(DelegateRoute, self).__init__(pattern, ALL_METHODS, view, *args, **kwargs)
def _dynamic_routecollectionfactory(self, *args, **kwargs):
"""\
Return the RouteCollection responsible for paths under this route
"""
routes = self.getdecoratedview(self.getview(GET))(*args, **kwargs)
if isinstance(routes, RouteCollection):
return routes
return RouteCollection(
(r.bindto(routes) for r in routes.__routes__), cache=False
)
def _static_routecollectionfactory(self, *args, **kwargs):
return self.getview(GET)
def register_converter(name, registry=ExtensiblePattern):
"""
Class decorator that registers a converter class for use with
ExtensiblePattern.
Example::
>>> @register_converter('hex')
... class HexStringConverter(Converter):
... pattern = r'[a-f0-9]+'
...
"""
def register_converter(cls):
registry.register_converter(name, cls)
return cls
return register_converter