Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[disk-buffering] - Single responsibility for disk exporters #1161

Merged
merged 25 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8388a4a
move exporters into exporter package
breedx-splk Jan 16, 2024
694c216
fix case of boolean values
breedx-splk Jan 16, 2024
2f388e7
rename StoredBatchExporter to FromDiskExporter for clarity
breedx-splk Jan 16, 2024
9a29319
split responsibility of "to disk" and "from disk"
breedx-splk Jan 17, 2024
36ef54e
split responsibility of "to disk" and "from disk"
breedx-splk Jan 17, 2024
101df22
spotless
breedx-splk Jan 17, 2024
9600235
fix comment
breedx-splk Jan 17, 2024
b7d2171
add javadoc
breedx-splk Jan 17, 2024
010c86a
reorder fields
breedx-splk Jan 17, 2024
ebd3d66
small edits
breedx-splk Jan 17, 2024
5a3dc36
definitely not a CONTRIBUTING.md as we know it, so rename. it's a des…
breedx-splk Jan 17, 2024
0638fa1
repackage, add create() to SpanToDiskExporter to hide ToDiskExporter …
breedx-splk Jan 17, 2024
357a4f0
continue hiding constructors and make create() methods
breedx-splk Jan 17, 2024
4885b11
design update
breedx-splk Jan 17, 2024
f4ef495
spotless
breedx-splk Jan 18, 2024
f3a3228
markdown format
breedx-splk Jan 19, 2024
d26a951
fix typo
breedx-splk Jan 19, 2024
62f41c3
update protos, add serialization for flags. Update tests.
breedx-splk Jan 22, 2024
b0a2a6e
fix tests
breedx-splk Jan 22, 2024
42ae1b4
apply some reuse and fix tests
breedx-splk Jan 23, 2024
7bd21cf
more test fixup
breedx-splk Jan 23, 2024
a134536
remove dead code
breedx-splk Jan 23, 2024
43edbf3
rename and fix up markdown links to renamed code
breedx-splk Jan 23, 2024
93af94a
markdown fixes
breedx-splk Jan 23, 2024
5061b58
remove stray * for list
breedx-splk Jan 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions disk-buffering/CONTRIBUTING.md → disk-buffering/DESIGN.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
# Contributor Guide

Each one of the three exporters provided by this
tool ([LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java), [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java)
and [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java))
is responsible of performing 2 actions, `write` and `read/delegate`, the `write` one happens
automatically as a set of signals are provided from the processor, while the `read/delegate` one has
to be triggered manually by the consumer of this library as explained in the [README](README.md).
# Design Overview

There are three main disk-writing exporters provided by this module:
* [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java)
* [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java)
* [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java))

Each is responsible for writing a specific type of telemetry to disk storage for later
harvest/ingest.

For later reading, there is:
* [FromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporter.java)
which can be created with `FromDiskExporter.builder()....build()`.
As explained in the [README](README.md), this has to be triggered manually by the consumer of
* this library and does not happen automaticall

breedx-splk marked this conversation as resolved.
Show resolved Hide resolved
## Writing overview

Expand Down
6 changes: 3 additions & 3 deletions disk-buffering/README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
# Disk buffering

This module provides signal exporter wrappers that intercept and store signals in files which can be
This module provides exporters that store telemetry data in files which can be
sent later on demand. A high level description of how it works is that there are two separate
processes in place, one for writing data in disk, and one for reading/exporting the previously
stored data.

