Skip to content

Commit

Permalink
[disk-buffering] - Single responsibility for disk exporters (open-tel…
Browse files Browse the repository at this point in the history
  • Loading branch information
breedx-splk authored and psx95 committed Jan 26, 2024
1 parent 12bd584 commit 396b640
Show file tree
Hide file tree
Showing 45 changed files with 1,823 additions and 1,077 deletions.
30 changes: 22 additions & 8 deletions disk-buffering/CONTRIBUTING.md → disk-buffering/DESIGN.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
# Contributor Guide
# Design Overview

Each one of the three exporters provided by this
tool ([LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java), [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java)
and [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java))
is responsible of performing 2 actions, `write` and `read/delegate`, the `write` one happens
automatically as a set of signals are provided from the processor, while the `read/delegate` one has
to be triggered manually by the consumer of this library as explained in the [README](README.md).
There are three main disk-writing exporters provided by this module:

* [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java)
* [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java)
* [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java))

Each is responsible for writing a specific type of telemetry to disk storage for later
harvest/ingest.

For later reading, there are:

* [LogRecordFromToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java)
* [MetricFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java)
* [SpanFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java))

Each one of those has a `create()` method that takes a delegate exporter (to send data
to ingest) and the `StorageConfiguration` that tells them where to find buffered data.

As explained in the [README](README.md), this has to be triggered manually by the consumer of
this library and does not happen automatically.

## Writing overview

Expand All @@ -14,7 +28,7 @@ to be triggered manually by the consumer of this library as explained in the [RE
* The writing process happens automatically within its `export(Collection<SignalData> signals)`
method, which is called by the configured signal processor.
* When a set of signals is received, these are delegated over to
the [DiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporter.java)
a type-specific wrapper of [ToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java)
class which then serializes them using an implementation
of [SignalSerializer](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java)
and then the serialized data is appended into a File using an instance of
Expand Down
12 changes: 6 additions & 6 deletions disk-buffering/README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
# Disk buffering

This module provides signal exporter wrappers that intercept and store signals in files which can be
This module provides exporters that store telemetry data in files which can be
sent later on demand. A high level description of how it works is that there are two separate
processes in place, one for writing data in disk, and one for reading/exporting the previously
stored data.

* Each exporter stores the received data automatically in disk right after it's received from its
processor.
* The reading of the data back from disk and exporting process has to be done manually. At
the moment there's no automatic mechanism to do so. There's more information on it can be
the moment there's no automatic mechanism to do so. There's more information on how it can be
achieved, under [Reading data](#reading-data).

> For a more detailed information on how the whole process works, take a look at
> the [CONTRIBUTING](CONTRIBUTING.md) file.
> the [DESIGN.md](DESIGN.md) file.
## Configuration

Expand Down Expand Up @@ -43,11 +43,11 @@ In order to use it, you need to wrap your own exporter with a new instance of
the ones provided in here:

* For a LogRecordExporter, it must be wrapped within
a [LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java).
a [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java).
* For a MetricExporter, it must be wrapped within
a [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java).
a [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java).
* For a SpanExporter, it must be wrapped within
a [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java).
a [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java).

Each wrapper will need the following when instantiating them:

Expand Down
2 changes: 1 addition & 1 deletion disk-buffering/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ wire {
java {}

sourcePath {
srcJar("io.opentelemetry.proto:opentelemetry-proto:0.20.0-alpha")
srcJar("io.opentelemetry.proto:opentelemetry-proto:1.1.0-alpha")
}

root(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class LogRecordFromDiskExporter implements FromDiskExporter {

private final FromDiskExporterImpl<LogRecordData> delegate;

public static LogRecordFromDiskExporter create(
LogRecordExporter exporter, StorageConfiguration config) throws IOException {
FromDiskExporterImpl<LogRecordData> delegate =
FromDiskExporterImpl.<LogRecordData>builder()
.setFolderName("logs")
.setStorageConfiguration(config)
.setDeserializer(SignalSerializer.ofLogs())
.setExportFunction(exporter::export)
.build();
return new LogRecordFromDiskExporter(delegate);
}

private LogRecordFromDiskExporter(FromDiskExporterImpl<LogRecordData> delegate) {
this.delegate = delegate;
}

@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
return delegate.exportStoredBatch(timeout, unit);
}

@Override
public void shutdown() throws IOException {
delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
import java.util.Collection;

/**
* This class implements a {@link LogRecordExporter} that delegates to an instance of {@code
* ToDiskExporter<LogRecordData>}.
*/
public class LogRecordToDiskExporter implements LogRecordExporter {
private final ToDiskExporter<LogRecordData> delegate;

/**
* Creates a new LogRecordToDiskExporter that will buffer LogRecordData telemetry on disk storage.
*
* @param delegate - The LogRecordExporter to delegate to if disk writing fails.
* @param config - The StorageConfiguration that specifies how storage is managed.
* @return A new LogRecordToDiskExporter instance.
* @throws IOException if the delegate ToDiskExporter could not be created.
*/
public static LogRecordToDiskExporter create(
LogRecordExporter delegate, StorageConfiguration config) throws IOException {
ToDiskExporter<LogRecordData> toDisk =
ToDiskExporter.<LogRecordData>builder()
.setFolderName("logs")
.setStorageConfiguration(config)
.setSerializer(SignalSerializer.ofLogs())
.setExportFunction(delegate::export)
.build();
return new LogRecordToDiskExporter(toDisk);
}

// Visible for testing
LogRecordToDiskExporter(ToDiskExporter<LogRecordData> delegate) {
this.delegate = delegate;
}

@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
return delegate.export(logs);
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
try {
delegate.shutdown();
return CompletableResultCode.ofSuccess();
} catch (IOException e) {
return CompletableResultCode.ofFailure();
}
}
}
Loading

0 comments on commit 396b640

Please sign in to comment.