Skip to content

Commit

Permalink
fixes neo4j-contrib#1547: Add a new after-async or async phase to tri…
Browse files Browse the repository at this point in the history
…ggers (neo4j-contrib#1573) (neo4j-contrib#1735)

* fixes neo4j-contrib#1547: Add a new after-async or async phase to triggers

* Changed parameters to new syntax

* Added "and thread" in afterAsync description

Co-authored-by: Michael Hunger <github@jexp.de>
Co-authored-by: Mark Needham <m.h.needham@gmail.com>

Co-authored-by: Andrea Santurbano <santand@gmail.com>
Co-authored-by: Michael Hunger <github@jexp.de>
Co-authored-by: Mark Needham <m.h.needham@gmail.com>
  • Loading branch information
4 people committed Mar 29, 2021
1 parent 804bce0 commit 29e52df
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 29 deletions.
4 changes: 3 additions & 1 deletion core/src/main/java/apoc/CoreApocGlobalComponents.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public Map<String,Lifecycle> getServices(GraphDatabaseAPI db, ApocExtensionFacto
return Collections.singletonMap("trigger", new TriggerHandler(db,
dependencies.databaseManagementService(),
dependencies.apocConfig(),
dependencies.log().getUserLog(TriggerHandler.class))
dependencies.log().getUserLog(TriggerHandler.class),
dependencies.globalProceduresRegistry(),
dependencies.pools())
);
}

Expand Down
78 changes: 61 additions & 17 deletions core/src/main/java/apoc/trigger/TriggerHandler.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package apoc.trigger;

import apoc.ApocConfig;
import apoc.Pools;
import apoc.SystemLabels;
import apoc.SystemPropertyKeys;
import apoc.convert.Convert;
Expand All @@ -24,7 +25,6 @@
import org.neo4j.kernel.api.procedure.GlobalProcedures;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.procedure.impl.GlobalProceduresRegistry;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -41,23 +41,30 @@

