diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 1aacceeb97757..5686300babdf4 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -87,7 +87,7 @@ import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; -import org.opensearch.telemetry.tracing.Scope; +import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.TracerFactory; import org.opensearch.telemetry.tracing.listener.TracingActionListener; import org.opensearch.threadpool.ThreadPool; @@ -292,7 +292,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< listener ); } - Scope scope = tracerFactory.getTracer().startSpan("SearchTask_" + task.getId()); + SpanScope scope = tracerFactory.getTracer().startSpan("SearchTask_" + task.getId()); TracingActionListener tracingActionListener = new TracingActionListener(tracerFactory, listener, scope); executeRequest(task, searchRequest, this::searchAsyncAction, tracingActionListener); } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 5e7e143784a83..a7931a29b5db7 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -135,7 +135,7 @@ import org.opensearch.search.sort.SortOrder; import org.opensearch.search.suggest.Suggest; import org.opensearch.search.suggest.completion.CompletionSuggestion; -import org.opensearch.telemetry.tracing.Scope; +import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.TracerFactory; import org.opensearch.threadpool.Scheduler.Cancellable; import org.opensearch.threadpool.ThreadPool; @@ -595,17 +595,18 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh Releasable ignored = readerContext.markAsUsed(getKeepAlive(request)); SearchContext context = createContext(readerContext, request, task, true) ) { - final long afterQueryTime; - try ( - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context); - Scope scope = tracerFactory.getTracer().startSpan("QueryPhase_" + context.shardTarget().getShardId()) - ) { - addtracingAttributes(context); + long afterQueryTime; + final SpanScope spanScope = tracerFactory.getTracer().startSpan("QueryPhase_" + context.shardTarget().getShardId()); + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context); spanScope) { + addtracingAttributes(spanScope, context); loadOrExecuteQueryPhase(request, context); if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) { freeReaderContext(readerContext.id()); } afterQueryTime = executor.success(); + } catch (Exception e) { + spanScope.setError(e); + afterQueryTime = 0l; } if (request.numberOfShards() == 1) { return executeFetchPhase(readerContext, context, afterQueryTime); @@ -630,23 +631,23 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh } } - private void addtracingAttributes(SearchContext context) { - tracerFactory.getTracer().addSpanAttribute("shard_id", context.shardTarget().getShardId().getId()); - tracerFactory.getTracer().addSpanAttribute("node_id", context.shardTarget().getNodeId()); + private void addtracingAttributes(SpanScope scope, SearchContext context) { + scope.addSpanAttribute("shard_id", context.shardTarget().getShardId().getId()); + scope.addSpanAttribute("node_id", context.shardTarget().getNodeId()); } private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) { - try ( - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime); - Scope scope = tracerFactory.getTracer().startSpan("FetchPhase_" + context.shardTarget().getShardId()) - ) { - addtracingAttributes(context); + final SpanScope spanScope = tracerFactory.getTracer().startSpan("FetchPhase_" + context.shardTarget().getShardId()); + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime); spanScope) { + addtracingAttributes(spanScope, context); shortcutDocIdsToLoad(context); fetchPhase.execute(context); if (reader.singleSession()) { freeReaderContext(reader.id()); } executor.success(); + } catch (Exception e) { + spanScope.setError(e); } return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } @@ -667,12 +668,13 @@ public void executeQueryPhase( } runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); + final SpanScope spanScope = tracerFactory.getTracer().startSpan("QueryPhase_" + shardSearchRequest.shardId().getId()); try ( SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext); - Scope scope = tracerFactory.getTracer().startSpan("QueryPhase_" + searchContext.shardTarget().getShardId()) + spanScope ) { - addtracingAttributes(searchContext); + addtracingAttributes(spanScope, searchContext); searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); processScroll(request, readerContext, searchContext); queryPhase.execute(searchContext); @@ -681,6 +683,7 @@ public void executeQueryPhase( return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget()); } catch (Exception e) { logger.trace("Query phase failed", e); + spanScope.setError(e); // we handle the failure in the failure listener below throw e; } @@ -693,12 +696,13 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); runAsync(getExecutor(readerContext.indexShard()), () -> { readerContext.setAggregatedDfs(request.dfs()); + final SpanScope spanScope = tracerFactory.getTracer().startSpan("QueryPhase_" + shardSearchRequest.shardId().getId()); try ( SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext); - Scope scope = tracerFactory.getTracer().startSpan("QueryPhase_" + searchContext.shardTarget().getShardId()) + spanScope ) { - addtracingAttributes(searchContext); + addtracingAttributes(spanScope, searchContext); searchContext.searcher().setAggregatedDfs(request.dfs()); queryPhase.execute(searchContext); if (searchContext.queryResult().hasSearchContext() == false && readerContext.singleSession()) { @@ -714,6 +718,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, return searchContext.queryResult(); } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); + spanScope.setError(e); logger.trace("Query phase failed", e); // we handle the failure in the failure listener below throw e; @@ -750,12 +755,13 @@ public void executeFetchPhase( } runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); + final SpanScope spanScope = tracerFactory.getTracer().startSpan("FetchPhase_" + shardSearchRequest.shardId().getId()); try ( SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext); - Scope scope = tracerFactory.getTracer().startSpan("FetchPhase_" + searchContext.shardTarget().getShardId()) + spanScope ) { - addtracingAttributes(searchContext); + addtracingAttributes(spanScope, searchContext); searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); processScroll(request, readerContext, searchContext); @@ -765,6 +771,7 @@ public void executeFetchPhase( return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget()); } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); + spanScope.setError(e); logger.trace("Fetch phase failed", e); // we handle the failure in the failure listener below throw e; @@ -784,16 +791,19 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(request.getRescoreDocIds())); searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs())); searchContext.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); + final SpanScope spanScope = tracerFactory.getTracer().startSpan("FetchPhase_" + searchContext.shardTarget().getShardId()); try ( SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, true, System.nanoTime()); - Scope scope = tracerFactory.getTracer().startSpan("FetchPhase_" + searchContext.shardTarget().getShardId()) + spanScope ) { - addtracingAttributes(searchContext); + addtracingAttributes(spanScope, searchContext); fetchPhase.execute(searchContext); if (readerContext.singleSession()) { freeReaderContext(request.contextId()); } executor.success(); + } catch (Exception e) { + spanScope.setError(e); } return searchContext.fetchResult(); } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/listener/TracingActionListener.java b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TracingActionListener.java index 6f84e07a2f134..7601298da049c 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/listener/TracingActionListener.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TracingActionListener.java @@ -8,9 +8,8 @@ package org.opensearch.telemetry.tracing.listener; -import java.util.Locale; import org.opensearch.action.ActionListener; -import org.opensearch.telemetry.tracing.Scope; +import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.TracerFactory; /** @@ -20,34 +19,32 @@ public class TracingActionListener implements ActionListener { private final ActionListener delegate; - private final Scope scope; + private final SpanScope spanScope; private final TracerFactory tracerFactory; /** * Creates instance. * @param tracerFactory tracer factory * @param delegate action listener to be delegated - * @param scope tracer scope. + * @param spanScope tracer scope. */ - public TracingActionListener(TracerFactory tracerFactory, ActionListener delegate, Scope scope) { + public TracingActionListener(TracerFactory tracerFactory, ActionListener delegate, SpanScope spanScope) { this.tracerFactory = tracerFactory; this.delegate = delegate; - this.scope = scope; + this.spanScope = spanScope; } @Override public void onResponse(Response response) { - try (scope) { + try (spanScope) { delegate.onResponse(response); } } @Override public void onFailure(Exception e) { - try (scope) { - String message = String.format(Locale.ROOT, "Span ended with an operation failure with message [%s]", e.getMessage()); - // TODO: It also make sense to move addSpanEvent and addSpanAttributes to the Scope (may need to change the name.) - tracerFactory.getTracer().addSpanEvent(message); + try (spanScope) { + spanScope.setError(e); delegate.onFailure(e); } }