Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[disk-buffering] - Single responsibility for disk exporters #1161

Merged
merged 25 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8388a4a
move exporters into exporter package
breedx-splk Jan 16, 2024
694c216
fix case of boolean values
breedx-splk Jan 16, 2024
2f388e7
rename StoredBatchExporter to FromDiskExporter for clarity
breedx-splk Jan 16, 2024
9a29319
split responsibility of "to disk" and "from disk"
breedx-splk Jan 17, 2024
36ef54e
split responsibility of "to disk" and "from disk"
breedx-splk Jan 17, 2024
101df22
spotless
breedx-splk Jan 17, 2024
9600235
fix comment
breedx-splk Jan 17, 2024
b7d2171
add javadoc
breedx-splk Jan 17, 2024
010c86a
reorder fields
breedx-splk Jan 17, 2024
ebd3d66
small edits
breedx-splk Jan 17, 2024
5a3dc36
definitely not a CONTRIBUTING.md as we know it, so rename. it's a des…
breedx-splk Jan 17, 2024
0638fa1
repackage, add create() to SpanToDiskExporter to hide ToDiskExporter …
breedx-splk Jan 17, 2024
357a4f0
continue hiding constructors and make create() methods
breedx-splk Jan 17, 2024
4885b11
design update
breedx-splk Jan 17, 2024
f4ef495
spotless
breedx-splk Jan 18, 2024
f3a3228
markdown format
breedx-splk Jan 19, 2024
d26a951
fix typo
breedx-splk Jan 19, 2024
62f41c3
update protos, add serialization for flags. Update tests.
breedx-splk Jan 22, 2024
b0a2a6e
fix tests
breedx-splk Jan 22, 2024
42ae1b4
apply some reuse and fix tests
breedx-splk Jan 23, 2024
7bd21cf
more test fixup
breedx-splk Jan 23, 2024
a134536
remove dead code
breedx-splk Jan 23, 2024
43edbf3
rename and fix up markdown links to renamed code
breedx-splk Jan 23, 2024
93af94a
markdown fixes
breedx-splk Jan 23, 2024
5061b58
remove stray * for list
breedx-splk Jan 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions disk-buffering/CONTRIBUTING.md → disk-buffering/DESIGN.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
# Contributor Guide
# Design Overview

Each one of the three exporters provided by this
tool ([LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java), [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java)
and [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java))
is responsible of performing 2 actions, `write` and `read/delegate`, the `write` one happens
automatically as a set of signals are provided from the processor, while the `read/delegate` one has
to be triggered manually by the consumer of this library as explained in the [README](README.md).
There are three main disk-writing exporters provided by this module:

* [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java)
* [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java)
* [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java))

Each is responsible for writing a specific type of telemetry to disk storage for later
harvest/ingest.

For later reading, there are:

* [LogRecordFromToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java)
* [MetricFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java)
* [SpanFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java))

Each one of those has a `create()` method that takes a delegate exporter (to send data
to ingest) and the `StorageConfiguration` that tells them where to find buffered data.

As explained in the [README](README.md), this has to be triggered manually by the consumer of
* this library and does not happen automatically.

## Writing overview

Expand All @@ -14,7 +28,7 @@ to be triggered manually by the consumer of this library as explained in the [RE
* The writing process happens automatically within its `export(Collection<SignalData> signals)`
method, which is called by the configured signal processor.
* When a set of signals is received, these are delegated over to
the [DiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporter.java)
a type-specific wrapper of [ToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java)
class which then serializes them using an implementation
of [SignalSerializer](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java)
and then the serialized data is appended into a File using an instance of
Expand Down
12 changes: 6 additions & 6 deletions disk-buffering/README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
# Disk buffering

This module provides signal exporter wrappers that intercept and store signals in files which can be
This module provides exporters that store telemetry data in files which can be
sent later on demand. A high level description of how it works is that there are two separate
processes in place, one for writing data in disk, and one for reading/exporting the previously
stored data.

* Each exporter stores the received data automatically in disk right after it's received from its
processor.
* The reading of the data back from disk and exporting process has to be done manually. At
the moment there's no automatic mechanism to do so. There's more information on it can be
the moment there's no automatic mechanism to do so. There's more information on how it can be
achieved, under [Reading data](#reading-data).

> For a more detailed information on how the whole process works, take a look at
> the [CONTRIBUTING](CONTRIBUTING.md) file.
> the [DESIGN.md](DESIGN.md) file.

## Configuration

Expand Down Expand Up @@ -43,11 +43,11 @@ In order to use it, you need to wrap your own exporter with a new instance of
the ones provided in here:

* For a LogRecordExporter, it must be wrapped within
a [LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java).
a [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java).
* For a MetricExporter, it must be wrapped within
a [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java).
a [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java).
* For a SpanExporter, it must be wrapped within
a [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java).
a [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java).

Each wrapper will need the following when instantiating them:

Expand Down
2 changes: 1 addition & 1 deletion disk-buffering/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ wire {
java {}

sourcePath {
srcJar("io.opentelemetry.proto:opentelemetry-proto:0.20.0-alpha")
srcJar("io.opentelemetry.proto:opentelemetry-proto:1.1.0-alpha")
}

root(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class LogRecordFromDiskExporter implements FromDiskExporter {

private final FromDiskExporterImpl<LogRecordData> delegate;

public static LogRecordFromDiskExporter create(
LogRecordExporter exporter, StorageConfiguration config) throws IOException {
FromDiskExporterImpl<LogRecordData> delegate =
FromDiskExporterImpl.<LogRecordData>builder()
.setFolderName("logs")
.setStorageConfiguration(config)
.setDeserializer(SignalSerializer.ofLogs())
.setExportFunction(exporter::export)
.build();
return new LogRecordFromDiskExporter(delegate);
}

private LogRecordFromDiskExporter(FromDiskExporterImpl<LogRecordData> delegate) {
this.delegate = delegate;
}

@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
return delegate.exportStoredBatch(timeout, unit);
}

@Override
public void shutdown() throws IOException {
delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
import java.util.Collection;

/**
* This class implements a {@link LogRecordExporter} that delegates to an instance of {@code
* ToDiskExporter<LogRecordData>}.
*/
public class LogRecordToDiskExporter implements LogRecordExporter {
private final ToDiskExporter<LogRecordData> delegate;

/**
* Creates a new LogRecordToDiskExporter that will buffer LogRecordData telemetry on disk storage.
*
* @param delegate - The LogRecordExporter to delegate to if disk writing fails.
* @param config - The StorageConfiguration that specifies how storage is managed.
* @return A new LogRecordToDiskExporter instance.
* @throws IOException if the delegate ToDiskExporter could not be created.
*/
public static LogRecordToDiskExporter create(
LogRecordExporter delegate, StorageConfiguration config) throws IOException {
ToDiskExporter<LogRecordData> toDisk =
ToDiskExporter.<LogRecordData>builder()
.setFolderName("logs")
.setStorageConfiguration(config)
.setSerializer(SignalSerializer.ofLogs())
.setExportFunction(delegate::export)
.build();
return new LogRecordToDiskExporter(toDisk);
}

// Visible for testing
LogRecordToDiskExporter(ToDiskExporter<LogRecordData> delegate) {
this.delegate = delegate;
}

@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
return delegate.export(logs);
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
try {
delegate.shutdown();
return CompletableResultCode.ofSuccess();
} catch (IOException e) {
return CompletableResultCode.ofFailure();
}
}
}
Loading
Loading