diff --git a/examples/cloud_run_cloud_events/send_cloud_event.py b/examples/cloud_run_cloud_events/send_cloud_event.py
index b523c31a..c08b8f93 100644
--- a/examples/cloud_run_cloud_events/send_cloud_event.py
+++ b/examples/cloud_run_cloud_events/send_cloud_event.py
@@ -13,9 +13,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from cloudevents.http import CloudEvent, to_structured
 import requests
 
+from cloudevents.http import CloudEvent, to_structured
 
 # Create a cloudevent using https://github.com/cloudevents/sdk-python
 # Note we only need source and type because the cloudevents constructor by
diff --git a/playground/main.py b/playground/main.py
new file mode 100644
index 00000000..b974ddbc
--- /dev/null
+++ b/playground/main.py
@@ -0,0 +1,16 @@
+import logging
+import time
+
+import functions_framework
+
+logger = logging.getLogger(__name__)
+
+
+@functions_framework.http
+def main(request):
+    timeout = 2
+    for _ in range(timeout * 10):
+        time.sleep(0.1)
+    logger.info("logging message after timeout elapsed")
+    return "Hello, world!"
+
diff --git a/src/functions_framework/_http/gunicorn.py b/src/functions_framework/_http/gunicorn.py
index 050f766c..92cad90e 100644
--- a/src/functions_framework/_http/gunicorn.py
+++ b/src/functions_framework/_http/gunicorn.py
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC
+# Copyright 2024 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,17 +16,43 @@
 
 import gunicorn.app.base
 
+from gunicorn.workers.gthread import ThreadWorker
+
+from ..request_timeout import ThreadingTimeout
+
+# global for use in our custom gthread worker; the gunicorn arbiter spawns these
+# and it's not possible to inject (and self.timeout means something different to
+# async workers!)
+# set/managed in gunicorn application init for test-friendliness
+TIMEOUT_SECONDS = None
+
 
 class GunicornApplication(gunicorn.app.base.BaseApplication):
     def __init__(self, app, host, port, debug, **options):
+        threads = int(os.environ.get("THREADS", (os.cpu_count() or 1) * 4))
+
+        global TIMEOUT_SECONDS
+        TIMEOUT_SECONDS = int(os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0))
+
         self.options = {
             "bind": "%s:%s" % (host, port),
-            "workers": os.environ.get("WORKERS", 1),
-            "threads": os.environ.get("THREADS", (os.cpu_count() or 1) * 4),
-            "timeout": os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0),
-            "loglevel": "error",
+            "workers": int(os.environ.get("WORKERS", 1)),
+            "threads": threads,
+            "loglevel": os.environ.get("GUNICORN_LOG_LEVEL", "error"),
             "limit_request_line": 0,
         }
+
+        if (
+            TIMEOUT_SECONDS > 0
+            and threads > 1
+            and (os.environ.get("THREADED_TIMEOUT_ENABLED", "False").lower() == "true")
+        ):  # pragma: no cover
+            self.options["worker_class"] = (
+                "functions_framework._http.gunicorn.GThreadWorkerWithTimeoutSupport"
+            )
+        else:
+            self.options["timeout"] = TIMEOUT_SECONDS
+
         self.options.update(options)
         self.app = app
 
@@ -38,3 +64,9 @@ def load_config(self):
 
     def load(self):
         return self.app
+
+
+class GThreadWorkerWithTimeoutSupport(ThreadWorker):  # pragma: no cover
+    def handle_request(self, req, conn):
+        with ThreadingTimeout(TIMEOUT_SECONDS):
+            super(GThreadWorkerWithTimeoutSupport, self).handle_request(req, conn)
diff --git a/src/functions_framework/exceptions.py b/src/functions_framework/exceptions.py
index 671a28a4..81b9f8f0 100644
--- a/src/functions_framework/exceptions.py
+++ b/src/functions_framework/exceptions.py
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC
+# Copyright 2024 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -35,3 +35,7 @@ class MissingTargetException(FunctionsFrameworkException):
 
 class EventConversionException(FunctionsFrameworkException):
     pass
