diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitController.java b/server/src/main/java/org/opensearch/action/search/CreatePitController.java index f64dd3d7efae6..3d7a92ec1ed06 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitController.java @@ -98,6 +98,11 @@ public void executeCreatePit( task.getParentTaskId(), Collections.emptyMap() ); + /** + * This is needed for cross cluster functionality to work with PITs and current ccsMinimizeRoundTrips is + * not supported for point in time + */ + searchRequest.setCcsMinimizeRoundtrips(false); /** * Phase 1 of create PIT */ @@ -193,6 +198,9 @@ void executeUpdatePitId( ); for (Map.Entry entry : contextId.shards().entrySet()) { DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode()); + if (node == null) { + node = this.clusterService.state().getNodes().get(entry.getValue().getNode()); + } try { final Transport.Connection connection = searchTransportService.getConnection(entry.getValue().getClusterAlias(), node); searchTransportService.updatePitContext( @@ -206,11 +214,12 @@ void executeUpdatePitId( groupedActionListener ); } catch (Exception e) { + String nodeName = node.getName(); logger.error( () -> new ParameterizedMessage( "Create pit update phase failed for PIT ID [{}] on node [{}]", searchResponse.pointInTimeId(), - node + nodeName ), e ); diff --git a/server/src/main/java/org/opensearch/action/search/PitService.java b/server/src/main/java/org/opensearch/action/search/PitService.java index ff068397ad94e..17a00916401fe 100644 --- a/server/src/main/java/org/opensearch/action/search/PitService.java +++ b/server/src/main/java/org/opensearch/action/search/PitService.java @@ -93,7 +93,10 @@ public void deletePitContexts( for (Map.Entry> entry : nodeToContextsMap.entrySet()) { String clusterAlias = entry.getValue().get(0).getSearchContextIdForNode().getClusterAlias(); - final DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode()); + DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode()); + if (node == null) { + node = this.clusterService.state().getNodes().get(entry.getValue().get(0).getSearchContextIdForNode().getNode()); + } if (node == null) { logger.error( () -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode()) @@ -108,7 +111,8 @@ public void deletePitContexts( final Transport.Connection connection = searchTransportService.getConnection(clusterAlias, node); searchTransportService.sendFreePITContexts(connection, entry.getValue(), groupedListener); } catch (Exception e) { - logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", node.getName()), e); + String nodeName = node.getName(); + logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", nodeName), e); List deletePitInfos = new ArrayList<>(); for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) { deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId()));