From 3b8f9554b0437fe8dc7addacee875e1f667695ed Mon Sep 17 00:00:00 2001 From: Andrew Chubatiuk Date: Tue, 16 Apr 2024 15:01:03 +0300 Subject: [PATCH 1/2] replaced-statsd-with-prometheus-client --- .dockerignore | 1 + poetry.lock | 27 +++++++++++---------- pyproject.toml | 2 +- redash/__init__.py | 7 ++++-- redash/app.py | 3 +++ redash/metrics/database.py | 11 ++++++--- redash/metrics/request.py | 11 ++++++--- redash/query_runner/__init__.py | 6 +++++ redash/query_runner/databricks.py | 4 ++-- redash/settings/__init__.py | 2 ++ redash/tasks/queries/maintenance.py | 15 ++++++++---- redash/tasks/worker.py | 19 +++++++++------ tests/handlers/test_authentication.py | 3 ++- tests/metrics/test_database.py | 6 ++--- tests/metrics/test_request.py | 6 ++--- tests/tasks/test_schedule.py | 4 ++-- tests/tasks/test_worker.py | 34 +++++++++------------------ 17 files changed, 95 insertions(+), 66 deletions(-) diff --git a/.dockerignore b/.dockerignore index b5a2c33ebb..45d8314a23 100644 --- a/.dockerignore +++ b/.dockerignore @@ -12,3 +12,4 @@ venv/ /.github/ /netlify.toml /setup/ +.github/ diff --git a/poetry.lock b/poetry.lock index a3ad412c46..d5e410d314 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2907,6 +2907,20 @@ nodeenv = ">=0.11.1" pyyaml = ">=5.1" virtualenv = ">=20.10.0" +[[package]] +name = "prometheus-client" +version = "0.20.0" +description = "Python client for the Prometheus monitoring system." +optional = false +python-versions = ">=3.8" +files = [ + {file = "prometheus_client-0.20.0-py3-none-any.whl", hash = "sha256:cde524a85bce83ca359cc837f28b8c0db5cac7aa653a588fd7e84ba061c329e7"}, + {file = "prometheus_client-0.20.0.tar.gz", hash = "sha256:287629d00b147a32dcb2be0b9df905da599b2d82f80377083ec8463309a4bb89"}, +] + +[package.extras] +twisted = ["twisted"] + [[package]] name = "prompt-toolkit" version = "3.0.43" @@ -4654,17 +4668,6 @@ build-sphinx = ["sphinx", "sphinxcontrib-napoleon"] dev = ["check-manifest"] test = ["tox (>=1.8.1)"] -[[package]] -name = "statsd" -version = "3.3.0" -description = "A simple statsd client." -optional = false -python-versions = "*" -files = [ - {file = "statsd-3.3.0-py2.py3-none-any.whl", hash = "sha256:c610fb80347fca0ef62666d241bce64184bd7cc1efe582f9690e045c25535eaa"}, - {file = "statsd-3.3.0.tar.gz", hash = "sha256:e3e6db4c246f7c59003e51c9720a51a7f39a396541cb9b147ff4b14d15b5dd1f"}, -] - [[package]] name = "supervisor" version = "4.1.0" @@ -5300,4 +5303,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.0" python-versions = ">=3.8,<3.11" -content-hash = "e7985ee5c3ca3a4389b4e85fda033a9b3b867dbbe4b4a7fca8ea5c35fc401148" +content-hash = "44e59c1db0eb736c5d9c72aea2ae404256c79981a76f94e1aaf0a2b4ca7aae26" diff --git a/pyproject.toml b/pyproject.toml index 8510146336..4af3b62b3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,7 +75,7 @@ sqlalchemy-searchable = "1.2.0" sqlalchemy-utils = "0.34.2" sqlparse = "0.5.0" sshtunnel = "0.1.5" -statsd = "3.3.0" +prometheus-client = "0.20.0" supervisor = "4.1.0" supervisor-checks = "0.8.1" ua-parser = "0.18.0" diff --git a/redash/__init__.py b/redash/__init__.py index 1e2c987777..2598343e5e 100644 --- a/redash/__init__.py +++ b/redash/__init__.py @@ -7,7 +7,7 @@ from flask_limiter.util import get_remote_address from flask_mail import Mail from flask_migrate import Migrate -from statsd import StatsClient +from prometheus_client.bridge.graphite import GraphiteBridge from redash import settings from redash.app import create_app # noqa @@ -43,11 +43,14 @@ def setup_logging(): setup_logging() +if settings.STATSD_ENABLED: + gb = GraphiteBridge((settings.STATSD_HOST, settings.STATSD_PORT), tags=settings.STATSD_USE_TAGS) + gb.start(settings.STATSD_PERIOD) + redis_connection = redis.from_url(settings.REDIS_URL) rq_redis_connection = redis.from_url(settings.RQ_REDIS_URL) mail = Mail() migrate = Migrate(compare_type=True) -statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT, prefix=settings.STATSD_PREFIX) limiter = Limiter(key_func=get_remote_address, storage_uri=settings.LIMITER_STORAGE) import_query_runners(settings.QUERY_RUNNERS) diff --git a/redash/app.py b/redash/app.py index 437f766c87..8bb7730549 100644 --- a/redash/app.py +++ b/redash/app.py @@ -1,4 +1,6 @@ from flask import Flask +from prometheus_client import make_wsgi_app +from werkzeug.middleware.dispatcher import DispatcherMiddleware from werkzeug.middleware.proxy_fix import ProxyFix from redash import settings @@ -39,6 +41,7 @@ def create_app(): sentry.init() app = Redash() + app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {"/metrics": make_wsgi_app()}) security.init_app(app) request_metrics.init_app(app) diff --git a/redash/metrics/database.py b/redash/metrics/database.py index 152427b2e5..1e80606775 100644 --- a/redash/metrics/database.py +++ b/redash/metrics/database.py @@ -2,15 +2,20 @@ import time from flask import g, has_request_context +from prometheus_client import Histogram from sqlalchemy.engine import Engine from sqlalchemy.event import listens_for from sqlalchemy.orm.util import _ORMJoin from sqlalchemy.sql.selectable import Alias -from redash import statsd_client - metrics_logger = logging.getLogger("metrics") +dbActionLatencyHistogram = Histogram( + "db_action_latency_milliseconds", + "Database operation duration", + ["name", "action"], +) + def _table_name_from_select_element(elt): t = elt.froms[0] @@ -48,7 +53,7 @@ def after_execute(conn, elt, multiparams, params, result): action = action.lower() - statsd_client.timing("db.{}.{}".format(name, action), duration) + dbActionLatencyHistogram.labels(name, action).observe(duration) metrics_logger.debug("table=%s query=%s duration=%.2f", name, action, duration) if has_request_context(): diff --git a/redash/metrics/request.py b/redash/metrics/request.py index 9dc169f3b7..294ca70bad 100644 --- a/redash/metrics/request.py +++ b/redash/metrics/request.py @@ -3,11 +3,16 @@ from collections import namedtuple from flask import g, request - -from redash import statsd_client +from prometheus_client import Histogram metrics_logger = logging.getLogger("metrics") +requestsHistogram = Histogram( + "requests_latency_milliseconds", + "Requests latency", + ["endpoint", "method"], +) + def record_request_start_time(): g.start_time = time.time() @@ -35,7 +40,7 @@ def calculate_metrics(response): queries_duration, ) - statsd_client.timing("requests.{}.{}".format(endpoint, request.method.lower()), request_duration) + requestsHistogram.labels(endpoint, request.method.lower()).observe(request_duration) return response diff --git a/redash/query_runner/__init__.py b/redash/query_runner/__init__.py index 10d6d6edf5..dc78cfda11 100644 --- a/redash/query_runner/__init__.py +++ b/redash/query_runner/__init__.py @@ -5,6 +5,7 @@ import sqlparse from dateutil import parser +from prometheus_client import Counter from rq.timeouts import JobTimeoutException from sshtunnel import open_tunnel @@ -120,6 +121,11 @@ class BaseQueryRunner: limit_query = " LIMIT 1000" limit_keywords = ["LIMIT", "OFFSET"] limit_after_select = False + queryRunnerResultsCounter = Counter( + "query_runner_results", + "Query Runner results counter", + ["runner", "name", "status"], + ) def __init__(self, configuration): self.syntax = "sql" diff --git a/redash/query_runner/databricks.py b/redash/query_runner/databricks.py index 886ba9b8b4..f6f099678b 100644 --- a/redash/query_runner/databricks.py +++ b/redash/query_runner/databricks.py @@ -2,7 +2,7 @@ import logging import os -from redash import __version__, statsd_client +from redash import __version__ from redash.query_runner import ( TYPE_BOOLEAN, TYPE_DATE, @@ -112,7 +112,7 @@ def run_query(self, query, user): if len(result_set) >= ROW_LIMIT and cursor.fetchone() is not None: logger.warning("Truncated result set.") - statsd_client.incr("redash.query_runner.databricks.truncated") + self.queryRunnerResultsCounter.labels("databricks", self.name, "truncated").inc() data["truncated"] = True error = None else: diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py index 75c438cff2..ec215e55ea 100644 --- a/redash/settings/__init__.py +++ b/redash/settings/__init__.py @@ -22,9 +22,11 @@ REDIS_URL = add_decode_responses_to_redis_url(_REDIS_URL) PROXIES_COUNT = int(os.environ.get("REDASH_PROXIES_COUNT", "1")) +STATSD_ENABLED = parse_boolean(os.environ.get("REDASH_STATSD_ENABLED", "false")) STATSD_HOST = os.environ.get("REDASH_STATSD_HOST", "127.0.0.1") STATSD_PORT = int(os.environ.get("REDASH_STATSD_PORT", "8125")) STATSD_PREFIX = os.environ.get("REDASH_STATSD_PREFIX", "redash") +STATSD_PERIOD = float(os.environ.get("REDASH_STATSD_PERIOD", "30.0")) STATSD_USE_TAGS = parse_boolean(os.environ.get("REDASH_STATSD_USE_TAGS", "false")) # Connection settings for Redash's own database (where we store the queries, results, etc) diff --git a/redash/tasks/queries/maintenance.py b/redash/tasks/queries/maintenance.py index bbfebd053f..e79ecdb05c 100644 --- a/redash/tasks/queries/maintenance.py +++ b/redash/tasks/queries/maintenance.py @@ -1,9 +1,10 @@ import logging import time +from prometheus_client import Counter from rq.timeouts import JobTimeoutException -from redash import models, redis_connection, settings, statsd_client +from redash import models, redis_connection, settings from redash.models.parameterized_query import ( InvalidParameterError, QueryDetachedFromDataSourceError, @@ -17,6 +18,12 @@ logger = get_job_logger(__name__) +refreshSchemaCounter = Counter( + "refresh_schema", + "Refresh schema operation status counter", + ["status"], +) + def empty_schedules(): logger.info("Deleting schedules of past scheduled queries...") @@ -169,17 +176,17 @@ def refresh_schema(data_source_id): ds.id, time.time() - start_time, ) - statsd_client.incr("refresh_schema.success") + refreshSchemaCounter.labels("success").inc() except JobTimeoutException: logger.info( "task=refresh_schema state=timeout ds_id=%s runtime=%.2f", ds.id, time.time() - start_time, ) - statsd_client.incr("refresh_schema.timeout") + refreshSchemaCounter.labels("timeout").inc() except Exception: logger.warning("Failed refreshing schema for the data source: %s", ds.name, exc_info=1) - statsd_client.incr("refresh_schema.error") + refreshSchemaCounter.labels("error").inc() logger.info( "task=refresh_schema state=failed ds_id=%s runtime=%.2f", ds.id, diff --git a/redash/tasks/worker.py b/redash/tasks/worker.py index e1bfd457db..cf1acb37cc 100644 --- a/redash/tasks/worker.py +++ b/redash/tasks/worker.py @@ -3,6 +3,7 @@ import signal import sys +from prometheus_client import Gauge from rq import Queue as BaseQueue from rq.job import Job as BaseJob from rq.job import JobStatus @@ -13,14 +14,18 @@ Worker, ) -from redash import statsd_client - # HerokuWorker does not work in OSX https://github.com/getredash/redash/issues/5413 if sys.platform == "darwin": BaseWorker = Worker else: BaseWorker = HerokuWorker +rqJobsCounter = Gauge( + "rq_jobs", + "RQ jobs status count", + ["queue", "status"], +) + class CancellableJob(BaseJob): def cancel(self, pipeline=None): @@ -41,7 +46,7 @@ class StatsdRecordingQueue(BaseQueue): def enqueue_job(self, *args, **kwargs): job = super().enqueue_job(*args, **kwargs) - statsd_client.incr("rq.jobs.created.{}".format(self.name)) + rqJobsCounter.labels(self.name, "created").inc() return job @@ -59,13 +64,13 @@ class StatsdRecordingWorker(BaseWorker): """ def execute_job(self, job, queue): - statsd_client.incr("rq.jobs.running.{}".format(queue.name)) - statsd_client.incr("rq.jobs.started.{}".format(queue.name)) + rqJobsCounter.labels(queue.name, "running").inc() + rqJobsCounter.labels(queue.name, "started").inc() try: super().execute_job(job, queue) finally: - statsd_client.decr("rq.jobs.running.{}".format(queue.name)) - statsd_client.incr("rq.jobs.{}.{}".format(job.get_status(), queue.name)) + rqJobsCounter.labels(queue.name, "running").dec() + rqJobsCounter.labels(queue.name, job.get_status()).inc() class HardLimitingWorker(BaseWorker): diff --git a/tests/handlers/test_authentication.py b/tests/handlers/test_authentication.py index d2aec4386d..f3adad8f0f 100644 --- a/tests/handlers/test_authentication.py +++ b/tests/handlers/test_authentication.py @@ -17,7 +17,8 @@ def test_shows_reset_password_form(self): class TestInvite(BaseTestCase): - def test_expired_invite_token(self): + @mock.patch("prometheus_client.Histogram.observe") + def test_expired_invite_token(self, inc): with mock.patch("time.time") as patched_time: patched_time.return_value = time.time() - (7 * 24 * 3600) - 10 token = invite_token(self.factory.user) diff --git a/tests/metrics/test_database.py b/tests/metrics/test_database.py index 7206cc75c4..1d54417470 100644 --- a/tests/metrics/test_database.py +++ b/tests/metrics/test_database.py @@ -3,8 +3,8 @@ from tests import BaseTestCase -@patch("statsd.StatsClient.timing") +@patch("prometheus_client.Histogram.observe") class TestDatabaseMetrics(BaseTestCase): - def test_db_request_records_statsd_metrics(self, timing): + def test_db_request_records_metrics(self, observe): self.factory.create_query() - timing.assert_called_with("db.changes.insert", ANY) + observe.assert_called_with(ANY) diff --git a/tests/metrics/test_request.py b/tests/metrics/test_request.py index 64dc42b92c..89a2ae5379 100644 --- a/tests/metrics/test_request.py +++ b/tests/metrics/test_request.py @@ -3,8 +3,8 @@ from tests import BaseTestCase -@patch("statsd.StatsClient.timing") +@patch("prometheus_client.Histogram.observe") class TestRequestMetrics(BaseTestCase): - def test_flask_request_records_statsd_metrics(self, timing): + def test_flask_request_records_metrics(self, observe): self.client.get("/ping") - timing.assert_called_once_with("requests.redash_ping.get", ANY) + observe.assert_called_once_with(ANY) diff --git a/tests/tasks/test_schedule.py b/tests/tasks/test_schedule.py index ca553c6295..7a8c52a523 100644 --- a/tests/tasks/test_schedule.py +++ b/tests/tasks/test_schedule.py @@ -74,6 +74,6 @@ def foo(): schedule_periodic_jobs([{"func": foo, "interval": 60}]) - with patch("statsd.StatsClient.incr") as incr: + with patch("prometheus_client.Gauge.inc") as inc: rq_scheduler.enqueue_jobs() - incr.assert_called_once_with("rq.jobs.created.periodic") + inc.assert_called_once() diff --git a/tests/tasks/test_worker.py b/tests/tasks/test_worker.py index 2497c93275..4423c407a9 100644 --- a/tests/tasks/test_worker.py +++ b/tests/tasks/test_worker.py @@ -1,4 +1,4 @@ -from mock import call, patch +from mock import patch from rq import Connection from rq.job import JobStatus @@ -9,14 +9,14 @@ from tests import BaseTestCase -@patch("statsd.StatsClient.incr") +@patch("prometheus_client.Gauge.inc") class TestWorkerMetrics(BaseTestCase): def tearDown(self): with Connection(rq_redis_connection): for queue_name in default_queues: Queue(queue_name).empty() - def test_worker_records_success_metrics(self, incr): + def test_worker_records_success_metrics(self, inc): query = self.factory.create_query() with Connection(rq_redis_connection): @@ -31,16 +31,10 @@ def test_worker_records_success_metrics(self, incr): Worker(["queries"]).work(max_jobs=1) - calls = [ - call("rq.jobs.running.queries"), - call("rq.jobs.started.queries"), - call("rq.jobs.running.queries", -1, 1), - call("rq.jobs.finished.queries"), - ] - incr.assert_has_calls(calls) + inc.assert_called() @patch("rq.Worker.execute_job") - def test_worker_records_failure_metrics(self, _, incr): + def test_worker_records_failure_metrics(self, _, inc): """ Force superclass execute_job to do nothing and set status to JobStatus.Failed to simulate query failure """ @@ -59,23 +53,17 @@ def test_worker_records_failure_metrics(self, _, incr): Worker(["queries"]).work(max_jobs=1) - calls = [ - call("rq.jobs.running.queries"), - call("rq.jobs.started.queries"), - call("rq.jobs.running.queries", -1, 1), - call("rq.jobs.failed.queries"), - ] - incr.assert_has_calls(calls) + inc.assert_called() -@patch("statsd.StatsClient.incr") +@patch("prometheus_client.Gauge.inc") class TestQueueMetrics(BaseTestCase): def tearDown(self): with Connection(rq_redis_connection): for queue_name in default_queues: Queue(queue_name).empty() - def test_enqueue_query_records_created_metric(self, incr): + def test_enqueue_query_records_created_metric(self, inc): query = self.factory.create_query() with Connection(rq_redis_connection): @@ -88,12 +76,12 @@ def test_enqueue_query_records_created_metric(self, incr): {"Username": "Patrick", "query_id": query.id}, ) - incr.assert_called_with("rq.jobs.created.queries") + inc.assert_called_once() - def test_job_delay_records_created_metric(self, incr): + def test_job_delay_records_created_metric(self, inc): @job("default", timeout=300) def foo(): pass foo.delay() - incr.assert_called_with("rq.jobs.created.default") + inc.assert_called() From b06991b124264177221a6646ab75939c88e440a3 Mon Sep 17 00:00:00 2001 From: AndrewChubatiuk Date: Wed, 24 Apr 2024 07:23:51 +0300 Subject: [PATCH 2/2] use metrics prefix for statsd metrics --- redash/__init__.py | 2 +- redash/tasks/worker.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/redash/__init__.py b/redash/__init__.py index 2598343e5e..12b36bcc1b 100644 --- a/redash/__init__.py +++ b/redash/__init__.py @@ -45,7 +45,7 @@ def setup_logging(): if settings.STATSD_ENABLED: gb = GraphiteBridge((settings.STATSD_HOST, settings.STATSD_PORT), tags=settings.STATSD_USE_TAGS) - gb.start(settings.STATSD_PERIOD) + gb.start(settings.STATSD_PERIOD, settings.STATSD_PREFIX) redis_connection = redis.from_url(settings.REDIS_URL) rq_redis_connection = redis.from_url(settings.RQ_REDIS_URL) diff --git a/redash/tasks/worker.py b/redash/tasks/worker.py index cf1acb37cc..2b0877f532 100644 --- a/redash/tasks/worker.py +++ b/redash/tasks/worker.py @@ -64,9 +64,9 @@ class StatsdRecordingWorker(BaseWorker): """ def execute_job(self, job, queue): - rqJobsCounter.labels(queue.name, "running").inc() rqJobsCounter.labels(queue.name, "started").inc() try: + rqJobsCounter.labels(queue.name, "running").inc() super().execute_job(job, queue) finally: rqJobsCounter.labels(queue.name, "running").dec()