
New Component proposal: DeDuplicator processor / sampler #17874

Closed
yuvalo1212 opened this issue Jan 20, 2023 · 10 comments

@yuvalo1212

The purpose and use-cases of the new component

Problem description

The opentelemetry-collector is designed to handle a high throughput of spans; sampling strategies can be applied to reduce the number of spans being processed and exported.

Current sampling strategies lack the ability to handle cases of high data duplication: a high duplication count can cause the same “logical trace” (a duplicated trace) to be exported multiple times, leading to duplicates being stored, processed, and transferred over the network.

This is even more relevant for high-throughput distributed systems, where the chances of a duplicate trace being produced multiple times are high, as the flow of data mostly stays unchanged. I have personally experienced this problem, which led to the development of the described processor.

We would consider a trace a duplicate if it meets all of the following (a hashing sketch follows the list):

  • Has the same number of spans
  • Has the same hierarchical order of the same spans
  • Spans are considered the same if:
    • They have the same serviceName
    • They have the same operationName
    • They have the same corresponding values for specific tag keys (tag keys are configurable by the user)
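
To make this definition concrete, here is a minimal sketch in Go of how such a comparison could be reduced to a single hash. This is not the proposed implementation: the span struct is a simplified stand-in for the pdata span type, and hashKeys stands for the user-configured tag keys.

package dedupe

import (
	"crypto/sha256"
	"encoding/hex"
	"sort"
	"strings"
)

// span is a simplified stand-in for a pdata span; only the fields used by the
// duplicate definition above are modeled here.
type span struct {
	spanID        string
	parentID      string
	serviceName   string
	operationName string
	tags          map[string]string
}

// fingerprint derives a uniqueID from the structural identity of the trace:
// each span contributes its parent's logical name, its own service/operation
// pair, and the values of the configured hashKeys. Raw span IDs never enter
// the hash, since they differ between otherwise identical traces.
func fingerprint(spans []span, hashKeys []string) string {
	names := make(map[string]string, len(spans))
	for _, s := range spans {
		names[s.spanID] = s.serviceName + "/" + s.operationName
	}
	parts := make([]string, 0, len(spans))
	for _, s := range spans {
		parent := "root"
		if p, ok := names[s.parentID]; ok {
			parent = p
		}
		var b strings.Builder
		b.WriteString(parent + " -> " + s.serviceName + "/" + s.operationName)
		for _, k := range hashKeys {
			if v, ok := s.tags[k]; ok {
				b.WriteString("|" + k + "=" + v)
			}
		}
		parts = append(parts, b.String())
	}
	sort.Strings(parts) // span arrival order must not change the uniqueID
	sum := sha256.Sum256([]byte(strings.Join(parts, "\n")))
	return hex.EncodeToString(sum[:])
}

Two structurally identical traces produce the same uniqueID here, while any change in span count, hierarchy, or a configured tag value produces a different one.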

Let’s take for example the following simple traces T1 and T2. Those traces are considered duplicates as they represent the same “logical flow” of the system:

[Figure: example traces T1 and T2 with identical span structure]

Consider a case where the same trace (T1) is generated at a rate of 1000 duplicates per second (T2). Depending on your use case, you might want to store and process this trace only once per configurable time bucket (for example, every 10 minutes) while dropping all duplicates (T2).

Alternative sampling strategies

There are a couple of existing sampling strategies that can be applied inside the collector to reduce the number of exported traces. They work well for specific use cases, but they do not quite apply to the case described above, for the following reasons:

Statistical span sampling

Using statistical span sampling, the collector accepts only a certain percentage of the total spans. This can lead to a “broken trace”, where the exported trace has missing spans, making it appear as if certain spans have no parents or children because those spans were dropped by the sampler.

Statistical trace sampling

It is possible to group spans by their trace ID and, after a certain amount of time, apply statistical sampling to the aggregated trace.
The problem with this approach is that some distinct traces might not be sampled at all, while a distinct trace that is generated more frequently than others would be sampled. This can result in not all of the distinct traces being represented.

Tail sampling with filters

We can tail sample with specific span filters; the problems with this approach are:

  • Only traces that pass the filter are exported
  • It is possible that duplicates of a trace that passes the filter are generated, causing a high throughput of duplicated traces

In conclusion, there is currently no solution for high data duplication.

The proposed solution

The solution I implemented to solve this problem was a processor that performs trace de-duplication (in my case I edited the existing implementation of the group-by-trace processor, but this could be created as a separate processor).

The processor works the following way (a minimal sketch of the cache check in steps 4-6 follows this list):

  1. The processor aggregates spans that belong to the same trace.
  2. When the first span of a trace is accepted, the processor starts a timer for a configurable amount of time. When the timer expires, we consider the entire trace to be ready, as implemented in both the tail-sampling and group-by-trace processors.
  3. After the timer for a specific trace has expired, the processor considers the trace ready to be calculated.
  4. The processor takes the entire trace and calls a function that generates a uniqueID.
    The function receives both the trace itself and the user-provided configuration of span tags / resource tags to be included in the uniqueID calculation.
  5. Using the generated uniqueID, the processor checks a local cache to see whether it has already seen that uniqueID. If the uniqueID is not yet stored in the cache, the processor “accepts” the trace and passes it to the next processor. If the uniqueID is found in the local cache, the processor considers the trace a duplicate, as it has already seen a trace that generated the same uniqueID, and drops it.
  6. If the processor inserted a new uniqueID into the cache, it expires it after a configurable amount of time, allowing a trace with the same uniqueID to be processed again after a given time frame.
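
A minimal sketch of the accept/drop decision in steps 4-6, assuming illustrative names (dedupCache, shouldAccept) rather than the actual group-by-trace code; cache-size limiting (num_traces) is omitted for brevity:

package dedupe

import "time"

// dedupCache remembers recently accepted uniqueIDs together with the time
// they were inserted, so duplicates inside the window can be dropped.
type dedupCache struct {
	ttl  time.Duration // corresponds to deduplication_timeout
	seen map[string]time.Time
}

func newDedupCache(ttl time.Duration) *dedupCache {
	return &dedupCache{ttl: ttl, seen: make(map[string]time.Time)}
}

// shouldAccept returns true the first time a uniqueID is observed within a
// deduplication window and false (drop) for every duplicate inside it. Once
// the window has elapsed, the same uniqueID is accepted again.
func (c *dedupCache) shouldAccept(uniqueID string, now time.Time) bool {
	if inserted, ok := c.seen[uniqueID]; ok && now.Sub(inserted) < c.ttl {
		return false
	}
	c.seen[uniqueID] = now
	return true
}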

It is important to note that the deduplication solution is suitable for cases of low data cardinality. As the cardinality of values increases, so does the number of distinct traces.

Usage example

Taking the example from before:

[Figure: traces T1 and T2 from the earlier example]

Given traces T1 and T2, we would like to have the following configuration, which would accept only one of the traces every 5 minutes (the configured deduplication_timeout):

groupbytrace: # can be a separate processor
    deduplication_timeout: 5m
    wait_duration: 10s
    num_traces: 20000

    hash_field:
      - name: "http.url"
        required: false
      - name: "http.target"
        required: false
      - name: "messaging.system"
        required: false
  • deduplication_timeout - timeout for uniqueIDs inserted into the local cache
  • wait_duration - time to wait after the first span of a trace is accepted before running the uniqueID function
  • num_traces - the maximum number of traces allowed to be stored in memory
  • hash_field - list of keys whose values are extracted from the tags and added into the uniqueID calculation. The processor first looks at the resource tags and, if a key is not found there, falls back to the span tags (a hypothetical Go config struct for these fields is sketched after this list)
    • name - the key whose value is extracted from the tags
    • required - if set to true and the processor fails to find this key inside the tags, either inside the resource or the span itself, the trace is accepted, because not all of the fields required for the uniqueID function are present.
      If set to false and the key is not found inside the tags, the processor ignores this key.
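
For illustration only, a hypothetical Go config struct mirroring the YAML fields above could look like the following; the struct and mapstructure tags are assumptions, not part of an existing processor.

package dedupe

import "time"

// HashField mirrors one entry of the hash_field list.
type HashField struct {
	Name     string `mapstructure:"name"`
	Required bool   `mapstructure:"required"`
}

// Config mirrors the YAML configuration shown above.
type Config struct {
	DeduplicationTimeout time.Duration `mapstructure:"deduplication_timeout"`
	WaitDuration         time.Duration `mapstructure:"wait_duration"`
	NumTraces            int           `mapstructure:"num_traces"`
	HashFields           []HashField   `mapstructure:"hash_field"`
}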

Disclaimers

  • Horizontal scaling - when using the deduplicator with horizontal scaling we should consider the following:
    • A trace-ID-aware load-balancer should be placed before the deduplicator so that spans from the same trace end up at the same collector instance
    • The deduplicator currently uses an in-memory cache to store the uniqueIDs of traces. This means that, when horizontally scaled, the maximum number of times a distinct trace can be accepted into the system equals the number of collectors running, because if traces with the same uniqueID arrive at different collectors, each collector accepts them, as they use separate caches.
  • Metrics - any metrics processor that needs to ingest all of the spans received by the collector should be placed before the deduplicator in the pipeline, as spans can be dropped by the deduplicator
  • Span links - although we aggregate traces by their IDs, there is the possibility that a span references another span from a different trace. Consider the following example:

[Figure: traces T3 and T4 whose spans link to spans in the earlier traces]

Two new spans are added, spanID M with traceID T3 and spanID U with traceID T4.
They both represent a continuation of the trace. The OpenTelemetry specification allows spans to reference other spans that are part of other traces. In our example, spanID M references spanID Z.

According to the RPC spec, when a new RPC message is processed, a process operation span is generated with a new traceID and must be linked to the send operation span that triggered it.

This poses a problem: traces are aggregated using their traceID, but in the example above we saw that multiple traceIDs can compose a single “logical flow”. When deduplicating a trace, we might want to deduplicate the entire logical flow and not only a single trace based on its traceID.

Given the traceIDs provided in the example, consider the following arrival order:

T1 → T2 → T4 → T3

In this case T1 is seen for the first time, so it is accepted; T2 is dropped as it is a duplicate of T1.
T4 is accepted as it is distinct; T3 is dropped as it is a duplicate of T4.

Eventually, we end up with T1 and T4 accepted, but the accepted traces represent a “broken flow”: T4 references T2, which was dropped, and T3, which referenced T1, was dropped as well.

To solve this problem, each span that is received is checked for whether it is linked to another trace; if a span is linked to another trace, the linked trace is marked as bypassed.
When the timer of a trace that is marked as bypassed expires, the deduplicator accepts it regardless of the uniqueID. That way, all traces that contain spans referencing other traces are accepted. This could also be solved by recursively calculating the uniqueID of all referenced traces.
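
A minimal sketch of this bypass bookkeeping, with assumed names (bypassTracker, onSpan, onTraceExpired) that are not taken from an actual implementation:

package dedupe

// bypassTracker remembers which traceIDs were referenced via span links from
// spans of other traces; those traces are accepted regardless of uniqueID.
type bypassTracker struct {
	bypassed map[string]bool
}

func newBypassTracker() *bypassTracker {
	return &bypassTracker{bypassed: make(map[string]bool)}
}

// onSpan is called for every received span: each span link that points at a
// different trace marks that linked trace as bypassed.
func (t *bypassTracker) onSpan(traceID string, linkedTraceIDs []string) {
	for _, linked := range linkedTraceIDs {
		if linked != traceID {
			t.bypassed[linked] = true
		}
	}
}

// onTraceExpired reports whether the dedup check should be skipped for the
// trace whose timer just expired, and clears the mark once it is consumed.
func (t *bypassTracker) onTraceExpired(traceID string) bool {
	if t.bypassed[traceID] {
		delete(t.bypassed, traceID)
		return true
	}
	return false
}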

When horizontally scaling, there is a possibility that each trace arrives at a different collector instance. For example, we might have two collectors, C1 and C2.

The following might happen:

T1 → C1 # ACCEPTED
T4 → C2 # ACCEPTED
T2 → C1 # DROPPED
T3 → C2 # DROPPED

Even with the bypass solution, this case would result in T1 and T4 being accepted and both T2 and T3 being dropped.
I have yet to implement a solution for this case, but it can be solved if there is enough interest (it was not needed for my use case).

Example configuration for the component

groupbytrace: # can be a separate processor
    deduplication_timeout: 5m
    wait_duration: 10s
    num_traces: 20000

    hash_field:
      - name: "http.url"
        required: false
      - name: "http.target"
        required: false
      - name: "required.tag"
        required: true

Telemetry data types supported

traces

Is this a vendor-specific component?

  • This is a vendor-specific component
  • If this is a vendor-specific component, I am proposing to contribute this as a representative of the vendor.

Sponsor (optional)

No response

Additional context

I have used this collector sampler in a deployment that received around 200 spans per second.
Using deduplication_timeout: 5m, it was able to reduce the number of exported spans by 99.98%.
This obviously depends on the hash_field configuration and the fields' cardinality.

@yuvalo1212 yuvalo1212 added the needs triage New item requiring triage label Jan 20, 2023
@yuvalo1212 yuvalo1212 changed the title New Component proposal: DeDuplicator sampler New Component proposal: DeDuplicator processor / sampler Jan 20, 2023
@jpkrohling jpkrohling added Sponsor Needed New component seeking sponsor and removed needs triage New item requiring triage labels Jan 20, 2023
@jpkrohling
Member

I love this idea and it would be interesting to see how it performs in the real world and outside of your infra, where it does work nicely.

There are a lot of caveats and users of this processor should be very conscious of the trade-offs. Once traces are sampled out, no statistics on traces are possible anymore (hence your comment about having metrics processors happening before this).

Given how many times I heard about people wanting this, I think we should accept this component.

I'm not ready to be a sponsor at this moment, but I'll gladly be a co-sponsor if you can find a primary sponsor.

@bryan-aguilar
Contributor

This seems like a good idea! Have you considered whether this could be built on top of the tail sampling processor as an additional policy, or should it live as its own processor?

@austinlparker
Member

austinlparker commented Jan 20, 2023

I don't see a mention of it, but I'm also kinda confused by the disclaimer around stats -- does this update the sampling probability of the retained spans appropriately? For that matter, does it appropriately calculate updated sampling probabilities (per https://github.com/open-telemetry/oteps/blob/main/text/trace/0170-sampling-probability.md) if it 'de-duplicates' previously sampled spans?

I'd also quibble with the name a bit here, as "de-duplicator" could be a bit confusing ("why is opentelemetry producing duplicate data, shouldn't traces be unique?") without a nuanced understanding of what this is doing.

edit: actually this is kinda worse than I thought since you're de-duplicating just on structure and not on duration, so sampling would get messed up on multiple levels and would completely invalidate any downstream stat computation unless you performed bucketing or got really granular with what counts as a 'dupe'

@asafm

This comment was marked as off-topic.

@yuvalo1212
Author

Thanks, @bryan-aguilar!

I believe it can be adapted to the tail-sampling processor as an additional policy. With that being said, the implementation could get problematic around the FOLLOWS_FROM case, where the new policy would need access to multiple traces, both on trace expiry and upon span reception. This would require modifications beyond adding a new policy to the tail-sampling processor.

@yuvalo1212
Author

Thanks for the feedback, @austinlparker!

I totally agree with the statement you made about sampling based on duration. For my use case, I had traces exported to a system that disregards duration but is highly dependent on the operations, services, and tags inside the trace.
Elaborating further on your point, I believe that if a bucketing system were implemented, it should consider not only the duration of a trace but also the duration of each span composing the trace. Each span might have a different “weight” in the total trace duration, so even if the total duration of two “duplicate traces” falls into the same bucket, they might be composed of different span “weights”.

Regarding your question about updating sampling probability: in the current implementation, when a new uniqueID is discovered, the sampling probability of that specific uniqueID drops to 0%, as there is no need to sample it again. It returns to 100% after a configurable amount of time. This sampling state is managed by the processor itself.

I agree that the name might be kind of confusing; maybe it should be named something like user-defined-duplicates.
What do you think?

@github-actions
Contributor

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

@jiekun
Member

jiekun commented May 5, 2023

While looking for a way to achieve a similar goal, I found a probabilistic counter named Count-min sketch. It is hard to count the number of appearances of each unique path (or uniqueID, in this issue). We need a data structure that works like a BloomFilter + Counter. I think the Count-min sketch might be a potential solution.

cc @yuvalo1212 and @nicolastakashi
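
For reference, here is a minimal Go sketch of such a Count-min sketch counter, using only the standard library; the sizing and hashing choices are illustrative, not a recommendation for the processor.

package dedupe

import "hash/fnv"

// countMinSketch is a width x depth matrix of counters indexed by independent
// hashes; it answers "how many times has this key been seen" approximately,
// never under-counting, using bounded memory.
type countMinSketch struct {
	width, depth uint32
	counts       [][]uint32
}

func newCountMinSketch(width, depth uint32) *countMinSketch {
	c := &countMinSketch{width: width, depth: depth, counts: make([][]uint32, depth)}
	for i := range c.counts {
		c.counts[i] = make([]uint32, width)
	}
	return c
}

// index salts the FNV hash with the row number to get per-row bucket positions.
func (c *countMinSketch) index(row uint32, key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte{byte(row)})
	h.Write([]byte(key))
	return h.Sum32() % c.width
}

// Add increments the key's counter in every row.
func (c *countMinSketch) Add(key string) {
	for i := uint32(0); i < c.depth; i++ {
		c.counts[i][c.index(i, key)]++
	}
}

// Count returns the minimum across rows, an upper bound on the true count.
func (c *countMinSketch) Count(key string) uint32 {
	min := ^uint32(0)
	for i := uint32(0); i < c.depth; i++ {
		if v := c.counts[i][c.index(i, key)]; v < min {
			min = v
		}
	}
	return min
}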

@github-actions github-actions bot removed the Stale label May 26, 2023
@github-actions
Contributor

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

@github-actions
Contributor

This issue has been closed as inactive because it has been stale for 120 days with no activity.

@github-actions github-actions bot closed this as not planned Won't fix, can't repro, duplicate, stale Sep 24, 2023