Source code for incendiary.xray.app

import socket
from typing import List

from insanic import Insanic
from insanic.monitor import MONITOR_ENDPOINTS
from sanic.config import Config

from incendiary.loggers import logger, error_logger
from incendiary.xray import config
from incendiary.xray.contexts import IncendiaryAsyncContext
from incendiary.xray.middlewares import before_request, after_request
from incendiary.xray.mixins import CaptureMixin
from incendiary.xray.sampling import IncendiaryDefaultSampler
from incendiary.xray.services import IncendiaryService
from incendiary.xray.utils import tracing_name

from aws_xray_sdk.core import patch, AsyncAWSXRayRecorder, xray_recorder
from aws_xray_sdk import global_sdk_config


[docs]class Incendiary(CaptureMixin): config_imported = False extra_recorder_configurations = {} app = None @classmethod def load_config(cls, settings_object: Config) -> None: if not cls.config_imported: for c in dir(config): if c.isupper(): if not hasattr(settings_object, c): conf = getattr(config, c) setattr(settings_object, c, conf) cls.config_imported = True @classmethod def _handle_error(cls, app: Insanic, messages: List[str]) -> None: error_message = ( "[XRAY] Tracing was not initialized because: " + ", ".join(messages) ) error_logger.critical(error_message) @classmethod def _check_prerequisites(cls, app: Insanic) -> List[str]: """ Checks to see if xray daemon is accessible with :code:`INCENDIARY_XRAY_DAEMON_HOST` and :code:`INCENDIARY_XRAY_DAEMON_PORT`. :return: List of error messages while validating xray prerequisites. """ messages = [] tracing_host = app.config.INCENDIARY_XRAY_DAEMON_HOST tracing_port = app.config.INCENDIARY_XRAY_DAEMON_PORT sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: # pragma: no cover socket.gethostbyname(tracing_host) sock.settimeout(1) if sock.connect_ex((tracing_host, int(tracing_port))) != 0: messages.append( f"Could not connect to port on [{tracing_host}:{tracing_port}]." ) except socket.gaierror: messages.append(f"Could not resolve host [{tracing_host}].") except socket.error as e: # pragma: no cover messages.append( f"Could not connect to [{tracing_host}:{tracing_port}]: {str(e)}" ) finally: sock.close() return messages
[docs] @classmethod def init_app( cls, app: Insanic, recorder: AsyncAWSXRayRecorder = None ) -> None: """ Initializes Insanic to use Incendiary. - This loads all default Incendiary configs. - Validates connection information to X-Ray Daemon. - Configures X-Ray SDK Recorder - Attaches middlewares to start stop segments - Replaces :code:`Service` object with :code:`IncendiaryService` to trace interservice communications. - Replaces asyncio task factory. - Patches configured modules. :param app: Your Insanic application/ :param recorder: If you want to use your own recorder. """ # checks to see if tracing can be enabled cls.app = app cls.load_config(app.config) messages = cls._check_prerequisites(app) if len(messages) == 0: global_sdk_config.set_sdk_enabled(True) app.xray_recorder = recorder or xray_recorder cls.setup_middlewares(app) cls.setup_client(app) cls.setup_listeners(app) patch(app.config.INCENDIARY_XRAY_PATCH_MODULES, raise_errors=False) app.plugin_initialized("incendiary", cls) else: cls._handle_error(app, messages) app.config.INCENDIARY_XRAY_ENABLED = False global_sdk_config.set_sdk_enabled(False)
[docs] @classmethod def setup_listeners(cls, app: Insanic) -> None: """ - Attaches before server start listener that configures the X-Ray Recorder. - Attaches a before server start listener that replaces the default asyncio task factory that can hold context. """ async def before_server_start_start_tracing(app, loop=None, **kwargs): app.xray_recorder.configure(**cls.xray_config(app)) # need to configure xray as the first thing that happens so insert into 0 if ( before_server_start_start_tracing not in app.listeners["before_server_start"] ): # need to attach context after insanic's set task factory has been set for i, l in enumerate(app.listeners["before_server_start"]): if l.__name__ == "before_server_start_set_task_factory": insert_index = i + 1 break else: insert_index = 0 app.listeners["before_server_start"].insert( insert_index, before_server_start_start_tracing )
[docs] @classmethod def setup_client(cls, app: Insanic) -> None: """ Replaces the :code:`Service` class on the service registry with :code:`IncendiaryService`. """ from insanic.services.registry import LazyServiceRegistry LazyServiceRegistry.service_class = IncendiaryService LazyServiceRegistry.service_class.xray_recorder = app.xray_recorder
[docs] @classmethod def setup_middlewares(cls, app: Insanic) -> None: """ Sets up request and response middlewares that starts a segment, or creates segment, and ends the segment on response. """ logger.debug("[XRAY] Initializing xray middleware") @app.middleware("request") async def start_trace(request): for ep in MONITOR_ENDPOINTS: if request.path.endswith(ep): break else: await before_request(request) @app.middleware("response") async def end_trace(request, response): for ep in MONITOR_ENDPOINTS: if request.path.endswith(ep): break else: await after_request(request, response) return response
[docs] @classmethod def xray_config(cls, app: Insanic) -> dict: """ Class method that returns all the configurations for the X-Ray SDK Recoder. """ xray_config = dict( service=tracing_name(app.config.SERVICE_NAME), context=IncendiaryAsyncContext(), sampling=True, sampler=IncendiaryDefaultSampler(app), # sampling_rules=app.sampler.sampling_rules, daemon_address=f"{app.config.INCENDIARY_XRAY_DAEMON_HOST}:{app.config.INCENDIARY_XRAY_DAEMON_PORT}", context_missing=app.config.INCENDIARY_XRAY_CONTEXT_MISSING_STRATEGY, streaming_threshold=10, plugins=("ECSPlugin",), ) xray_config.update(cls.extra_recorder_configurations) return xray_config