Skip to content

Commit

Permalink
Adds instrumentation for search path
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <gjjuneja@amazon.com>
  • Loading branch information
Gagan Juneja committed Jul 5, 2023
1 parent ee33785 commit 1444655
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.telemetry.tracing.Scope;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.TracerFactory;
import org.opensearch.telemetry.tracing.listener.TracingActionListener;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -292,7 +292,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
listener
);
}
Scope scope = tracerFactory.getTracer().startSpan("SearchTask_" + task.getId());
SpanScope scope = tracerFactory.getTracer().startSpan("SearchTask_" + task.getId());
TracingActionListener tracingActionListener = new TracingActionListener(tracerFactory, listener, scope);
executeRequest(task, searchRequest, this::searchAsyncAction, tracingActionListener);
}
Expand Down
56 changes: 33 additions & 23 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@
import org.opensearch.search.sort.SortOrder;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.search.suggest.completion.CompletionSuggestion;
import org.opensearch.telemetry.tracing.Scope;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.TracerFactory;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -595,17 +595,18 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
SearchContext context = createContext(readerContext, request, task, true)
) {
final long afterQueryTime;
try (
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context);
Scope scope = tracerFactory.getTracer().startSpan("QueryPhase_" + context.shardTarget().getShardId())
) {
addtracingAttributes(context);
long afterQueryTime;
final SpanScope spanScope = tracerFactory.getTracer().startSpan("QueryPhase_" + context.shardTarget().getShardId());
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context); spanScope) {
addtracingAttributes(spanScope, context);
loadOrExecuteQueryPhase(request, context);
if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
freeReaderContext(readerContext.id());
}
afterQueryTime = executor.success();
} catch (Exception e) {
spanScope.setError(e);
afterQueryTime = 0l;
}
if (request.numberOfShards() == 1) {
return executeFetchPhase(readerContext, context, afterQueryTime);
Expand All @@ -630,23 +631,23 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
}
}

