Skip to content

Commit

Permalink
feat: Add execution id (#320)
Browse files Browse the repository at this point in the history
* feat: Add execution id

Adds an execution id for each request. When the LOG_EXECUTION_ID env var is set, the execution id will be included in logs.
  • Loading branch information
nifflets authored May 7, 2024
1 parent dfc5059 commit 662bf4c
Show file tree
Hide file tree
Showing 6 changed files with 570 additions and 2 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"watchdog>=1.0.0",
"gunicorn>=19.2.0; platform_system!='Windows'",
"cloudevents>=1.2.0,<2.0.0",
"Werkzeug>=0.14,<4.0.0",
],
entry_points={
"console_scripts": [
Expand Down
37 changes: 36 additions & 1 deletion src/functions_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io
import json
import logging
import logging.config
import os
import os.path
import pathlib
import sys
Expand All @@ -32,7 +34,12 @@
from cloudevents.http import from_http, is_binary
from cloudevents.http.event import CloudEvent

from functions_framework import _function_registry, _typed_event, event_conversion
from functions_framework import (
_function_registry,
_typed_event,
event_conversion,
execution_id,
)
from functions_framework.background_event import BackgroundEvent
from functions_framework.exceptions import (
EventConversionException,
Expand Down Expand Up @@ -129,6 +136,7 @@ def setup_logging():


def _http_view_func_wrapper(function, request):
@execution_id.set_execution_context(request, _enable_execution_id_logging())
@functools.wraps(function)
def view_func(path):
return function(request._get_current_object())
Expand All @@ -143,6 +151,7 @@ def _run_cloud_event(function, request):


def _typed_event_func_wrapper(function, request, inputType: Type):
@execution_id.set_execution_context(request, _enable_execution_id_logging())
def view_func(path):
try:
data = request.get_json()
Expand All @@ -163,6 +172,7 @@ def view_func(path):


def _cloud_event_view_func_wrapper(function, request):
@execution_id.set_execution_context(request, _enable_execution_id_logging())
def view_func(path):
ce_exception = None
event = None
Expand Down Expand Up @@ -198,6 +208,7 @@ def view_func(path):


def _event_view_func_wrapper(function, request):
@execution_id.set_execution_context(request, _enable_execution_id_logging())
def view_func(path):
if event_conversion.is_convertable_cloud_event(request):
# Convert this CloudEvent to the equivalent background event data and context.
Expand Down Expand Up @@ -332,6 +343,9 @@ def create_app(target=None, source=None, signature_type=None):

source_module, spec = _function_registry.load_function_module(source)

if _enable_execution_id_logging():
_configure_app_execution_id_logging()

# Create the application
_app = flask.Flask(target, template_folder=template_folder)
_app.register_error_handler(500, crash_handler)
Expand All @@ -355,6 +369,7 @@ def handle_none(rv):
sys.stderr = _LoggingHandler("ERROR", sys.stderr)
setup_logging()

_app.wsgi_app = execution_id.WsgiMiddleware(_app.wsgi_app)
# Execute the module, within the application context
with _app.app_context():
try:
Expand Down Expand Up @@ -411,6 +426,26 @@ def __call__(self, *args, **kwargs):
return self.app(*args, **kwargs)


def _configure_app_execution_id_logging():
# Logging needs to be configured before app logger is accessed
logging.config.dictConfig(
{
"version": 1,
"handlers": {
"wsgi": {
"class": "logging.StreamHandler",
"stream": "ext://functions_framework.execution_id.logging_stream",
},
},
"root": {"level": "INFO", "handlers": ["wsgi"]},
}
)


def _enable_execution_id_logging():
return os.environ.get("LOG_EXECUTION_ID")


app = LazyWSGIApp()


Expand Down
156 changes: 156 additions & 0 deletions src/functions_framework/execution_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import functools
import io
import json
import logging
import random
import re
import string
import sys

import flask

from werkzeug.local import LocalProxy

_EXECUTION_ID_LENGTH = 12
_EXECUTION_ID_CHARSET = string.digits + string.ascii_letters
_LOGGING_API_LABELS_FIELD = "logging.googleapis.com/labels"
_LOGGING_API_SPAN_ID_FIELD = "logging.googleapis.com/spanId"
_TRACE_CONTEXT_REGEX_PATTERN = re.compile(
r"^(?P<trace_id>[\w\d]+)/(?P<span_id>\d+);o=(?P<options>[01])$"
)
EXECUTION_ID_REQUEST_HEADER = "Function-Execution-Id"
TRACE_CONTEXT_REQUEST_HEADER = "X-Cloud-Trace-Context"

logger = logging.getLogger(__name__)


class ExecutionContext:
def __init__(self, execution_id=None, span_id=None):
self.execution_id = execution_id
self.span_id = span_id


def _get_current_context():
return (
flask.g.execution_id_context
if flask.has_request_context() and "execution_id_context" in flask.g
else None
)


def _set_current_context(context):
if flask.has_request_context():
flask.g.execution_id_context = context


def _generate_execution_id():
return "".join(
_EXECUTION_ID_CHARSET[random.randrange(len(_EXECUTION_ID_CHARSET))]
for _ in range(_EXECUTION_ID_LENGTH)
)


# Middleware to add execution id to request header if one does not already exist
class WsgiMiddleware:
def __init__(self, wsgi_app):
self.wsgi_app = wsgi_app

def __call__(self, environ, start_response):
execution_id = (
environ.get("HTTP_FUNCTION_EXECUTION_ID") or _generate_execution_id()
)
environ["HTTP_FUNCTION_EXECUTION_ID"] = execution_id
return self.wsgi_app(environ, start_response)


# Sets execution id and span id for the request
def set_execution_context(request, enable_id_logging=False):
if enable_id_logging:
stdout_redirect = contextlib.redirect_stdout(
LoggingHandlerAddExecutionId(sys.stdout)
)
stderr_redirect = contextlib.redirect_stderr(
LoggingHandlerAddExecutionId(sys.stderr)
)
else:
stdout_redirect = contextlib.nullcontext()
stderr_redirect = contextlib.nullcontext()

def decorator(view_function):
@functools.wraps(view_function)
def wrapper(*args, **kwargs):
trace_context = re.match(
_TRACE_CONTEXT_REGEX_PATTERN,
request.headers.get(TRACE_CONTEXT_REQUEST_HEADER, ""),
)
execution_id = request.headers.get(EXECUTION_ID_REQUEST_HEADER)
span_id = trace_context.group("span_id") if trace_context else None
_set_current_context(ExecutionContext(execution_id, span_id))

with stderr_redirect, stdout_redirect:
return view_function(*args, **kwargs)

return wrapper

return decorator


@LocalProxy
def logging_stream():
return LoggingHandlerAddExecutionId(stream=flask.logging.wsgi_errors_stream)


class LoggingHandlerAddExecutionId(io.TextIOWrapper):
def __new__(cls, stream=sys.stdout):
if isinstance(stream, LoggingHandlerAddExecutionId):
return stream
else:
return super(LoggingHandlerAddExecutionId, cls).__new__(cls)

def __init__(self, stream=sys.stdout):
io.TextIOWrapper.__init__(self, io.StringIO())
self.stream = stream

def write(self, contents):
if contents == "\n":
return
current_context = _get_current_context()
if current_context is None:
self.stream.write(contents + "\n")
self.stream.flush()
return
try:
execution_id = current_context.execution_id
span_id = current_context.span_id
payload = json.loads(contents)
if not isinstance(payload, dict):
payload = {"message": contents}
except json.JSONDecodeError:
if len(contents) > 0 and contents[-1] == "\n":
contents = contents[:-1]
payload = {"message": contents}
if execution_id:
payload[_LOGGING_API_LABELS_FIELD] = payload.get(
_LOGGING_API_LABELS_FIELD, {}
)
payload[_LOGGING_API_LABELS_FIELD]["execution_id"] = execution_id
if span_id:
payload[_LOGGING_API_SPAN_ID_FIELD] = span_id
self.stream.write(json.dumps(payload))
self.stream.write("\n")
self.stream.flush()
Loading

0 comments on commit 662bf4c

Please sign in to comment.