Skip to content

Commit

Permalink
Fixes for cross cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Sep 5, 2022
1 parent 2f65bf6 commit c0b7898
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public void executeCreatePit(
task.getParentTaskId(),
Collections.emptyMap()
);
/**
* This is needed for cross cluster functionality to work with PITs and current ccsMinimizeRoundTrips is
* not supported for point in time
*/
searchRequest.setCcsMinimizeRoundtrips(false);
/**
* Phase 1 of create PIT
*/
Expand Down Expand Up @@ -193,6 +198,9 @@ void executeUpdatePitId(
);
for (Map.Entry<ShardId, SearchContextIdForNode> entry : contextId.shards().entrySet()) {
DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode());
if (node == null) {
node = this.clusterService.state().getNodes().get(entry.getValue().getNode());
}
try {
final Transport.Connection connection = searchTransportService.getConnection(entry.getValue().getClusterAlias(), node);
searchTransportService.updatePitContext(
Expand All @@ -206,11 +214,12 @@ void executeUpdatePitId(
groupedActionListener
);
} catch (Exception e) {
String nodeName = node.getName();
logger.error(
() -> new ParameterizedMessage(
"Create pit update phase failed for PIT ID [{}] on node [{}]",
searchResponse.pointInTimeId(),
node
nodeName
),
e
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ public void deletePitContexts(

for (Map.Entry<String, List<PitSearchContextIdForNode>> entry : nodeToContextsMap.entrySet()) {
String clusterAlias = entry.getValue().get(0).getSearchContextIdForNode().getClusterAlias();
final DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode());
DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode());
if (node == null) {
node = this.clusterService.state().getNodes().get(entry.getValue().get(0).getSearchContextIdForNode().getNode());
}
if (node == null) {
logger.error(
() -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode())
Expand All @@ -108,7 +111,8 @@ public void deletePitContexts(
final Transport.Connection connection = searchTransportService.getConnection(clusterAlias, node);
searchTransportService.sendFreePITContexts(connection, entry.getValue(), groupedListener);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", node.getName()), e);
String nodeName = node.getName();
logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", nodeName), e);
List<DeletePitInfo> deletePitInfos = new ArrayList<>();
for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) {
deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId()));
Expand Down

0 comments on commit c0b7898

Please sign in to comment.