Source code for biothings.web.analytics.notifiers

from collections import defaultdict

from tornado.httpclient import AsyncHTTPClient
from tornado.web import RequestHandler

from biothings.web.analytics.channels import GA4Channel, GAChannel, SlackChannel


class Notifier:
    """Dispatch analytics events to the channels enabled in ``settings``.

    Channels are registered in this order, based on attributes of ``settings``:

    * ``SLACK_WEBHOOKS`` -- a ``SlackChannel`` is added whenever the attribute
      exists, whatever its value.
    * ``GA_ACCOUNT`` -- a ``GAChannel`` is added when the value is truthy;
      ``GA_UID_GENERATOR_VERSION`` is optional and defaults to 1.
    * ``GA4_MEASUREMENT_ID`` -- a ``GA4Channel`` is added when the value is
      truthy. ``GA4_API_SECRET`` is then read without a default, so a missing
      attribute raises ``AttributeError``; ``GA4_UID_GENERATOR_VERSION`` is
      optional and defaults to 2.
    """

    def __init__(self, settings):
        """Build ``self.channels`` from the attributes found on ``settings``."""
        self.channels = []
        register = self.channels.append

        # Slack is keyed on presence only, not on truthiness.
        if hasattr(settings, "SLACK_WEBHOOKS"):
            register(SlackChannel(settings.SLACK_WEBHOOKS))

        ga_account = getattr(settings, "GA_ACCOUNT", None)
        if ga_account:
            uid_version = getattr(settings, "GA_UID_GENERATOR_VERSION", 1)
            register(GAChannel(ga_account, uid_version))

        measurement_id = getattr(settings, "GA4_MEASUREMENT_ID", None)
        if measurement_id:
            ga4_channel = GA4Channel(
                measurement_id=measurement_id,
                api_secret=settings.GA4_API_SECRET,
                uid_version=getattr(settings, "GA4_UID_GENERATOR_VERSION", 2),
            )
            register(ga4_channel)

    def broadcast(self, event):
        """Lazily yield what each interested channel produces for ``event``.

        Channels are visited in registration order. A channel whose
        ``handles(event)`` is falsy is skipped; otherwise every item from its
        ``send(event)`` is yielded before moving on to the next channel.
        """
        for target in self.channels:
            if not target.handles(event):
                continue
            yield from target.send(event)
# Web Framework Support
# ----------------------------
# Tornado
# https://www.tornadoweb.org/en/stable/httputil.html#tornado.httputil.HTTPServerRequest.remote_ip
class AnalyticsMixin(RequestHandler):
    """Tornado handler mixin that forwards finished requests to analytics."""

    def on_finish(self):
        """Attach request details to ``self.event`` and broadcast the event.

        Nothing is sent when the ``no_tracking`` argument is truthy or when the
        application's ``debug`` setting is on. Otherwise the request details
        are stored under ``self.event["__request__"]`` and every item yielded
        by ``self.biothings.notifier.broadcast`` is handed to
        ``AsyncHTTPClient.fetch``; the return value of ``fetch`` is discarded.

        Raises:
            NotImplementedError: if the handler has no ``biothings`` attribute.
        """
        super().on_finish()

        # "no_tracking" is an undocumented feature; "debug" covers testing
        # and development.
        opted_out = self.get_argument("no_tracking", False)
        if opted_out or self.settings.get("debug", False):
            return

        # Make sure to start the server with xheaders=True so that
        # remote_ip considers X-Real-Ip and X-Forwarded-For headers
        http_request = self.request
        details = defaultdict(
            type(None),  # unknown keys read back as None
            user_ip=http_request.remote_ip,
            user_agent=http_request.headers.get("User-Agent"),
            host=http_request.host,
            path=http_request.path,
            referer=http_request.headers.get("Referer"),
        )
        self.event["__request__"] = details

        if not hasattr(self, "biothings"):
            # need to initialize a notifier
            raise NotImplementedError()

        http_client = AsyncHTTPClient()
        for outgoing in self.biothings.notifier.broadcast(self.event):
            http_client.fetch(outgoing)
# FastAPI
# ...