public class TriggerHandler extends LifecycleAdapter implements TransactionEventListener<Void> {

private enum Phase {before, after, rollback, afterAsync}

private final ConcurrentHashMap<String, Map<String,Object>> activeTriggers = new ConcurrentHashMap();
private final Log log;
private final GraphDatabaseService db;
private final DatabaseManagementService databaseManagementService;
private final ApocConfig apocConfig;
private final Pools pools;

private final AtomicBoolean registeredWithKernel = new AtomicBoolean(false);

public static final String NOT_ENABLED_ERROR = "Triggers have not been enabled." +
" Set 'apoc.trigger.enabled=true' in your apoc.conf file located in the $NEO4J_HOME/conf/ directory.";
private final ThrowingFunction<Context, Transaction, ProcedureException> transactionComponentFunction;

public TriggerHandler(GraphDatabaseService db, DatabaseManagementService databaseManagementService,
ApocConfig apocConfig, Log log) {
ApocConfig apocConfig, Log log, GlobalProcedures globalProceduresRegistry,
Pools pools) {
this.db = db;
this.databaseManagementService = databaseManagementService;
this.apocConfig = apocConfig;
this.log = log;
transactionComponentFunction = globalProceduresRegistry.lookupComponentProvider(Transaction.class, true);
this.pools = pools;
}

private boolean isEnabled() {
Expand Down Expand Up @@ -184,23 +191,40 @@ public Map<String,Map<String,Object>> list() {

@Override
public Void beforeCommit(TransactionData txData, Transaction transaction, GraphDatabaseService databaseService) {
executeTriggers(transaction, txData, "before");
if (hasPhase(Phase.before)) {
executeTriggers(transaction, txData, Phase.before);
}
return null;
}

@Override
public void afterCommit(TransactionData txData, Void state, GraphDatabaseService databaseService) {
try (Transaction tx = db.beginTx()) {
executeTriggers(tx, txData, "after");
tx.commit();
if (hasPhase(Phase.after)) {
try (Transaction tx = db.beginTx()) {
executeTriggers(tx, txData, Phase.after);
tx.commit();
}
}
afterAsync(txData);
}

private void afterAsync(TransactionData txData) {
if (hasPhase(Phase.afterAsync)) {
Map<String, Object> params = txDataParams(txData, Phase.afterAsync);
Util.inTxFuture(pools.getDefaultExecutorService(), db, (inner) -> {
executeTriggers(inner, params, Phase.afterAsync);
return null;
});
}
}

@Override
public void afterRollback(TransactionData txData, Void state, GraphDatabaseService databaseService) {
try (Transaction tx = db.beginTx()) {
executeTriggers(tx, txData, "rollback");
tx.commit();
if (hasPhase(Phase.rollback)) {
try (Transaction tx = db.beginTx()) {
executeTriggers(tx, txData, Phase.rollback);
tx.commit();
}
}
}

Expand Down Expand Up @@ -235,9 +259,20 @@ private Map<String, List<Node>> aggregateLabels(Iterable<LabelEntry> labelEntrie
return result;
}

private Map<String, Object> txDataParams(TransactionData txData, String phase) {
return map("transactionId", phase.equals("after") ? txData.getTransactionId() : -1,
"commitTime", phase.equals("after") ? txData.getCommitTime() : -1,
private Map<String, Object> txDataParams(TransactionData txData, Phase phase) {
final long txId, commitTime;
switch (phase) {
case after:
case afterAsync:
txId = txData.getTransactionId();
commitTime = txData.getCommitTime();
break;
default:
txId = -1;
commitTime = -1;
}
return map("transactionId", txId,
"commitTime", commitTime,
"createdNodes", Convert.convertToList(txData.createdNodes()),
"createdRelationships", Convert.convertToList(txData.createdRelationships()),
"deletedNodes", Convert.convertToList(txData.deletedNodes()),
Expand All @@ -252,9 +287,18 @@ private Map<String, Object> txDataParams(TransactionData txData, String phase) {
);
}

private void executeTriggers(Transaction tx, TransactionData txData, String phase) {
private boolean hasPhase(Phase phase) {
return activeTriggers.values().stream()
.map(data -> (Map<String, Object>) data.get("selector"))
.anyMatch(selector -> when(selector, phase));
}

private void executeTriggers(Transaction tx, TransactionData txData, Phase phase) {
executeTriggers(tx, txDataParams(txData, phase), phase);
}

private void executeTriggers(Transaction tx, Map<String, Object> params, Phase phase) {
Map<String,String> exceptions = new LinkedHashMap<>();
Map<String, Object> params = txDataParams(txData, phase);
activeTriggers.forEach((name, data) -> {
if (data.get("params") != null) {
params.putAll((Map<String, Object>) data.get("params"));
Expand All @@ -277,9 +321,9 @@ private void executeTriggers(Transaction tx, TransactionData txData, String phas
}
}

private boolean when(Map<String, Object> selector, String phase) {
if (selector == null) return (phase.equals("before"));
return selector.getOrDefault("phase", "before").equals(phase);
private boolean when(Map<String, Object> selector, Phase phase) {
if (selector == null) return phase == Phase.before;
return Phase.valueOf(selector.getOrDefault("phase", "before").toString()) == phase;
}

@Override
Expand Down
77 changes: 67 additions & 10 deletions core/src/test/java/apoc/trigger/TriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;

import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.impl.coreapi.TransactionImpl;

import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import static apoc.ApocSettings.apoc_trigger_enabled;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -51,6 +56,7 @@ public void testListTriggers() throws Exception {
assertEquals(true, row.get("installed"));
});
}

@Test
public void testRemoveNode() throws Exception {
db.executeTransactionally("CREATE (:Counter {count:0})");
Expand Down Expand Up @@ -94,6 +100,7 @@ public void testRemoveTrigger() throws Exception {
assertEquals(false, row.get("installed"));
});
}

@Test
public void testRemoveAllTrigger() throws Exception {
TestUtil.testCallCount(db, "CALL apoc.trigger.removeAll()", 0);
Expand All @@ -120,20 +127,54 @@ public void testTimeStampTrigger() throws Exception {
db.executeTransactionally("CALL apoc.trigger.add('timestamp','UNWIND $createdNodes AS n SET n.ts = timestamp()',{})");
db.executeTransactionally("CREATE (f:Foo)");
TestUtil.testCall(db, "MATCH (f:Foo) RETURN f", (row) -> {
assertEquals(true, ((Node)row.get("f")).hasProperty("ts"));
assertEquals(true, ((Node) row.get("f")).hasProperty("ts"));
});
}

@Test
public void testTimeStampTriggerForUpdatedProperties() throws Exception {
db.executeTransactionally("CALL apoc.trigger.add('timestamp','UNWIND apoc.trigger.nodesByLabel($assignedNodeProperties,null) AS n SET n.ts = timestamp()',{})");
db.executeTransactionally("CREATE (f:Foo) SET f.foo='bar'");
TestUtil.testCall(db, "MATCH (f:Foo) RETURN f", (row) -> {
assertEquals(true, ((Node) row.get("f")).hasProperty("ts"));
});
}

@Test
public void testLowerCaseName() throws Exception {
db.executeTransactionally("create constraint on (p:Person) assert p.id is unique");
db.executeTransactionally("CALL apoc.trigger.add('lowercase','UNWIND apoc.trigger.nodesByLabel($assignedLabels,\"Person\") AS n SET n.id = toLower(n.name)',{})");
db.executeTransactionally("CREATE (f:Person {name:'John Doe'})");
TestUtil.testCall(db, "MATCH (f:Person) RETURN f", (row) -> {
assertEquals("john doe", ((Node) row.get("f")).getProperty("id"));
assertEquals("John Doe", ((Node) row.get("f")).getProperty("name"));
});
}

@Test
public void testSetLabels() throws Exception {
db.executeTransactionally("CREATE (f {name:'John Doe'})");
db.executeTransactionally("CALL apoc.trigger.add('setlabels','UNWIND apoc.trigger.nodesByLabel($assignedLabels,\"Person\") AS n SET n:Man',{})");
db.executeTransactionally("MATCH (f) SET f:Person");
TestUtil.testCall(db, "MATCH (f:Man) RETURN f", (row) -> {
assertEquals("John Doe", ((Node) row.get("f")).getProperty("name"));
assertEquals(true, ((Node) row.get("f")).hasLabel(Label.label("Person")));
});

long count = TestUtil.singleResultFirstColumn(db, "MATCH (f:Man) RETURN count(*) as c");
assertEquals(1L, count);
}

@Test
public void testTxId() throws Exception {
db.executeTransactionally("CALL apoc.trigger.add('txinfo','UNWIND $createdNodes AS n SET n.txId = $transactionId, n.txTime = $commitTime',{phase:'after'})");
db.executeTransactionally("CREATE (f:Bar)");
TestUtil.testCall(db, "MATCH (f:Bar) RETURN f", (row) -> {
assertEquals(true, (Long)((Node)row.get("f")).getProperty("txId") > -1L);
assertEquals(true, (Long)((Node)row.get("f")).getProperty("txTime") > start);
assertEquals(true, (Long) ((Node) row.get("f")).getProperty("txId") > -1L);
assertEquals(true, (Long) ((Node) row.get("f")).getProperty("txTime") > start);
});
}

@Test
public void testMetaDataBefore() {
testMetaData("before");
Expand Down Expand Up @@ -195,9 +236,9 @@ public void testTriggerPause() throws Exception {
db.executeTransactionally("CALL apoc.trigger.pause('test')");
db.executeTransactionally("CREATE (f:Foo {name:'Michael'})");
TestUtil.testCall(db, "MATCH (f:Foo) RETURN f", (row) -> {
assertEquals(false, ((Node)row.get("f")).hasProperty("txId"));
assertEquals(false, ((Node)row.get("f")).hasProperty("txTime"));
assertEquals(true, ((Node)row.get("f")).hasProperty("name"));
assertEquals(false, ((Node) row.get("f")).hasProperty("txId"));
assertEquals(false, ((Node) row.get("f")).hasProperty("txTime"));
assertEquals(true, ((Node) row.get("f")).hasProperty("name"));
});
}

Expand All @@ -208,15 +249,31 @@ public void testTriggerResume() throws Exception {
db.executeTransactionally("CALL apoc.trigger.resume('test')");
db.executeTransactionally("CREATE (f:Foo {name:'Michael'})");
TestUtil.testCall(db, "MATCH (f:Foo) RETURN f", (row) -> {
assertEquals(true, ((Node)row.get("f")).hasProperty("txId"));
assertEquals(true, ((Node)row.get("f")).hasProperty("txTime"));
assertEquals(true, ((Node)row.get("f")).hasProperty("name"));
assertEquals(true, ((Node) row.get("f")).hasProperty("txId"));
assertEquals(true, ((Node) row.get("f")).hasProperty("txTime"));
assertEquals(true, ((Node) row.get("f")).hasProperty("name"));
});
}


@Test
public void testTxIdAfterAsync() throws Exception {
db.executeTransactionally("CALL apoc.trigger.add('triggerTest','UNWIND apoc.trigger.propertiesByKey($assignedNodeProperties, \"_executed\") as prop " +
" WITH prop.node as n " +
" CREATE (z:SON {father:id(n)}) " +
" CREATE (n)-[:GENERATED]->(z)', " +
"{phase:'afterAsync'})");
db.executeTransactionally("CREATE (:TEST {name:'x', _executed:0})");
db.executeTransactionally("CREATE (:TEST {name:'y', _executed:0})");
org.neo4j.test.assertion.Assert.assertEventually((Callable<Long>) () -> db.executeTransactionally("MATCH p = ()-[r:GENERATED]->() RETURN count(p) AS count",
Collections.emptyMap(), (r) -> r.<Long>columnAs("count").next()),
value -> value.equals(2L), 30L, TimeUnit.SECONDS);
}

@Test(expected = QueryExecutionException.class)
public void showThrowAnException() throws Exception {
db.executeTransactionally("CALL apoc.trigger.add('test','UNWIND $createdNodes AS n SET n.txId = , n.txTime = $commitTime',{})");
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ CALL apoc.trigger.add('lowercase-by-label','UNWIND apoc.trigger.nodesByLabel($as

.Trigger Phase Table
.Helper Functions
[cols="5m,5"]
[cols="1m,5"]
|===
| Phase | Description
| before | The trigger will be activate right `before` the commit. If no phase is specified, it's the default.
Expand Down

0 comments on commit 29e52df

Please sign in to comment.