
Support outputting structured logs in addition to standard logs #8607

Merged: 23 commits, Oct 29, 2020
Changes from 1 commit

Commits (23)
2db1099
Remove the structured logging configuration code.
clokep Oct 21, 2020
509594f
Remove an unnecessary return value.
clokep Oct 20, 2020
99f50ec
Rework structured logging to use the Python standard library logging …
clokep Oct 21, 2020
fcb74ae
Pipe through the server_name properly.
clokep Oct 21, 2020
24a9882
Use a standard library logger.
clokep Oct 21, 2020
4462758
Update synmark for the changes.
clokep Oct 21, 2020
24ab2df
Update logging format
clokep Oct 26, 2020
5fbc11c
Fix-up formatting using __all__.
clokep Oct 26, 2020
b2fc88b
Do not build an unnecessary set.
clokep Oct 26, 2020
650bb09
Stop using the DEBUG decorators.
clokep Oct 26, 2020
7115697
Raise an error if structured is in the logging config.
clokep Oct 26, 2020
e855dbb
__all__ takes strings, not objects.
clokep Oct 26, 2020
7071d89
Update the sample config.
clokep Oct 26, 2020
6b785c1
Revamp tests a bit to avoid impacting other tests.
clokep Oct 26, 2020
8d51476
Abstract handling of loggers in tests.
clokep Oct 27, 2020
7fb5505
Add a test for including additional structured data.
clokep Oct 27, 2020
11a488c
Lint.
clokep Oct 27, 2020
a19c967
Fix test after rename.
clokep Oct 27, 2020
e98a6d1
Add an upgrade note.
clokep Oct 27, 2020
babdd5b
Rework the code to load logging configs.
clokep Oct 28, 2020
10738cc
Convert legacy drain configurations to standard library handler configs.
clokep Oct 28, 2020
f801d71
Add back a JSON formatter without time.
clokep Oct 28, 2020
1c0181a
Fix type hints.
clokep Oct 28, 2020
113 changes: 53 additions & 60 deletions docs/structured_logging.md
@@ -1,83 +1,76 @@
# Structured Logging

A structured logging system can be useful when your logs are destined for a
machine to parse and process. By maintaining its machine-readable characteristics,
it enables more efficient searching and aggregations when consumed by software
such as the "ELK stack".

Synapse's structured logging system is configured via the file that Synapse's `log_config` config option points to. The file must be YAML and contain `structured: true`. It must contain a list of "drains" (places where logs go to).
Synapse's structured logging system is configured via the file that Synapse's
`log_config` config option points to. The file should include a formatter which
uses the `synapse.logging.TerseJsonFormatter` class included with Synapse and a
handler which uses the above formatter.

A structured logging configuration looks similar to the following:

```yaml
structured: true
version: 1

formatters:
    structured:
        class: synapse.logging.TerseJsonFormatter

handlers:
    file:
        class: logging.handlers.TimedRotatingFileHandler
        formatter: structured
        filename: /path/to/my/logs/homeserver.log
        when: midnight
        backupCount: 3  # Does not include the current log file.
        encoding: utf8

loggers:
    synapse:
        level: INFO
        handlers: [file]
    synapse.storage.SQL:
        level: WARNING

drains:
    console:
        type: console
        location: stdout
    file:
        type: file_json
        location: homeserver.log
```

The above logging config will set Synapse as 'INFO' logging level by default, with the SQL layer at 'WARNING', and will have two logging drains (to the console and to a file, stored as JSON).

## Drain Types

Drain types can be specified by the `type` key.

### `console`

Outputs human-readable logs to the console.

Arguments:

- `location`: Either `stdout` or `stderr`.

### `console_json`

Outputs machine-readable JSON logs to the console.

Arguments:

- `location`: Either `stdout` or `stderr`.

### `console_json_terse`

Outputs machine-readable JSON logs to the console, separated by newlines. This
format is not designed to be read and re-formatted into human-readable text, but
is optimal for a logging aggregation system.

Arguments:

- `location`: Either `stdout` or `stderr`.

### `file`

Outputs human-readable logs to a file.

Arguments:

- `location`: An absolute path to the file to log to.

### `file_json`

Outputs machine-readable logs to a file.

Arguments:

- `location`: An absolute path to the file to log to.

### `network_json_terse`

Delivers machine-readable JSON logs to a log aggregator over TCP. This is
compatible with LogStash's TCP input with the codec set to `json_lines`.

Arguments:

- `host`: Hostname or IP address of the log aggregator.
- `port`: Numerical port to contact on the host.

The above logging config will set Synapse's default logging level to 'INFO',
with the SQL layer at 'WARNING', and will log to a file, stored as JSON.

It is also possible to configure Synapse to log to a remote endpoint by using the
`synapse.logging.RemoteHandler` class included with Synapse. It takes the
following arguments:

- `host`: Hostname or IP address of the log aggregator.
- `port`: Numerical port to contact on the host.
- `maximum_buffer`: (Optional, defaults to 1000) The maximum buffer size to allow.

A remote structured logging configuration looks similar to the following:

```yaml
version: 1

formatters:
    structured:
        class: synapse.logging.TerseJsonFormatter

handlers:
    remote:
        class: synapse.logging.RemoteHandler
        formatter: structured
        host: 10.1.2.3
        port: 9999

loggers:
    synapse:
        level: INFO
        handlers: [remote]
    synapse.storage.SQL:
        level: WARNING
```

The above logging config will set Synapse's default logging level to 'INFO',
with the SQL layer at 'WARNING', and will log JSON-formatted messages to a
remote endpoint at 10.1.2.3:9999.
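Synapse applies this file itself when the `log_config` option points at it, but the structure is plain standard-library `dictConfig` material. A minimal standalone sketch of loading such a file (the path is hypothetical; PyYAML is assumed to be available, as it is for Synapse):

```python
import logging
import logging.config

import yaml

# Hypothetical path to a logging config like the examples above.
with open("/etc/synapse/log_config.yaml") as f:
    config = yaml.safe_load(f)

# dictConfig resolves the dotted class names, e.g.
# synapse.logging.TerseJsonFormatter, by importing them.
logging.config.dictConfig(config)

# Anything logged under the "synapse" namespace now flows through
# the configured formatter and handler.
logging.getLogger("synapse").info("structured logging configured")
```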
18 changes: 18 additions & 0 deletions synapse/logging/__init__.py
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are imported to allow for nicer logging configuration files.
from synapse.logging._remote import RemoteHandler # noqa
from synapse.logging._terse_json import TerseJsonFormatter # noqa
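These re-exports are what let the YAML examples refer to `synapse.logging.TerseJsonFormatter` and `synapse.logging.RemoteHandler` rather than the private `_terse_json` and `_remote` modules. As a rough illustration of what the formatter emits, here is a sketch that formats a hand-built record (it assumes a Synapse checkout is importable; the exact field set is defined in `synapse/logging/_terse_json.py`):

```python
import logging

from synapse.logging import TerseJsonFormatter

# Build a LogRecord by hand just to see the JSON shape; in real use
# the logging framework constructs records for you.
record = logging.LogRecord(
    name="synapse.app.homeserver",
    level=logging.INFO,
    pathname="homeserver.py",
    lineno=1,
    msg="Started listening on TCP port %d",
    args=(8008,),
    exc_info=None,
)

# Prints one JSON object for the record, including the rendered
# message, the logger namespace, and the level.
print(TerseJsonFormatter().format(record))
```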
95 changes: 51 additions & 44 deletions synapse/logging/_remote.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import sys
import traceback
from collections import deque
@@ -21,6 +22,7 @@
from typing import Callable, Optional

import attr
from typing_extensions import Deque
from zope.interface import implementer

from twisted.application.internet import ClientService
@@ -32,7 +34,7 @@
)
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol
from twisted.logger import ILogObserver, Logger, LogLevel
from twisted.logger import Logger


@attr.s
@@ -45,11 +47,11 @@ class LogProducer:
    Args:
        buffer: Log buffer to read logs from.
        transport: Transport to write to.
        format_event: A callable to format the log entry to a string.
        format: A callable to format the log record to a string.
    """

    transport = attr.ib(type=ITransport)
    format_event = attr.ib(type=Callable[[dict], str])
    _format = attr.ib(type=Callable[[logging.LogRecord], str])
    _buffer = attr.ib(type=deque)
    _paused = attr.ib(default=False, type=bool, init=False)

@@ -61,77 +63,84 @@ def stopProducing(self):
        self._buffer = deque()

    def resumeProducing(self):
        # If we're already producing, nothing to do.
        self._paused = False

        # Loop until paused.
        while self._paused is False and (self._buffer and self.transport.connected):
            try:
                # Request the next event and format it.
                event = self._buffer.popleft()
                msg = self.format_event(event)
                # Request the next record and format it.
                record = self._buffer.popleft()
                msg = self._format(record)

                # Send it as a new line over the transport.
                self.transport.write(msg.encode("utf8"))
                self.transport.write(b"\n")
            except Exception:
                # Something has gone wrong writing to the transport -- log it
                # and break out of the while.
                traceback.print_exc(file=sys.__stderr__)
                break


@attr.s
@implementer(ILogObserver)
class TCPLogObserver:
class RemoteHandler(logging.Handler):
    """
    An IObserver that writes JSON logs to a TCP target.
    A logging handler that writes logs to a TCP target.

    Args:
        hs (HomeServer): The homeserver that is being logged for.
        host: The host of the logging target.
        port: The logging target's port.
        format_event: A callable to format the log entry to a string.
        maximum_buffer: The maximum buffer size.
    """

    hs = attr.ib()
    host = attr.ib(type=str)
    port = attr.ib(type=int)
    format_event = attr.ib(type=Callable[[dict], str])
    maximum_buffer = attr.ib(type=int)
    _buffer = attr.ib(default=attr.Factory(deque), type=deque)
    _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
    _logger = attr.ib(default=attr.Factory(Logger))
    _producer = attr.ib(default=None, type=Optional[LogProducer])

    def start(self) -> None:
    def __init__(
        self,
        host: str,
        port: int,
        maximum_buffer: int = 1000,
        level=logging.NOTSET,
        _reactor=None,
    ):
        super().__init__(level=level)
        self.host = host
        self.port = port
        self.maximum_buffer = maximum_buffer

        self._buffer = deque()  # type: Deque[logging.LogRecord]
        self._connection_waiter = None  # type: Optional[Deferred]
        self._logger = Logger()
        self._producer = None  # type: Optional[LogProducer]

        # Connect without DNS lookups if it's a direct IP.
        if _reactor is None:
            from twisted.internet import reactor

            _reactor = reactor

        try:
            ip = ip_address(self.host)
            if isinstance(ip, IPv4Address):
                endpoint = TCP4ClientEndpoint(
                    self.hs.get_reactor(), self.host, self.port
                )
                endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port)
            elif isinstance(ip, IPv6Address):
                endpoint = TCP6ClientEndpoint(
                    self.hs.get_reactor(), self.host, self.port
                )
                endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port)
            else:
                raise ValueError("Unknown IP address provided: %s" % (self.host,))
        except ValueError:
            endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
            endpoint = HostnameEndpoint(_reactor, self.host, self.port)

        factory = Factory.forProtocol(Protocol)
        self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
        self._service = ClientService(endpoint, factory, clock=_reactor)
        self._service.startService()
        self._connect()

    def stop(self):
    def close(self):
        self._service.stopService()

    def _connect(self) -> None:
        """
        Triggers an attempt to connect then write to the remote if not already writing.
        """
        # Do not attempt to open multiple connections.
        if self._connection_waiter:
            return

@@ -158,37 +167,35 @@ def writer(r):

            # Make a new producer and start it.
            self._producer = LogProducer(
                buffer=self._buffer,
                transport=r.transport,
                format_event=self.format_event,
                buffer=self._buffer, transport=r.transport, format=self.format,
            )
            r.transport.registerProducer(self._producer, True)
            self._producer.resumeProducing()
            self._connection_waiter = None

    def _handle_pressure(self) -> None:
        """
        Handle backpressure by shedding events.
        Handle backpressure by shedding records.

        The buffer will be shed, in this order, until the buffer is below the maximum:
        - Shed DEBUG events
        - Shed INFO events
        - Shed the middle 50% of the events.
        - Shed DEBUG records.
        - Shed INFO records.
        - Shed the middle 50% of the records.
        """
        if len(self._buffer) <= self.maximum_buffer:
            return

        # Strip out DEBUGs
        self._buffer = deque(
            filter(lambda event: event["log_level"] != LogLevel.debug, self._buffer)
            filter(lambda record: record.levelno > logging.DEBUG, self._buffer)
        )

        if len(self._buffer) <= self.maximum_buffer:
            return

        # Strip out INFOs
        self._buffer = deque(
            filter(lambda event: event["log_level"] != LogLevel.info, self._buffer)
            filter(lambda record: record.levelno > logging.INFO, self._buffer)
        )

        if len(self._buffer) <= self.maximum_buffer:
@@ -209,14 +216,14 @@ def _handle_pressure(self) -> None:

        self._buffer.extend(reversed(end_buffer))

    def __call__(self, event: dict) -> None:
        self._buffer.append(event)
    def emit(self, record: logging.LogRecord) -> None:
        self._buffer.append(record)

        # Handle backpressure, if it exists.
        try:
            self._handle_pressure()
        except Exception:
            # If handling backpressure fails,clear the buffer and log the
            # If handling backpressure fails, clear the buffer and log the
            # exception.
            self._buffer.clear()
            self._logger.failure("Failed clearing backpressure")
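For completeness, the new handler can also be attached programmatically rather than through a config file. A minimal sketch, with placeholder host and port; actual delivery happens via the Twisted reactor, so records are flushed once the reactor is running, as it is inside Synapse:

```python
import logging

from synapse.logging import RemoteHandler, TerseJsonFormatter

# Placeholders for a real log aggregator (e.g. a Logstash TCP input).
handler = RemoteHandler("10.1.2.3", 9999, maximum_buffer=1000)
handler.setFormatter(TerseJsonFormatter())

logger = logging.getLogger("synapse")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Records are buffered in the handler (shedding DEBUG, then INFO,
# then the middle of the buffer under backpressure) and written out
# over TCP by the LogProducer once a connection is up.
logger.info("hello from RemoteHandler")
```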