# encoding=UTF-8
# Simple WSGI A/B testing.
#
# (c) 2010-2018 Oliver Cope.
#
# See ``README.rst`` for usage instructions etc.
import base64
import logging
import math
import os
import re
from collections import OrderedDict
from random import Random
from struct import unpack_from
from time import time
from hashlib import md5
from functools import partial
from urllib.parse import quote_plus
from importlib import resources
from fresco import FrescoApp, Request, Response
from fresco.cookie import Cookie
from fresco_static import StaticFiles
from fresco_template import Piglet
from piglet import TemplateLoader
logger = logging.getLogger(__name__)
templateloader = TemplateLoader([str(resources.files(__name__) / "templates")])
render = Piglet(templateloader)
__version__ = "0.2.3"
is_bot_ua = re.compile(
"|".join(
[
r"findlinks",
r"ia_archiver",
r"ichiro",
r".*bot\b",
r".*crawler\b",
r".*spider\b",
r".*seeker\b",
r".*fetch[oe]r\b",
r".*wget",
r".*plukkie",
r".*Nutch",
r".*InfoLink",
r".*indy library",
r".*yandex",
r".*ezooms\b",
r".*jeeves\b",
r".*mediapartners-google",
r".*jakarta Commons",
r".*java/",
r".*mj12",
r".*speedy",
r".*bot_",
r".*pubsub",
r".*facebookexternalhit",
r".*feedfetcher-Google",
r".*pflab",
r".*metauri",
r".*shopwiki",
r".*libcurl",
r".*resolver",
r".*service",
r".*postrank",
r"^.{0,4}$",
]
),
re.I,
).match
[docs]
class Swab(object):
"""
Simple WSGI A/B testing
"""
def __init__(self, datadir, wsgi_mountpoint="/swab"):
"""
Create a new Swab test object
:param datadir: Path to data storage directory
:param wsgi_mountpoint: The path swab-specific views and resources will
be served from by the middleware.
"""
self.datadir = datadir
#: Mapping of {<experiment name>: <Experiment object>}
self.experiments = {}
#: Mapping of {<goal name>: [<Experiment object>, ...]}
self.experiments_by_goal = {}
self.wsgi_mountpoint = wsgi_mountpoint
makedir(self.datadir)
def include(self, environ, experiment):
e = self.experiments[experiment]
return e.include(environ) and not e.exclude(environ)
[docs]
def middleware(self, app, cookie_domain=None, cookie_path=None, cache_control=True):
"""
Middleware that sets a random identity cookie for tracking users.
The identity can be overwritten by setting ``environ['swab.id']``
before start_response is called. On egress this middleware will then
reset the cookie if required.
:param app: The WSGI application to wrap
:param cookie_domain: The domain to use when setting cookies. If
``None`` this will not be set and the browser
will default to the domain used for the request.
:param cache_control: If ``True``, replace the upstream application's
cache control headers for any request where
show_variant is invoked.
"""
swabapp = FrescoApp()
static_files = StaticFiles()
static_files.init_app(swabapp)
static_files.add_package(__name__, "static")
swabapp.route_wsgi("/results", self.results_app)
swabapp.route_wsgi("/r.js", self.record_trial_app)
def middleware(environ, start_response):
environ["swab.swab"] = self
environ["swab.id"] = initswabid = _getswabid(environ)
environ["swab.invoked"] = False
environ["swab.experiments_invoked"] = set()
if initswabid is None:
environ["swab.id"] = generate_id()
if (
environ["PATH_INFO"][: len(self.wsgi_mountpoint)]
== self.wsgi_mountpoint
):
environ["SCRIPT_NAME"] += self.wsgi_mountpoint
environ["PATH_INFO"] = environ["PATH_INFO"][len(self.wsgi_mountpoint) :]
return swabapp(environ, start_response)
def my_start_response(
status,
headers,
exc_info=None,
_cache_headers=set(
["cache-control", "expires", "etag", "last-modified"]
),
):
if not environ["swab.invoked"]:
return start_response(status, headers, exc_info)
swabid = _getswabid(environ)
if swabid == initswabid and swabid is not None:
return start_response(status, headers, exc_info)
if swabid is None:
swabid = generate_id()
environ["swab.id"] = swabid
_cookie_path = cookie_path or environ.get("SCRIPT_NAME") or "/"
cookie = Cookie(
"swab",
swabid,
path=_cookie_path,
domain=cookie_domain,
httponly=True,
max_age=86400 * 365,
)
if cache_control and environ.get("swab.experiments_invoked"):
headers = [
(k, v) for k, v in headers if k.lower() not in _cache_headers
]
headers.append(("Cache-Control", "no-cache"))
headers.append(("Set-Cookie", str(cookie)))
return start_response(status, headers, exc_info)
return app(environ, my_start_response)
return middleware
def add_goals(self, names):
for n in names:
self.experiments_by_goal.setdefault(n, set())
def add_experiment(self, name, variants=None, goal=None):
exp = self.experiments[name] = Experiment(name)
if variants:
exp.add(*variants)
goal = goal if goal is not None else name
self.add_goals({goal})
self.experiments_by_goal[goal].add(exp)
makedir(os.path.join(self.datadir, name))
return exp
def results_app(self, environ, start_response):
request = Request(environ)
dedupe = not bool(request.get("nodedupe"))
data = self.collect_experiment_data(dedupe=dedupe)
for exp in data:
vdata = data[exp]["variants"]
control = request.get("control." + exp, self.experiments[exp].control)
control_trials = vdata[control]["trials"]
control_goals = vdata[control]["goals"]
data[exp]["control"] = control
for variant in vdata:
variant_trials = vdata[variant]["trials"]
for goal, goaldata in vdata[variant]["goals"].items():
control_rate = control_goals[goal]["rate"]
control_conversions = control_goals[goal]["conversions"]
z = zscore(
goaldata["rate"], variant_trials, control_rate, control_trials
)
goaldata["z"] = z
goaldata["confidence"] = cumulative_normal_distribution(z)
goaldata["p_beats_control"] = probability_b_beats_a(
control_trials,
control_conversions,
variant_trials,
goaldata["conversions"],
)
return render.as_response(
"results.html",
{
"request": request,
"experiments": self.experiments.values(),
"data": data,
"dedupe": dedupe,
},
)(environ, start_response)
def record_trial_app(self, environ, start_response):
request = Request(environ)
experiment = request.query.get("e")
try:
record_trial(environ, experiment)
except KeyError:
pass
return Response([], cache_control="no-cache")(environ, start_response)
[docs]
def collect_experiment_data(self, dedupe=False):
"""
Collect experiment data from the log files
Return a dictionary of::
{<experiment>: {
'goals': [goal1, goal2, ...],
'variants': {
'v1': {
'trials': 1062,
'goals': {
'goal1': {'conversions': 43, 'rate': 0.0405},
'goal2': {'conversions': 29, 'rate': 0.0273},
}
},
...
}
}
"""
data = {}
for exp in self.experiments.values():
expdir = os.path.join(self.datadir, exp.name)
goals = sorted(
[
goal
for goal, experiments in self.experiments_by_goal.items()
if exp in experiments
]
)
data[exp.name] = {"goals": goals, "variants": {}}
for variant in exp.variants:
path = partial(os.path.join, expdir, variant)
if dedupe:
trial_identities = get_identities(path("__all__"))
trialc = len(trial_identities)
else:
trialc = count_entries(path("__all__"), dedupe=False)
data[exp.name]["variants"][variant] = vdata = {
"trials": trialc,
"goals": {},
}
for goal in goals:
vdata["goals"][goal] = goaldata = {}
if dedupe:
conv_identities = trial_identities.intersection(
get_identities(path(goal))
)
convc = len(conv_identities)
else:
convc = count_entries(path(goal), dedupe=False)
goaldata["conversions"] = convc
goaldata["rate"] = float(convc) / trialc if trialc else float("nan")
return data
class Experiment(object):
invalid_chars = r"\\/:,"
def __init__(self, name, seed_strategy=None):
if not self._is_valid_name(name):
raise ValueError(
"Experiment name may not contain any of {!r}".format(
set(self.invalid_chars)
)
)
self.name = name
#: The control variant name
self.control = None
#: Mapping of variant name → weight
self.variants = OrderedDict()
#: The default variant (typically shown to bots)
self.default_variant = None
#: Function returning a the RNG seed to use
self.seed_strategy = (
seed_strategy if seed_strategy is not None else default_seed_strategy
)
def add(self, *variants):
"""
Add variants. Each argument may be either a variant name, or a tuple
of (variant-name, weight). For example:
colors = Experiment('colors')
colors.add('blue', 'red') # equal split between blue/red
colors.add(('blue', 1), ('red', 3)) # red 3x more likely than blue
"""
for v in variants:
if isinstance(v, str):
name, weight = v, 1
else:
name, weight = v
if not self._is_valid_name(name):
raise ValueError(
"Variant name may not contain any of {!r}".format(
set(self.invalid_chars)
)
)
if self.control is None:
self.control = name
self.variants[name] = self.variants.get(name, 0) + 1
self._recalculate()
def _recalculate(self):
"""
Recalculate weighted_variants and default_variant
"""
self.weighted_variants = [
n for n in self.variants for i in range(self.variants[n])
]
default = self.control
default_weight = self.variants[self.control]
for name, weight in self.variants.items():
if weight > default_weight:
default, default_weight = name, weight
self.default_variant = default
def _is_valid_name(self, name):
return not any(c in name for c in self.invalid_chars)
def include(self, environ):
return True
def exclude(self, environ):
return False
def _getswabid(environ):
"""
Return the unique identifier from the WSGI environment if present,
otherwise return ``None``.
"""
try:
return environ["swab.id"]
except KeyError:
pass
swabid = Request(environ).cookies.get("swab")
if swabid:
environ["swab.id"] = swabid
return swabid
[docs]
def generate_id(urandom=os.urandom, encode=base64.b64encode):
"""
Return a unique id
"""
return encode(urandom(12)).strip().decode("ascii")
[docs]
def get_rng(environ, experiment, swabid):
"""
Return a random number generator with a fixed seed based on the
current session's swabid
"""
r = Random()
r.seed(experiment.seed_strategy(environ, experiment, swabid))
return r
[docs]
def get_seed_from_bytes(s):
"""
Given a byte string, return a RNG seed value
"""
return unpack_from("l", md5(s).digest())[0]
def default_seed_strategy(environ, experiment, swabid):
return get_seed_from_bytes(swabid.encode("ascii") + experiment.name.encode("UTF-8"))
[docs]
def show_variant(environ, experiment, record=False, variant=None):
"""
Return the variant name that ``environ`` is assigned to within
``experiment``
If ``record`` is true, write a line to the log file indicating that the
variant was shown. (No deduping is done - the log line is always written. A
page with ``show_variant`` might record multiple hits on reloads etc)
:param experiment: Name of the experiment
:param environ: WSGI environ
:param record: If ``True``, record a trial for the experiment in the log
file
:param variant: force the named variant. Use this if your application wants
to choose the variant based on some other criteria
(eg SEO a/b testing where you assign the variant based
on the URL)
"""
swab = environ["swab.swab"]
exp = swab.experiments[experiment]
variants = exp.weighted_variants
swabid = _getswabid(environ)
if not swab.include(environ, experiment):
variant = variant if variant is not None else exp.control
# Make sure bots don't receive randomized variants. Google's advice
# (as of August 2017): show the version the majority of users see.
# Failing that, show a consistent version (don't randomize)
# https://www.youtube.com/watch?v=EaSyuH2D7Mw&start=1920
if is_bot(environ):
variant = variant if variant is not None else exp.control
if variant is None:
request = Request(environ)
variant = request.query.get("swab." + experiment)
if variant is not None and variant in variants:
return variant
# Get a random int in the range 0 ≤ x < len(variants)
#
# We do this in preference to calling random.choice because
# it guarantees a particular property of the sampling if the list of
# variants changes.
#
# For example, given a list of variants ``['a', 'b']``
# we will choose a variant according to the output of
# `r = rng.random()` such that
#
# 0.0 ≤ r < 0.5 → 'a'
# 0.5 ≤ r < 1.0 → 'b'
#
# if later we find that 'a' is winning and want to exploit that
# variant, we could change the list of variants to
# ``['a', 'a', 'a', 'b']``, and the mapping would become:
#
# 0.0 ≤ r < 0.75 → 'a'
# 0.75 ≤ r < 1.0 → 'b'
#
# Notice that the range corresponding to variant 'a' completely
# contains the old values - ie users who previously saw the winning
# variant 'a' will continue to see that variant.
r = int(get_rng(environ, exp, swabid).random() * len(variants))
variant = variants[r]
if variant not in variants:
raise ValueError(
"Invalid variant {!r}. Choices are: {!r}".format(variant, variants)
)
environ["swab.invoked"] = True
invoked = environ.setdefault("swab.experiments_invoked", set())
if experiment in invoked:
return variant
invoked.add(experiment)
if not record or is_bot(environ):
return variant
path = os.path.join(swab.datadir, experiment, variant, "__all__")
try:
f = open(path, "a")
except IOError:
makedir(os.path.dirname(path))
f = open(path, "a")
try:
f.write(_logline(swabid))
finally:
f.close()
return variant
record_trial = partial(show_variant, record=True)
def record_trial_tag(environ, experiment=None):
experiments = environ.get("swab.experiments_invoked", frozenset())
if experiment is None:
if len(experiments) != 1:
raise ValueError(
"record_trial_tag can't guess the experiment "
"name, as show_variant has been called multiple "
"times (or not at all). "
"Fix this by passing an experiment name. "
)
experiment = next(iter(experiments))
if experiment not in experiments:
logger.warning(
"record_trial_tag called without a corresponding show_variant "
"(experiment=%r)",
experiment,
)
request = Request(environ)
if "swab." + experiment in request.query:
return ""
swab = environ["swab.swab"]
return (
"<script>"
"(function(f,o,x){{"
"x=f.getElementsByTagName(o)[0],"
"o=f.createElement(o),"
"o.async=1,"
'o.src="{0}?e={1};s={2}";'
'x.parentNode.insertBefore(o,x)}})(document,"script")'
"</script>"
).format(
request.make_url(path=swab.wsgi_mountpoint + "/" + "r.js", query=""),
quote_plus(experiment),
_getswabid(environ),
)
[docs]
def is_bot(environ, is_bot_ua=is_bot_ua):
"""
Return True if the request is from a bot.
Uses rather simplistic tests based on user agent and header signatures, but
should still catch most well behaved bots.
"""
if is_bot_ua(environ.get("HTTP_USER_AGENT", "")):
return True
if "HTTP_ACCEPT_LANGUAGE" not in environ:
return True
return False
def _logline(swabid):
return "%-14.2f:%s\n" % (time(), swabid)
def _logentries(path):
with open(path, "r") as f:
for line in f:
try:
t, id = line.strip().split(":")
except ValueError:
continue
yield float(t.strip()), id
[docs]
def record_goal(environ, goal, experiment=None):
"""
Record a goal conversion by adding a record to the file at
``swab-path/<experiment>/<variant>/<goal>``.
If experiment is not specified, all experiments linked to the named goal
are looked up.
This doesn't use any file locking, but we should be safe on any posix
system as we are appending each time to the file.
See http://www.perlmonks.org/?node_id=486488 for a discussion of the issue.
"""
if is_bot(environ):
return
swab = environ["swab.swab"]
if experiment is None:
try:
experiments = swab.experiments_by_goal[goal]
except KeyError:
raise ValueError("Invalid goal: {!r}".format(goal))
else:
experiments = [swab.experiments[experiment]]
for experiment in experiments:
if not swab.include(environ, experiment.name):
continue
variant = show_variant(environ, experiment.name, record=False)
path = os.path.join(swab.datadir, experiment.name, variant, goal)
try:
f = open(path, "a")
except IOError:
makedir(os.path.dirname(path))
f = open(path, "a")
try:
f.write(_logline(_getswabid(environ)))
finally:
f.close()
[docs]
def makedir(path):
"""
Create a directory at ``path``. Unlike ``os.makedirs`` don't raise an error
if ``path`` already exists.
"""
try:
os.makedirs(path)
except OSError:
# Path already exists or cannot be created
if not os.path.isdir(path):
raise
[docs]
def count_entries(path, dedupe=True):
"""
Count the number of entries in ``path``.
:param dedupe: if True, dedupe Entries so only one conversion is counted per
identity.
"""
if dedupe:
return len(get_identities(path))
else:
if not os.path.exists(path):
return 0
with open(path, "rb") as f:
return sum(1 for line in f)
[docs]
def get_identities(path):
"""
Return a Counter for identity entries in ``path``
"""
if not os.path.isfile(path):
return set()
return set(identity for t, identity in _logentries(path))
[docs]
def zscore(p, n, pc, nc):
"""
Calculate the zscore of probability ``p`` over ``n`` tests, compared to
control probability ``pc`` over ``nc`` tests
See http://20bits.com/articles/statistical-analysis-and-ab-testing/.
"""
from math import sqrt
try:
return (p - pc) / sqrt((p * (1 - p) / n) + (pc * (1 - pc) / nc))
except (ZeroDivisionError, ValueError):
return float("nan")
[docs]
def cumulative_normal_distribution(z):
"""
Return the confidence level from calculating of the cumulative normal
distribution for the given zscore.
See http://abtester.com/calculator/ and
http://www.sitmo.com/doc/Calculating_the_Cumulative_Normal_Distribution
"""
from math import exp
b1 = +0.319381530
b2 = -0.356563782
b3 = +1.781477937
b4 = -1.821255978
b5 = +1.330274429
p = +0.2316419
c = +0.39894228
if z >= 0.0:
t = 1.0 / (1.0 + p * z)
return 1.0 - c * exp(-z * z / 2.0) * t * (
t * (t * (t * (t * b5 + b4) + b3) + b2) + b1
)
else:
t = 1.0 / (1.0 - p * z)
return (
c * exp(-z * z / 2.0) * t * (t * (t * (t * (t * b5 + b4) + b3) + b2) + b1)
)
def probability_b_beats_a(trials_a, goals_a, trials_b, goals_b):
a_A = 1 + goals_a
b_A = 1 + trials_a - goals_a
a_B = 1 + goals_b
b_B = 1 + trials_b - goals_b
# Happens when goals > trials.
# In this case the gamma function isn't defined and the remaining code
# errors.
if b_A <= 0 or b_B <= 0:
return float("nan")
total = 0.0
log = math.log
exp = math.exp
# Define a log beta function in terms of gamma
lbeta = lambda a, b, lgamma=math.lgamma: (lgamma(a) + lgamma(b) - lgamma(a + b))
for i in range(a_B):
total += exp(
lbeta(a_A + i, b_B + b_A)
- log(b_B + i)
- lbeta(1 + i, b_B)
- lbeta(a_A, b_A)
)
return total