Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add prometheus metrics for the number of active pushers #7103

Merged
merged 2 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7103.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add prometheus metrics for the number of active pushers.
13 changes: 8 additions & 5 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import platform
import threading
import time
from typing import Dict, Union
from numbers import Number
from typing import Callable, Dict, Iterable, Optional, Tuple, Union

import six

Expand Down Expand Up @@ -59,10 +60,12 @@ def collect():
@attr.s(hash=True)
class LaterGauge(object):

name = attr.ib()
desc = attr.ib()
labels = attr.ib(hash=False)
caller = attr.ib()
name = attr.ib(type=str)
desc = attr.ib(type=str)
labels = attr.ib(hash=False, type=Optional[Iterable[str]])
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str], Number], Number]])

def collect(self):

Expand Down
24 changes: 23 additions & 1 deletion synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
# limitations under the License.

import logging
from collections import defaultdict
from typing import Dict, Union

from twisted.internet import defer

from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory
from synapse.util.async_helpers import concurrently_execute

Expand Down Expand Up @@ -47,7 +52,24 @@ def __init__(self, _hs):
self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.pushers = {}

# map from user id to app_id:pushkey to pusher
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]

def count_pushers():
results = defaultdict(int)
for pushers in self.pushers.values():
for pusher in pushers.values():
k = (type(pusher).__name__, pusher.app_id)
results[k] += 1
return results

LaterGauge(
name="synapse_pushers",
desc="the number of active pushers",
labels=["kind", "app_id"],
caller=count_pushers,
)

def start(self):
"""Starts the pushers off in a background process.
Expand Down