* Each exporter stores the received data automatically in disk right after it's received from its
processor.
* The reading of the data back from disk and exporting process has to be done manually. At
the moment there's no automatic mechanism to do so. There's more information on it can be
the moment there's no automatic mechanism to do so. There's more information on how it can be
achieved, under [Reading data](#reading-data).

> For a more detailed information on how the whole process works, take a look at
> the [CONTRIBUTING](CONTRIBUTING.md) file.
> the [DESIGN.md](DESIGN.md) file.

## Configuration

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Signal-type generic class that can read telemetry previously buffered on disk and send it to
* another delegated exporter.
*/
public final class FromDiskExporter<EXPORT_DATA> {
private final Storage storage;
private final SignalSerializer<EXPORT_DATA> deserializer;
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
private static final Logger logger = Logger.getLogger(FromDiskExporter.class.getName());

FromDiskExporter(
SignalSerializer<EXPORT_DATA> deserializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage) {
this.deserializer = deserializer;
this.exportFunction = exportFunction;
this.storage = storage;
}

public static <T> FromDiskExporterBuilder<T> builder() {
return new FromDiskExporterBuilder<>();
}

/**
* Reads data from the disk and attempts to export it.
*
* @param timeout The amount of time to wait for the wrapped exporter to finish.
* @param unit The unit of the time provided.
* @return true if there was data available and it was successfully exported within the timeout
* provided. false otherwise.
* @throws IOException If an unexpected error happens.
*/
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
logger.log(Level.INFO, "Attempting to export batch from disk.");
ReadableResult result =
storage.readAndProcess(
bytes -> {
logger.log(Level.INFO, "About to export stored batch.");
CompletableResultCode join =
exportFunction.apply(deserializer.deserialize(bytes)).join(timeout, unit);
return join.isSuccess();
});
return result == ReadableResult.SUCCEEDED;
}

public void onShutDown() throws IOException {
storage.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;

public class FromDiskExporterBuilder<T> {

private SignalSerializer<T> serializer = noopSerializer();
private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();

@NotNull
private static <T> SignalSerializer<T> noopSerializer() {
return new SignalSerializer<T>() {

@Override
public byte[] serialize(Collection<T> ts) {
return new byte[0];
}

@Override
public List<T> deserialize(byte[] source) {
return Collections.emptyList();
}
};
}

private final StorageBuilder storageBuilder = Storage.builder();

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setFolderName(String folderName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The folder name and the serializer must be the same as the ones used in each ToDiskExporter impl for it to work properly, so I'm a bit concerned about providing this builder publicly as it seems like it could be easy to misconfigure it. Instead, I would move it (and also FromDiskExporter.java) to the internal package and create public implementations for each signal in this package, similarly to what's done for the ToDiskExporters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I gave that a shot....and it ended up working out and now there's nice parity between the ToDisk and the FromDisk per signal types. Thanks for the idea!

Unfortunately, it ended up being a LOT of work to get the tests shored up. For starters, we were several versions behind on the protobufs...and I got us on the latest. The latest, tho, includes the TraceFlags as part of the protos, which we hadn't accounted for. There's also asymmetry in several places because the TraceFlags are not serialized into protobufs for the Exemplars (and maybe also for parent context, I forget).

I any case, I had to shim in a bunch of slightly different expected results, which contain the TraceFlags.getSampled() which is what we hard-code to on the de-serialization side. I think overall though it probably cleaned up the tests a little and definitely encouraged some reuse.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh man, I know how painful dealing with these serializations is, especially when it comes to metrics! 😩 thank you for taking the time to update it 🙏

storageBuilder.setFolderName(folderName);
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setStorageConfiguration(StorageConfiguration configuration) {
storageBuilder.setStorageConfiguration(configuration);
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setStorageClock(Clock clock) {
storageBuilder.setStorageClock(clock);
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setDeserializer(SignalSerializer<T> serializer) {
this.serializer = serializer;
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setExportFunction(
Function<Collection<T>, CompletableResultCode> exportFunction) {
this.exportFunction = exportFunction;
return this;
}

public FromDiskExporter<T> build() throws IOException {
Storage storage = storageBuilder.build();
return new FromDiskExporter<>(serializer, exportFunction, storage);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
import java.util.Collection;

/**
* This class implements a {@link LogRecordExporter} that delegates to an instance of {@code
* ToDiskExporter<LogRecordData>}.
*/
public class LogRecordToDiskExporter implements LogRecordExporter {
private final ToDiskExporter<LogRecordData> delegate;

/**
* Creates a new LogRecordToDiskExporter that will buffer LogRecordData telemetry on
* disk storage.
* @param delegate - The LogRecordExporter to delegate to if disk writing fails.
* @param config - The StorageConfiguration that specifies how storage is managed.
* @return A new LogRecordToDiskExporter instance.
* @throws IOException if the delegate ToDiskExporter could not be created.
*/
public static LogRecordToDiskExporter create(LogRecordExporter delegate, StorageConfiguration config) throws IOException {
ToDiskExporter<LogRecordData> toDisk = ToDiskExporter.<LogRecordData>builder()
.setFolderName("logs")
.setStorageConfiguration(config)
.setSerializer(SignalSerializer.ofLogs())
.setExportFunction(delegate::export)
.build();
return new LogRecordToDiskExporter(toDisk);
}

// Visible for testing
LogRecordToDiskExporter(ToDiskExporter<LogRecordData> delegate) {
this.delegate = delegate;
}

@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
return delegate.export(logs);
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
try {
delegate.shutdown();
return CompletableResultCode.ofSuccess();
} catch (IOException e) {
return CompletableResultCode.ofFailure();
}
}
}
Loading
Loading