+
+
+class RequestTimeoutException(FunctionsFrameworkException):
+    pass
diff --git a/src/functions_framework/request_timeout.py b/src/functions_framework/request_timeout.py
new file mode 100644
index 00000000..ed34f66e
--- /dev/null
+++ b/src/functions_framework/request_timeout.py
@@ -0,0 +1,42 @@
+import ctypes
+import logging
+import threading
+
+from .exceptions import RequestTimeoutException
+
+logger = logging.getLogger(__name__)
+
+
+class ThreadingTimeout(object):  # pragma: no cover
+    def __init__(self, seconds):
+        self.seconds = seconds
+        self.target_tid = threading.current_thread().ident
+        self.timer = None
+
+    def __enter__(self):
+        self.timer = threading.Timer(self.seconds, self._raise_exc)
+        self.timer.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.timer.cancel()
+        if exc_type is RequestTimeoutException:
+            logger.warning(
+                "Request handling exceeded {0} seconds timeout; terminating request handling...".format(
+                    self.seconds
+                ),
+                exc_info=(exc_type, exc_val, exc_tb),
+            )
+        return False
+
+    def _raise_exc(self):
+        ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
+            ctypes.c_long(self.target_tid), ctypes.py_object(RequestTimeoutException)
+        )
+        if ret == 0:
+            raise ValueError("Invalid thread ID {}".format(self.target_tid))
+        elif ret > 1:
+            ctypes.pythonapi.PyThreadState_SetAsyncExc(
+                ctypes.c_long(self.target_tid), None
+            )
+            raise SystemError("PyThreadState_SetAsyncExc failed")
diff --git a/tests/test_execution_id.py b/tests/test_execution_id.py
index c650ee31..a2601817 100644
--- a/tests/test_execution_id.py
+++ b/tests/test_execution_id.py
@@ -378,4 +378,5 @@ async def test_maintains_execution_id_for_concurrent_requests(monkeypatch, capsy
     logs = record.err.strip().split("\n")
     logs_as_json = tuple(json.loads(log) for log in logs)
 
-    assert logs_as_json == expected_logs
+    sort_key = lambda d: d["message"]
+    assert sorted(logs_as_json, key=sort_key) == sorted(expected_logs, key=sort_key)
diff --git a/tests/test_functions/timeout/main.py b/tests/test_functions/timeout/main.py
new file mode 100644
index 00000000..09efeb88
--- /dev/null
+++ b/tests/test_functions/timeout/main.py
@@ -0,0 +1,12 @@
+import logging
+import time
+
+logger = logging.getLogger(__name__)
+
+
+def function(request):
+    # sleep for 1200 total ms (1.2 sec)
+    for _ in range(12):
+        time.sleep(0.1)
+    logger.info("some extra logging message")
+    return "success", 200
diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py
new file mode 100644
index 00000000..9637de67
--- /dev/null
+++ b/tests/test_timeouts.py
@@ -0,0 +1,267 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pathlib
+import socket
+import time
+
+from multiprocessing import Process
+
+import pytest
+import requests
+
+ff_gunicorn = pytest.importorskip("functions_framework._http.gunicorn")
+
+
+from functions_framework import create_app
+
+TEST_FUNCTIONS_DIR = pathlib.Path(__file__).resolve().parent / "test_functions"
+TEST_HOST = "0.0.0.0"
+TEST_PORT = "8080"
+
+
+@pytest.fixture(autouse=True)
+def run_around_tests():
+    # the test samples test also listens on 8080, so let's be good stewards of
+    # the port and make sure it's free
+    _wait_for_no_listen(TEST_HOST, TEST_PORT)
+    yield
+    _wait_for_no_listen(TEST_HOST, TEST_PORT)
+
+
+@pytest.mark.skipif("platform.system() == 'Windows'")
+@pytest.mark.skipif("platform.system() == 'Darwin'")
+@pytest.mark.slow_integration_test
+def test_no_timeout_allows_request_processing_to_finish():
+    source = TEST_FUNCTIONS_DIR / "timeout" / "main.py"
+    target = "function"
+
+    app = create_app(target, source)
+
+    options = {}
+
+    gunicorn_app = ff_gunicorn.GunicornApplication(
+        app, TEST_HOST, TEST_PORT, False, **options
+    )
+
+    gunicorn_p = Process(target=gunicorn_app.run)
+    gunicorn_p.start()
+
+    _wait_for_listen(TEST_HOST, TEST_PORT)
+
+    result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT))
+
+    gunicorn_p.terminate()
+    gunicorn_p.join()
+
+    assert result.status_code == 200
+
+
+@pytest.mark.skipif("platform.system() == 'Windows'")
+@pytest.mark.skipif("platform.system() == 'Darwin'")
+@pytest.mark.slow_integration_test
+def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch):
+    monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1")
+    monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "false")
+    source = TEST_FUNCTIONS_DIR / "timeout" / "main.py"
+    target = "function"
+
+    app = create_app(target, source)
+
+    options = {}
+
+    gunicorn_app = ff_gunicorn.GunicornApplication(
+        app, TEST_HOST, TEST_PORT, False, **options
+    )
+
+    gunicorn_p = Process(target=gunicorn_app.run)
+    gunicorn_p.start()
+
+    _wait_for_listen(TEST_HOST, TEST_PORT)
+
+    result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT))
+
+    gunicorn_p.terminate()
+    gunicorn_p.join()
+
+    assert result.status_code == 200
+
+
+@pytest.mark.skipif("platform.system() == 'Windows'")
+@pytest.mark.skipif("platform.system() == 'Darwin'")
+@pytest.mark.slow_integration_test
+def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch):
+    monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1")
+    monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "true")
+    source = TEST_FUNCTIONS_DIR / "timeout" / "main.py"
+    target = "function"
+
+    app = create_app(target, source)
+
+    options = {}
+
+    gunicorn_app = ff_gunicorn.GunicornApplication(
+        app, TEST_HOST, TEST_PORT, False, **options
+    )
+
+    gunicorn_p = Process(target=gunicorn_app.run)
+    gunicorn_p.start()
+
+    _wait_for_listen(TEST_HOST, TEST_PORT)
+
+    result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT))
+
+    gunicorn_p.terminate()
+    gunicorn_p.join()
+
+    # Any exception raised in execution is a 500 error. Cloud Functions 1st gen and
+    # 2nd gen/Cloud Run infrastructure doing the timeout will return a 408 (gen 1)
+    # or 504 (gen 2/CR) at the infrastructure layer when request timeouts happen,
+    # and this code will only be available to the user in logs.
+    assert result.status_code == 500
+
+
+@pytest.mark.skipif("platform.system() == 'Windows'")
+@pytest.mark.skipif("platform.system() == 'Darwin'")
+@pytest.mark.slow_integration_test
+def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_kill(
+    monkeypatch,
+):
+    monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "2")
+    monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "true")
+    source = TEST_FUNCTIONS_DIR / "timeout" / "main.py"
+    target = "function"
+
+    app = create_app(target, source)
+
+    options = {}
+
+    gunicorn_app = ff_gunicorn.GunicornApplication(
+        app, TEST_HOST, TEST_PORT, False, **options
+    )
+
+    gunicorn_p = Process(target=gunicorn_app.run)
+    gunicorn_p.start()
+
+    _wait_for_listen(TEST_HOST, TEST_PORT)
+
+    result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT))
+
+    gunicorn_p.terminate()
+    gunicorn_p.join()
+
+    assert result.status_code == 200
+
+
+@pytest.mark.skipif("platform.system() == 'Windows'")
+@pytest.mark.skipif("platform.system() == 'Darwin'")
+@pytest.mark.slow_integration_test
+def test_timeout_sync_worker_kills_on_timeout(
+    monkeypatch,
+):
+    monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1")
+    monkeypatch.setenv("WORKERS", "2")
+    monkeypatch.setenv("THREADS", "1")
+    source = TEST_FUNCTIONS_DIR / "timeout" / "main.py"
+    target = "function"
+
+    app = create_app(target, source)
+
+    options = {}
+
+    gunicorn_app = ff_gunicorn.GunicornApplication(
+        app, TEST_HOST, TEST_PORT, False, **options
+    )
+
+    gunicorn_p = Process(target=gunicorn_app.run)
+    gunicorn_p.start()
+
+    _wait_for_listen(TEST_HOST, TEST_PORT)
+
+    result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT))
+
+    gunicorn_p.terminate()
+    gunicorn_p.join()
+
+    assert result.status_code == 500
+
+
+@pytest.mark.skipif("platform.system() == 'Windows'")
+@pytest.mark.skipif("platform.system() == 'Darwin'")
+@pytest.mark.slow_integration_test
+def test_timeout_sync_worker_does_not_kill_if_less_than_timeout(
+    monkeypatch,
+):
+    monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "2")
+    monkeypatch.setenv("WORKERS", "2")
+    monkeypatch.setenv("THREADS", "1")
+    source = TEST_FUNCTIONS_DIR / "timeout" / "main.py"
+    target = "function"
+
+    app = create_app(target, source)
+
+    options = {}
+
+    gunicorn_app = ff_gunicorn.GunicornApplication(
+        app, TEST_HOST, TEST_PORT, False, **options
+    )
+
+    gunicorn_p = Process(target=gunicorn_app.run)
+    gunicorn_p.start()
+
+    _wait_for_listen(TEST_HOST, TEST_PORT)
+
+    result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT))
+
+    gunicorn_p.terminate()
+    gunicorn_p.join()
+
+    assert result.status_code == 200
+
+
+@pytest.mark.skip
+def _wait_for_listen(host, port, timeout=10):
+    # Used in tests to make sure that the gunicorn app has booted and is
+    # listening before sending a test request
+    start_time = time.perf_counter()
+    while True:
+        try:
+            with socket.create_connection((host, port), timeout=timeout):
+                break
+        except OSError as ex:
+            time.sleep(0.01)
+            if time.perf_counter() - start_time >= timeout:
+                raise TimeoutError(
+                    "Waited too long for port {} on host {} to start accepting "
+                    "connections.".format(port, host)
+                ) from ex
+
+
+@pytest.mark.skip
+def _wait_for_no_listen(host, port, timeout=10):
+    # Used in tests to make sure that the port is actually free after
+    # the process binding to it should have been killed
+    start_time = time.perf_counter()
+    while True:
+        try:
+            with socket.create_connection((host, port), timeout=timeout):
+                time.sleep(0.01)
+        except OSError:
+            break
+        # Checked outside the try: TimeoutError subclasses OSError, so raising
+        # it inside the try would be caught above and silently end the wait.
+        if time.perf_counter() - start_time >= timeout:
+            raise TimeoutError(
+                "Waited too long for port {} on host {} to stop accepting "
+                "connections.".format(port, host)
+            )
diff --git a/tox.ini b/tox.ini
index e8c555b5..6ba6d3b4 100644
--- a/tox.ini
+++ b/tox.ini
@@ -5,6 +5,7 @@ envlist = py{35,36,37,38,39,310}-{ubuntu-latest,macos-latest,windows-latest},lin
 usedevelop = true
 deps =
     docker
+    pytest-asyncio
     pytest-cov
     pytest-integration
     pretend