# Source code for incendiary.xray.hooks

from json import JSONDecodeError

import traceback
from typing import Optional

import ujson as json
from aws_xray_sdk.core import AsyncAWSXRayRecorder

from aws_xray_sdk.core.exceptions import exceptions
from aws_xray_sdk.core.models import http
from aws_xray_sdk.core.models.subsegment import Subsegment
from aws_xray_sdk.ext.util import inject_trace_header, strip_url
from httpx import Request, TransportError

from insanic import status


# All httpx calls entail outgoing HTTP requests; only for some ad-hoc
# exceptions is the namespace flipped back to local.
from incendiary.xray.utils import get_safe_dict

# Namespace passed to `recorder.begin_subsegment` for outgoing requests.
REMOTE_NAMESPACE = "remote"
# Namespace assigned to a subsegment whose request failed with one of the
# `LOCAL_EXCEPTIONS` below.
LOCAL_NAMESPACE = "local"
# Exception types that cause `end_subsegment_with_exception` to switch the
# subsegment's namespace to `LOCAL_NAMESPACE`.
LOCAL_EXCEPTIONS = (
    # DNS issues
    OSError,
    TransportError,
)


def begin_subsegment(
    request: Request,
    recorder: AsyncAWSXRayRecorder,
    name: Optional[str] = None,
) -> Optional[Subsegment]:
    """
    Begins a subsegment before sending an interservice request.

    As a side effect ``request.give_up`` is set: ``True`` when no subsegment
    could be started, ``False`` otherwise. ``end_subsegment`` and
    ``end_subsegment_with_exception`` read that flag to decide whether
    there is anything to close.

    :param request: The httpx request object for interservice communications.
    :param recorder: The AWS X-Ray recorder for this application.
    :param name: The subsegment name. When omitted (or empty) the request
        URL passed through ``strip_url`` is used instead.
    :return: The started subsegment, or ``None`` if none could be started.
    """
    name = name or strip_url(str(request.url))

    try:
        subsegment = recorder.begin_subsegment(name, REMOTE_NAMESPACE)
    except (
        exceptions.SegmentNotFoundException,
        exceptions.AlreadyEndedException,
    ):
        # Without a usable parent segment the request is sent untraced.
        subsegment = None

    # No-op if subsegment is `None` due to `LOG_ERROR`.
    if not subsegment:
        request.give_up = True
    else:
        request.give_up = False
        subsegment.put_http_meta(http.METHOD, request.method)
        subsegment.put_http_meta(http.URL, str(request.url))
        # Add the trace header to the outgoing request's headers.
        inject_trace_header(request.headers, subsegment)

    return subsegment


def end_subsegment(
    *, request, response, recorder, subsegment: Optional[Subsegment] = None
) -> Optional[Subsegment]:
    """
    The function that ends the subsegment after a response gets received.

    For sampled subsegments the response status code is recorded, and for
    responses with a status of 400 or above the response body is attached
    as the ``response`` annotation.

    :param request: The request object for interservice communications.
    :param response: Response object of the request.
    :param subsegment: Subsegment of this request. When not given, the
        recorder's current subsegment is used.
    :param recorder: The aws xray recorder.
    :return: The closed subsegment, or ``None`` when there was nothing
        to close.
    """
    if getattr(request, "give_up", None):
        return None

    subsegment = subsegment or recorder.current_subsegment()
    if subsegment is None:
        # No open subsegment to close; a tracing hook must not turn a
        # received response into an AttributeError.
        return None

    if subsegment.sampled:
        subsegment.put_http_meta(http.STATUS, response.status_code)

        if response.status_code >= status.HTTP_400_BAD_REQUEST:
            try:
                resp = response.json()
            except ValueError:
                # `JSONDecodeError` is a `ValueError`; catching the parent
                # also covers decoders that raise a different subclass for
                # a malformed body. Fall back to the raw text.
                resp = response.text
            else:
                resp = json.dumps(get_safe_dict(resp))

            subsegment.put_annotation("response", resp)

    # recorder.end_subsegment()
    subsegment.close()
    return subsegment


def end_subsegment_with_exception(
    *,
    request: Request,
    exception: Exception,
    subsegment: Optional[Subsegment] = None,
    recorder: AsyncAWSXRayRecorder,
) -> Optional[Subsegment]:
    """
    The function that ends the subsegment when an exception is raised
    while attempting to send an interservice request.

    :param request: The request object for interservice communications.
    :param exception: The exception raised while sending the request.
    :param subsegment: Subsegment of this request. When not given, the
        recorder's current subsegment is used.
    :param recorder: The aws xray recorder.
    :return: The closed subsegment, or ``None`` when there was nothing
        to close.
    """
    if getattr(request, "give_up", None):
        return None

    subsegment = subsegment or recorder.current_subsegment()
    if subsegment is None:
        # No open subsegment to close; failing here would mask the original
        # exception with an AttributeError.
        return None

    if subsegment.sampled:
        subsegment.add_exception(
            exception,
            traceback.extract_stack(limit=recorder._max_trace_back),
        )

        # Failures listed in LOCAL_EXCEPTIONS are re-labelled as local.
        if isinstance(exception, LOCAL_EXCEPTIONS):
            subsegment.namespace = LOCAL_NAMESPACE

    subsegment.close()
    return subsegment