Skip to content

Commit

Permalink
Synchronization at instance level as suggested in PR
Browse files Browse the repository at this point in the history
  • Loading branch information
twasyl committed Jun 7, 2021
1 parent d13c9a6 commit 689f233
Showing 1 changed file with 99 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/**
* Provides overall insight into the structure of a flow graph... but with limited visibility so we can change implementation.
Expand All @@ -31,56 +30,44 @@ public final class StandardGraphLookupView implements GraphLookupView, GraphList
/** Map a node to its nearest enclosing block */
HashMap<String, String> nearestEnclosingBlock = new HashMap<>();

final ReentrantLock lock = new ReentrantLock();

public void clearCache() {
lock.lock();
try {
blockStartToEnd.clear();
nearestEnclosingBlock.clear();
} finally {
lock.unlock();
}
public synchronized void clearCache() {
blockStartToEnd.clear();
nearestEnclosingBlock.clear();
}

/** Update with a new node added to the flowgraph */
@Override
public void onNewHead(@Nonnull FlowNode newHead) {
lock.lock();
try {
if (newHead instanceof BlockEndNode) {
blockStartToEnd.put(((BlockEndNode) newHead).getStartNode().getId(), newHead.getId());
String overallEnclosing = nearestEnclosingBlock.get(((BlockEndNode) newHead).getStartNode().getId());
if (overallEnclosing != null) {
nearestEnclosingBlock.put(newHead.getId(), overallEnclosing);
}
} else {
if (newHead instanceof BlockStartNode) {
blockStartToEnd.put(newHead.getId(), INCOMPLETE);
}
public synchronized void onNewHead(@Nonnull FlowNode newHead) {
if (newHead instanceof BlockEndNode) {
blockStartToEnd.put(((BlockEndNode) newHead).getStartNode().getId(), newHead.getId());
String overallEnclosing = nearestEnclosingBlock.get(((BlockEndNode) newHead).getStartNode().getId());
if (overallEnclosing != null) {
nearestEnclosingBlock.put(newHead.getId(), overallEnclosing);
}
} else {
if (newHead instanceof BlockStartNode) {
blockStartToEnd.put(newHead.getId(), INCOMPLETE);
}

// Now we try to generate enclosing block info for caching, by looking at parents
// But we don't try to hard -- usually the cache is populated and we defer recursive walks of the graph
List<FlowNode> parents = newHead.getParents();
if (parents.size() > 0) {
FlowNode parent = parents.get(0); // Multiple parents only for end of a parallel, and then both are BlockEndNodes

// Now we try to generate enclosing block info for caching, by looking at parents
// But we don't try to hard -- usually the cache is populated and we defer recursive walks of the graph
List<FlowNode> parents = newHead.getParents();
if (parents.size() > 0) {
FlowNode parent = parents.get(0); // Multiple parents only for end of a parallel, and then both are BlockEndNodes

if (parent instanceof BlockStartNode) {
nearestEnclosingBlock.put(newHead.getId(), parent.getId());
} else {
// Try to reuse info from cache for this node:
// If the parent ended a block, we use info from the start of the block since that is at the same block nesting level as our new head
// Otherwise the parent is a normal atom node, and the head is enclosed by the same block
String lookupId = (parent instanceof BlockEndNode) ? ((BlockEndNode) parent).getStartNode().getId() : parent.getId();
String enclosingId = nearestEnclosingBlock.get(lookupId);
if (enclosingId != null) {
nearestEnclosingBlock.put(newHead.getId(), enclosingId);
}
if (parent instanceof BlockStartNode) {
nearestEnclosingBlock.put(newHead.getId(), parent.getId());
} else {
// Try to reuse info from cache for this node:
// If the parent ended a block, we use info from the start of the block since that is at the same block nesting level as our new head
// Otherwise the parent is a normal atom node, and the head is enclosed by the same block
String lookupId = (parent instanceof BlockEndNode) ? ((BlockEndNode) parent).getStartNode().getId() : parent.getId();
String enclosingId = nearestEnclosingBlock.get(lookupId);
if (enclosingId != null) {
nearestEnclosingBlock.put(newHead.getId(), enclosingId);
}
}
}
} finally {
lock.unlock();
}
}

Expand All @@ -100,131 +87,108 @@ public boolean isActive(@Nonnull FlowNode node) {
}

// Do a brute-force scan for the block end matching the start, caching info along the way for future use
BlockEndNode bruteForceScanForEnd(@Nonnull BlockStartNode start) {
synchronized BlockEndNode bruteForceScanForEnd(@Nonnull BlockStartNode start) {
DepthFirstScanner scan = new DepthFirstScanner();
scan.setup(start.getExecution().getCurrentHeads());
lock.lock();
try {
for (FlowNode f : scan) {
if (f instanceof BlockEndNode) {
BlockEndNode end = (BlockEndNode) f;
BlockStartNode maybeStart = end.getStartNode();
// Cache start in case we need to scan again in the future
blockStartToEnd.put(maybeStart.getId(), end.getId());
if (start.equals(maybeStart)) {
return end;
}
} else if (f instanceof BlockStartNode) {
BlockStartNode maybeThis = (BlockStartNode) f;
for (FlowNode f : scan) {
if (f instanceof BlockEndNode) {
BlockEndNode end = (BlockEndNode) f;
BlockStartNode maybeStart = end.getStartNode();
// Cache start in case we need to scan again in the future
blockStartToEnd.put(maybeStart.getId(), end.getId());
if (start.equals(maybeStart)) {
return end;
}
} else if (f instanceof BlockStartNode) {
BlockStartNode maybeThis = (BlockStartNode) f;

// We're walking from the end to the start and see the start without finding the end first, block is incomplete
blockStartToEnd.putIfAbsent(maybeThis.getId(), INCOMPLETE);
if (start.equals(maybeThis)) { // Early exit, the end can't be encountered before the start
return null;
}
// We're walking from the end to the start and see the start without finding the end first, block is incomplete
blockStartToEnd.putIfAbsent(maybeThis.getId(), INCOMPLETE);
if (start.equals(maybeThis)) { // Early exit, the end can't be encountered before the start
return null;
}
}
} finally {
lock.unlock();
}
return null;
}




/** Do a brute-force scan for the enclosing blocks **/
BlockStartNode bruteForceScanForEnclosingBlock(@Nonnull final FlowNode node) {
synchronized BlockStartNode bruteForceScanForEnclosingBlock(@Nonnull final FlowNode node) {
FlowNode current = node;
lock.lock();
try {
while (!(current instanceof FlowStartNode)) { // Hunt back for enclosing blocks, a potentially expensive operation
if (current instanceof BlockEndNode) {
// Hop over the block to the start
BlockStartNode start = ((BlockEndNode) current).getStartNode();
blockStartToEnd.put(start.getId(), current.getId());
current = start;
continue; // Simplifies cases below we only need to look at the immediately preceding node.
}
while (!(current instanceof FlowStartNode)) { // Hunt back for enclosing blocks, a potentially expensive operation
if (current instanceof BlockEndNode) {
// Hop over the block to the start
BlockStartNode start = ((BlockEndNode) current).getStartNode();
blockStartToEnd.put(start.getId(), current.getId());
current = start;
continue; // Simplifies cases below we only need to look at the immediately preceding node.
}

// Try for a cache hit
if (current != node) {
String enclosingIdFromCache = nearestEnclosingBlock.get(current.getId());
if (enclosingIdFromCache != null) {
try {
return (BlockStartNode) node.getExecution().getNode(enclosingIdFromCache);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
// Try for a cache hit
if (current != node) {
String enclosingIdFromCache = nearestEnclosingBlock.get(current.getId());
if (enclosingIdFromCache != null) {
try {
return (BlockStartNode) node.getExecution().getNode(enclosingIdFromCache);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}

// Now see if we have a winner among parents
if (current.getParents().isEmpty()) {
return null;
}
FlowNode parent = current.getParents().get(0);
if (parent instanceof BlockStartNode) {
nearestEnclosingBlock.put(current.getId(), parent.getId());
return (BlockStartNode) parent;
}
current = parent;
// Now see if we have a winner among parents
if (current.getParents().isEmpty()) {
return null;
}
FlowNode parent = current.getParents().get(0);
if (parent instanceof BlockStartNode) {
nearestEnclosingBlock.put(current.getId(), parent.getId());
return (BlockStartNode) parent;
}
} finally {
lock.unlock();
current = parent;
}
return null;
}

@CheckForNull
@Override
public BlockEndNode getEndNode(@Nonnull final BlockStartNode startNode) {

lock.lock();
try {
String id = blockStartToEnd.get(startNode.getId());
if (id != null) {
try {
return id == INCOMPLETE ? null : (BlockEndNode) startNode.getExecution().getNode(id);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
} else {
BlockEndNode node = bruteForceScanForEnd(startNode);
if (node != null) {
blockStartToEnd.put(startNode.getId(), node.getId());
}
return node;
public synchronized BlockEndNode getEndNode(@Nonnull final BlockStartNode startNode) {

String id = blockStartToEnd.get(startNode.getId());
if (id != null) {
try {
return id == INCOMPLETE ? null : (BlockEndNode) startNode.getExecution().getNode(id);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
} else {
BlockEndNode node = bruteForceScanForEnd(startNode);
if (node != null) {
blockStartToEnd.put(startNode.getId(), node.getId());
}
} finally {
lock.unlock();
return node;
}
}

@CheckForNull
@Override
public BlockStartNode findEnclosingBlockStart(@Nonnull FlowNode node) {
public synchronized BlockStartNode findEnclosingBlockStart(@Nonnull FlowNode node) {
if (node instanceof FlowStartNode || node instanceof FlowEndNode) {
return null;
}
lock.lock();
try {
String id = nearestEnclosingBlock.get(node.getId());
if (id != null) {
try {
return (BlockStartNode) node.getExecution().getNode(id);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
String id = nearestEnclosingBlock.get(node.getId());
if (id != null) {
try {
return (BlockStartNode) node.getExecution().getNode(id);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

BlockStartNode enclosing = bruteForceScanForEnclosingBlock(node);
if (enclosing != null) {
nearestEnclosingBlock.put(node.getId(), enclosing.getId());
return enclosing;
}
} finally {
lock.unlock();
BlockStartNode enclosing = bruteForceScanForEnclosingBlock(node);
if (enclosing != null) {
nearestEnclosingBlock.put(node.getId(), enclosing.getId());
return enclosing;
}
return null;
}
Expand Down

0 comments on commit 689f233

Please sign in to comment.