# Copyright 2015 Oliver Cope
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Utilities for interfacing with WSGI
"""
import logging
from functools import partial
from io import BytesIO
from urllib.parse import unquote
from urllib.parse import urlparse
from typing import List
from typing import Optional
from typing import Tuple
import sys
from fresco.typing import ExcInfoTuple
from fresco.typing import HeadersList
from fresco.typing import WSGICallable
logger = logging.getLogger(__name__)
__all__ = [
"environ_to_str",
"str_to_environ",
"environ_to_bytes",
"bytes_to_environ",
"StartResponseWrapper",
"ClosingIterator",
"make_environ",
]
#: The core keys expected in the WSGI environ dict, as defined by PEP3333
WSGI_KEYS = set(
[
"REQUEST_METHOD",
"SCRIPT_NAME",
"PATH_INFO",
"QUERY_STRING",
"CONTENT_TYPE",
"CONTENT_LENGTH",
"SERVER_NAME",
"SERVER_PORT",
"SERVER_PROTOCOL",
]
)
#: List of standard request header field names
#: Taken from the list available at
#: http://en.wikipedia.org/wiki/List_of_HTTP_header_fields which combines
#: RFC specified headers with commonly used non-standard header names
REQUEST_HEADER_NAMES = {
"accept": "Accept",
"accept_charset": "Accept-Charset",
"accept_encoding": "Accept-Encoding",
"accept_language": "Accept-Language",
"accept_datetime": "Accept-Datetime",
"authorization": "Authorization",
"cache_control": "Cache-Control",
"connection": "Connection",
"cookie": "Cookie",
"content_length": "Content-Length",
"content_md5": "Content-MD5",
"content_type": "Content-Type",
"date": "Date",
"expect": "Expect",
"from": "From",
"host": "Host",
"if_match": "If-Match",
"if_modified_since": "If-Modified-Since",
"if_none_match": "If-None-Match",
"if_range": "If-Range",
"if_unmodified_since": "If-Unmodified-Since",
"max_forwards": "Max-Forwards",
"origin": "Origin",
"pragma": "Pragma",
"proxy_authorization": "Proxy-Authorization",
"range": "Range",
"referer": "Referer",
"te": "TE",
"user_agent": "User-Agent",
"upgrade": "Upgrade",
"via": "Via",
"warning": "Warning",
"common": "Common",
"field": "Field",
"x_requested_with": "X-Requested-With",
"dnt": "DNT",
"x_forwarded_for": "X-Forwarded-For",
"x_forwarded_host": "X-Forwarded-Host",
"x_forwarded_host": "X-Forwarded-Host",
"x_forwarded_proto": "X-Forwarded-Proto",
"front_end_https": "Front-End-Https",
"x_http_method_override": "X-Http-Method-Override",
"x_att_deviceid": "X-ATT-DeviceId",
"x_wap_profile": "X-Wap-Profile",
"proxy_connection": "Proxy-Connection",
}
[docs]def environ_to_str(s, enc="UTF-8"):
"""
Decode a WSGI environ value to a string
"""
return s.encode("iso-8859-1").decode(enc)
[docs]def str_to_environ(s, enc="UTF-8"):
"""
Return a string encoded for a WSGI environ value
This returns a 'bytes-as-unicode' string:
- encode ``s`` using the specified encoding (eg utf8)
- decode the resulting byte string as latin-1
"""
return s.encode(enc, "surrogateescape").decode("iso-8859-1")
def unicode_to_environ(s, enc="UTF-8"):
import warnings
warnings.warn(
"unicode_to_environ is deprecated, use environ_to_str instead",
DeprecationWarning,
)
return environ_to_str(s, enc)
[docs]def environ_to_bytes(s):
"""
Decode a WSGI environ value to a bytes object
"""
return s.encode("latin1")
[docs]def bytes_to_environ(s):
"""
Encode a byte string to a WSGI environ value
This returns a 'bytes-as-unicode' string.
"""
return s.decode("latin1")
def getenv(environ, key, default=None):
"""
Return the named ``key`` from the WSGI environ dict ``environ`` as
a byte string.
"""
try:
return environ[key].encode("ISO-8859-1")
except KeyError:
return default
def setenv(environ, key, value):
"""
Set the named ``key`` to ``value`` in the WSGI environ dict ``environ``.
Value must be a byte ``str`` and will be encoded according
to the rules in PEP-3333.
"""
environ[key] = value.decode("ISO-8859-1")
[docs]class StartResponseWrapper(object):
"""\
Wrapper class for the ``start_response`` callable, allowing middleware
applications to intercept and interrogate the proxied start_response
arguments.
Synopsis::
>>> def my_wsgi_app(environ, start_response):
... start_response('200 OK', [('Content-Type', 'text/plain')])
... return ['Whoa nelly!']
...
>>> def my_other_wsgi_app(environ, start_response):
... responder = StartResponseWrapper(start_response)
... result = my_wsgi_app(environ, responder)
... print("Got status", responder.status)
... print("Got headers", responder.headers)
... responder.call_start_response()
... return result
...
>>> from flea import Agent
>>> result = Agent(my_other_wsgi_app).get('/')
Got status 200 OK
Got headers [('Content-Type', 'text/plain')]
See also ``Response.from_wsgi``, which takes a wsgi callable, environ and
start_response, and returns a ``Response`` object, allowing the client to
further interrogate and customize the WSGI response.
Note that it is usually not advised to use this directly in middleware as
start_response may not be called directly from the WSGI application, but
rather from the iterator it returns. In this case the middleware may need
logic to accommodate this. It is usually safer to use
``Response.from_wsgi``, which takes this into account.
"""
def __init__(self, start_response):
self.start_response = start_response
self.status = None
self.headers: List[str] = []
self.called = False
self.buf = BytesIO()
self.exc_info = None
def __call__(self, status, headers, exc_info=None):
"""
Dummy WSGI ``start_response`` function that stores the arguments for
later use.
"""
self.status = status
self.headers = headers
self.exc_info = exc_info
self.called = True
return self.buf.write
[docs] def call_start_response(self):
"""
Invoke the wrapped WSGI ``start_response`` function.
"""
try:
write = self.start_response(self.status, self.headers, self.exc_info)
write(self.buf.getvalue())
return write
finally:
# Avoid dangling circular ref
self.exc_info = None
[docs]class ClosingIterator(object):
"""\
Wrap a WSGI iterator to allow additional close functions to be called on
application exit.
Synopsis::
>>> class filelikeobject(object):
...
... def read(self):
... print("file read!")
... return ''
...
... def close(self):
... print("file closed!")
...
>>> def app(environ, start_response):
... f = filelikeobject()
... start_response('200 OK', [('Content-Type', 'text/plain')])
... return ClosingIterator(iter(f.read, ''), f.close)
...
>>> from flea import Agent
>>> m = Agent(app).get('/')
file read!
file closed!
"""
__slots__ = ("_iterable", "_next", "_close_funcs", "_closed")
def __init__(self, iterable, *close_funcs):
"""
Initialize a ``ClosingIterator`` to wrap iterable ``iterable``, and
call any functions listed in ``*close_funcs`` on the instance's
``.close`` method.
"""
self._iterable = iterable
self._next = partial(next, iter(iterable))
self._close_funcs = close_funcs
iterable_close = getattr(self._iterable, "close", None)
if iterable_close is not None:
self._close_funcs = (iterable_close,) + close_funcs
self._closed = False
def __iter__(self):
"""
``__iter__`` method
"""
return self
def __next__(self):
"""
Return the next item from the iterator
"""
return self._next()
[docs] def close(self):
"""
Call all close functions listed in ``*close_funcs``.
"""
for func in self._close_funcs:
try:
func()
except Exception:
logger.exception(
"ContentIterator close function {func!r} raised an exception"
)
self._closed = True
def __del__(self):
"""
Emit a warning if the iterator is deleted without ``close`` having been
called.
"""
if not self._closed:
try:
import warnings
except ImportError:
return
else:
warnings.warn("%r deleted without close being called" % (self,))
[docs]def make_environ(url="/", environ=None, wsgi_input=b"", **kwargs):
"""
Return a WSGI environ dict populated with values modelling the specified
request url and data.
:param url: The URL for the request,
eg ``/index.html`` or ``/search?q=foo``.
Note that ``url`` must be properly URL encoded.
:param environ: values to pass into the WSGI environ dict
:param wsgi_input: The input stream to use in the ``wsgi.input``
key of the environ dict
:param kwargs: additional keyword arguments will be passed into the
WSGI request environment
"""
url = urlparse(url)
netloc = url.netloc
user = ""
if "@" in netloc:
user, netloc = netloc.split("@", 1)
if ":" in user:
user = user.split(":")[0]
if isinstance(wsgi_input, bytes):
wsgi_input = BytesIO(wsgi_input)
env_overrides = environ or {}
for key, value in kwargs.items():
key = key.replace("wsgi_", "wsgi.")
if "." not in key:
if value is None:
continue
# Convert core WSGI keys to upper case
if key.upper() in WSGI_KEYS:
key = key.upper()
# Convert header names to form HTTP_USER_AGENT
elif key.lower() in REQUEST_HEADER_NAMES:
key = "HTTP_" + key.upper()
# value must be a python native str, whatever that means
if not isinstance(value, str):
value = str_to_environ(value)
env_overrides[key] = value
environ = {
"REQUEST_METHOD": "GET",
"SERVER_NAME": "localhost",
"SERVER_PORT": "80",
"SERVER_PROTOCOL": "HTTP/1.0",
"HTTP_HOST": netloc or "localhost",
"SCRIPT_NAME": "",
"PATH_INFO": str_to_environ(unquote(url.path)),
"REMOTE_ADDR": "127.0.0.1",
"wsgi.version": (1, 0),
"wsgi.url_scheme": url.scheme or "http",
"wsgi.input": wsgi_input,
"wsgi.errors": sys.stderr,
"wsgi.multithread": True,
"wsgi.multiprocess": True,
"wsgi.run_once": False,
}
if url.scheme == "https":
environ["HTTPS"] = "on"
environ["SERVER_PORT"] = "443"
if user:
environ["REMOTE_USER"] = user
if url.query:
environ["QUERY_STRING"] = url.query
environ.update(env_overrides)
for k, v in kwargs.items():
if v is None and "." not in k:
del environ[k]
return environ
def apply_request(
request, wsgicallable: WSGICallable
) -> Tuple[str, HeadersList, Optional[ExcInfoTuple], List[bytes]]:
"""
Execute ``wsgicallable`` with the given request, exhaust and close the
content iterator and return the result.
"""
_start_response_result: List[Tuple[str, HeadersList, Optional[ExcInfoTuple]]] = []
def start_response(
status: str, headers: HeadersList, exc_info: Optional[ExcInfoTuple] = None
):
_start_response_result.append((status, headers, exc_info))
contentiter = wsgicallable(request.environ, start_response)
assert len(_start_response_result) == 1
content = list(contentiter)
if hasattr(contentiter, "close"):
contentiter.close() # type: ignore
return _start_response_result[0] + (content,)