Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Consistently use wrap_as_background_task in more places #8599

Merged
merged 2 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8599.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.
12 changes: 3 additions & 9 deletions synapse/handlers/account_validity.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from synapse.api.errors import StoreError
from synapse.logging.context import make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.types import UserID
from synapse.util import stringutils

Expand Down Expand Up @@ -63,16 +63,10 @@ def __init__(self, hs):
self._raw_from = email.utils.parseaddr(self._from_string)[1]

# Check the renewal emails to send and send them every 30min.
def send_emails():
# run as a background process to make sure that the database transactions
# have a logcontext to report to
return run_as_background_process(
"send_renewals", self._send_renewal_emails
)

if hs.config.run_background_tasks:
self.clock.looping_call(send_emails, 30 * 60 * 1000)
self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000)

@wrap_as_background_process("send_renewals")
async def _send_renewal_emails(self):
"""Gets the list of users whose account is expiring in the amount of time
configured in the ``renew_at`` parameter from the ``account_validity``
Expand Down
10 changes: 3 additions & 7 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
StoreError,
SynapseError,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.types import UserID, create_requester, get_domain_from_id

from ._base import BaseHandler
Expand Down Expand Up @@ -57,7 +57,7 @@ def __init__(self, hs):

if hs.config.run_background_tasks:
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
self._update_remote_profile_cache, self.PROFILE_UPDATE_MS
)

async def get_profile(self, user_id):
Expand Down Expand Up @@ -370,11 +370,7 @@ async def check_profile_query_allowed(self, target_user, requester=None):
raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
raise

def _start_update_remote_profile_cache(self):
return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache
)

@wrap_as_background_process("Update remote profile")
async def _update_remote_profile_cache(self):
"""Called periodically to check profiles of remote users we haven't
checked in a while.
Expand Down
11 changes: 6 additions & 5 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event
from synapse.logging.context import PreserveLoggingContext, current_context
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream
Expand Down Expand Up @@ -140,10 +143,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
if hs.config.run_background_tasks:
# We periodically clean out old transaction ID mappings
self._clock.looping_call(
run_as_background_process,
5 * 60 * 1000,
"_cleanup_old_transaction_ids",
self._cleanup_old_transaction_ids,
self._cleanup_old_transaction_ids, 5 * 60 * 1000,
)

self._get_event_cache = LruCache(
Expand Down Expand Up @@ -1374,6 +1374,7 @@ async def get_already_persisted_events(

return mapping

@wrap_as_background_process("_cleanup_old_transaction_ids")
async def _cleanup_old_transaction_ids(self):
"""Cleans out transaction id mappings older than 24hrs.
"""
Expand Down
16 changes: 7 additions & 9 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore
Expand Down Expand Up @@ -67,16 +70,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
):
self._known_servers_count = 1
self.hs.get_clock().looping_call(
run_as_background_process,
60 * 1000,
"_count_known_servers",
self._count_known_servers,
self._count_known_servers, 60 * 1000,
)
self.hs.get_clock().call_later(
1000,
run_as_background_process,
"_count_known_servers",
self._count_known_servers,
1000, self._count_known_servers,
)
LaterGauge(
"synapse_federation_known_servers",
Expand All @@ -85,6 +82,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
lambda: self._known_servers_count,
)

@wrap_as_background_process("_count_known_servers")
async def _count_known_servers(self):
"""
Count the servers that this server knows about.
Expand Down