private void addtracingAttributes(SearchContext context) {
tracerFactory.getTracer().addSpanAttribute("shard_id", context.shardTarget().getShardId().getId());
tracerFactory.getTracer().addSpanAttribute("node_id", context.shardTarget().getNodeId());
private void addtracingAttributes(SpanScope scope, SearchContext context) {
scope.addSpanAttribute("shard_id", context.shardTarget().getShardId().getId());
scope.addSpanAttribute("node_id", context.shardTarget().getNodeId());
}

private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
try (
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime);
Scope scope = tracerFactory.getTracer().startSpan("FetchPhase_" + context.shardTarget().getShardId())
) {
addtracingAttributes(context);
final SpanScope spanScope = tracerFactory.getTracer().startSpan("FetchPhase_" + context.shardTarget().getShardId());
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime); spanScope) {
addtracingAttributes(spanScope, context);
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (reader.singleSession()) {
freeReaderContext(reader.id());
}
executor.success();
} catch (Exception e) {
spanScope.setError(e);
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}
Expand All @@ -667,12 +668,13 @@ public void executeQueryPhase(
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
final SpanScope spanScope = tracerFactory.getTracer().startSpan("QueryPhase_" + shardSearchRequest.shardId().getId());
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext);
Scope scope = tracerFactory.getTracer().startSpan("QueryPhase_" + searchContext.shardTarget().getShardId())
spanScope
) {
addtracingAttributes(searchContext);
addtracingAttributes(spanScope, searchContext);
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, readerContext, searchContext);
queryPhase.execute(searchContext);
Expand All @@ -681,6 +683,7 @@ public void executeQueryPhase(
return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget());
} catch (Exception e) {
logger.trace("Query phase failed", e);
spanScope.setError(e);
// we handle the failure in the failure listener below
throw e;
}
Expand All @@ -693,12 +696,13 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
runAsync(getExecutor(readerContext.indexShard()), () -> {
readerContext.setAggregatedDfs(request.dfs());
final SpanScope spanScope = tracerFactory.getTracer().startSpan("QueryPhase_" + shardSearchRequest.shardId().getId());
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext);
Scope scope = tracerFactory.getTracer().startSpan("QueryPhase_" + searchContext.shardTarget().getShardId())
spanScope
) {
addtracingAttributes(searchContext);
addtracingAttributes(spanScope, searchContext);
searchContext.searcher().setAggregatedDfs(request.dfs());
queryPhase.execute(searchContext);
if (searchContext.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
Expand All @@ -714,6 +718,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
return searchContext.queryResult();
} catch (Exception e) {
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
spanScope.setError(e);
logger.trace("Query phase failed", e);
// we handle the failure in the failure listener below
throw e;
Expand Down Expand Up @@ -750,12 +755,13 @@ public void executeFetchPhase(
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
final SpanScope spanScope = tracerFactory.getTracer().startSpan("FetchPhase_" + shardSearchRequest.shardId().getId());
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext);
Scope scope = tracerFactory.getTracer().startSpan("FetchPhase_" + searchContext.shardTarget().getShardId())
spanScope
) {
addtracingAttributes(searchContext);
addtracingAttributes(spanScope, searchContext);
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, readerContext, searchContext);
Expand All @@ -765,6 +771,7 @@ public void executeFetchPhase(
return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget());
} catch (Exception e) {
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
spanScope.setError(e);
logger.trace("Fetch phase failed", e);
// we handle the failure in the failure listener below
throw e;
Expand All @@ -784,16 +791,19 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(request.getRescoreDocIds()));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs()));
searchContext.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
final SpanScope spanScope = tracerFactory.getTracer().startSpan("FetchPhase_" + searchContext.shardTarget().getShardId());
try (
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, true, System.nanoTime());
Scope scope = tracerFactory.getTracer().startSpan("FetchPhase_" + searchContext.shardTarget().getShardId())
spanScope
) {
addtracingAttributes(searchContext);
addtracingAttributes(spanScope, searchContext);
fetchPhase.execute(searchContext);
if (readerContext.singleSession()) {
freeReaderContext(request.contextId());
}
executor.success();
} catch (Exception e) {
spanScope.setError(e);
}
return searchContext.fetchResult();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

package org.opensearch.telemetry.tracing.listener;

import java.util.Locale;
import org.opensearch.action.ActionListener;
import org.opensearch.telemetry.tracing.Scope;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.TracerFactory;

/**
Expand All @@ -20,34 +19,32 @@
public class TracingActionListener<Response> implements ActionListener<Response> {

private final ActionListener<Response> delegate;
private final Scope scope;
private final SpanScope spanScope;
private final TracerFactory tracerFactory;

/**
* Creates instance.
* @param tracerFactory tracer factory
* @param delegate action listener to be delegated
* @param scope tracer scope.
* @param spanScope tracer scope.
*/
public TracingActionListener(TracerFactory tracerFactory, ActionListener<Response> delegate, Scope scope) {
public TracingActionListener(TracerFactory tracerFactory, ActionListener<Response> delegate, SpanScope spanScope) {
this.tracerFactory = tracerFactory;
this.delegate = delegate;
this.scope = scope;
this.spanScope = spanScope;
}

@Override
public void onResponse(Response response) {
try (scope) {
try (spanScope) {
delegate.onResponse(response);
}
}

@Override
public void onFailure(Exception e) {
try (scope) {
String message = String.format(Locale.ROOT, "Span ended with an operation failure with message [%s]", e.getMessage());
// TODO: It also make sense to move addSpanEvent and addSpanAttributes to the Scope (may need to change the name.)
tracerFactory.getTracer().addSpanEvent(message);
try (spanScope) {
spanScope.setError(e);
delegate.onFailure(e);
}
}
Expand Down

0 comments on commit 1444655

Please sign in to comment.