Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: edges cache is not cleared when clearing backend #488

Merged
merged 3 commits into from
May 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hugegraph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
<version>1.6.0</version>
<version>1.6.2</version>
</dependency>

<!-- tinkerpop -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class HugeGraph implements GremlinGraph {
private final HugeConfig configuration;

private final EventHub schemaEventHub;
private final EventHub graphEventHub;
private final EventHub indexEventHub;
private final RateLimiter rateLimiter;
private final TaskManager taskManager;
Expand All @@ -119,6 +120,7 @@ public HugeGraph(HugeConfig configuration) {
this.configuration = configuration;

this.schemaEventHub = new EventHub("schema");
this.graphEventHub = new EventHub("graph");
this.indexEventHub = new EventHub("index");

final int limit = configuration.get(CoreOptions.RATE_LIMIT);
Expand All @@ -137,6 +139,7 @@ public HugeGraph(HugeConfig configuration) {
try {
this.storeProvider = this.loadStoreProvider();
} catch (BackendException e) {
LockUtil.destroy(this.name);
String message = "Failed to init backend store";
LOG.error("{}: {}", message, e.getMessage());
throw new HugeException(message);
Expand Down Expand Up @@ -182,6 +185,10 @@ public EventHub schemaEventHub() {
return this.schemaEventHub;
}

public EventHub graphEventHub() {
return this.graphEventHub;
}

public EventHub indexEventHub() {
return this.indexEventHub;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.cache.CachedBackendStore.QueryId;
Expand All @@ -34,11 +35,15 @@
import com.baidu.hugegraph.backend.tx.GraphTransaction;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.event.EventHub;
import com.baidu.hugegraph.event.EventListener;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.structure.HugeEdge;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.Events;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

public final class CachedGraphTransaction extends GraphTransaction {

Expand All @@ -47,6 +52,9 @@ public final class CachedGraphTransaction extends GraphTransaction {
private final Cache verticesCache;
private final Cache edgesCache;

private EventListener storeEventListener;
private EventListener cacheEventListener;

public CachedGraphTransaction(HugeGraph graph, BackendStore store) {
super(graph, store);

Expand All @@ -59,6 +67,17 @@ public CachedGraphTransaction(HugeGraph graph, BackendStore store) {
capacity = conf.get(CoreOptions.EDGE_CACHE_CAPACITY);
expire = conf.get(CoreOptions.EDGE_CACHE_EXPIRE);
this.edgesCache = this.cache("edge", capacity, expire);

this.listenChanges();
}

@Override
public void close() {
try {
super.close();
} finally {
this.unlistenChanges();
}
}

private Cache cache(String prefix, int capacity, long expire) {
Expand All @@ -68,6 +87,61 @@ private Cache cache(String prefix, int capacity, long expire) {
return cache;
}

private void listenChanges() {
// Listen store event: "store.init", "store.clear", ...
Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
Events.STORE_CLEAR,
Events.STORE_TRUNCATE);
this.storeEventListener = event -> {
if (storeEvents.contains(event.name())) {
LOG.debug("Graph {} clear graph cache on event '{}'",
this.graph(), event.name());
this.verticesCache.clear();
this.edgesCache.clear();
return true;
}
return false;
};
this.store().provider().listen(this.storeEventListener);

// Listen cache event: "cache"(invalid cache item)
this.cacheEventListener = event -> {
LOG.debug("Graph {} received graph cache event: {}",
this.graph(), event);
event.checkArgs(String.class, Id.class);
Object[] args = event.args();
if (args[0].equals("invalid")) {
Id id = (Id) args[1];
if (this.verticesCache.get(id) != null) {
// Invalidate vertex cache
this.verticesCache.invalidate(id);
} else if (this.edgesCache.get(id) != null) {
// Invalidate edge cache
this.edgesCache.invalidate(id);
}
return true;
} else if (args[0].equals("clear")) {
this.verticesCache.clear();
this.edgesCache.clear();
return true;
}
return false;
};
EventHub schemaEventHub = this.graph().graphEventHub();
if (!schemaEventHub.containsListener(Events.CACHE)) {
schemaEventHub.listen(Events.CACHE, this.cacheEventListener);
}
}

private void unlistenChanges() {
// Unlisten store event
this.store().provider().unlisten(this.storeEventListener);

// Unlisten cache event
EventHub graphEventHub = this.graph().graphEventHub();
graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
}

@Override
protected Iterator<HugeVertex> queryVerticesFromBackend(Query query) {
if (!query.ids().isEmpty() && query.conditions().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private void listenChanges() {
Events.STORE_TRUNCATE);
this.storeEventListener = event -> {
if (storeEvents.contains(event.name())) {
LOG.debug("Graph {} clear cache on event '{}'",
LOG.debug("Graph {} clear schema cache on event '{}'",
this.graph(), event.name());
this.idCache.clear();
this.nameCache.clear();
Expand All @@ -98,7 +98,7 @@ private void listenChanges() {

// Listen cache event: "cache"(invalid cache item)
this.cacheEventListener = event -> {
LOG.debug("Graph {} received cache event: {}",
LOG.debug("Graph {} received schema cache event: {}",
this.graph(), event);
event.checkArgs(String.class, Id.class);
Object[] args = event.args();
Expand All @@ -116,6 +116,11 @@ private void listenChanges() {
this.nameCache.invalidate(prefixedName);
}
return true;
} else if (args[0].equals("clear")) {
this.idCache.clear();
this.nameCache.clear();
this.cachedTypes.clear();
return true;
}
return false;
};
Expand Down Expand Up @@ -238,7 +243,7 @@ protected <T extends SchemaElement> List<T> getAllSchema(HugeType type) {
} else {
List<T> results = super.getAllSchema(type);
long free = this.idCache.capacity() - this.idCache.size();
if (results.size() < free) {
if (results.size() <= free) {
// Update cache
for (T schema : results) {
Id prefixedId = generateId(schema.type(), schema.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ private final void write(Id id, Object value) {

@Watched(prefix = "ramcache")
private final void remove(Id id) {
if (id == null) {
return;
}
assert id != null;

final Lock lock = this.keyLock.lock(id);
Expand All @@ -193,6 +196,9 @@ private final void remove(Id id) {
@Watched(prefix = "ramcache")
@Override
public Object get(Id id) {
if (id == null) {
return null;
}
Object value = null;
if (this.map.size() <= this.halfCapacity || this.map.containsKey(id)) {
// Maybe the id removed by other threads and returned null value
Expand All @@ -218,6 +224,9 @@ public Object get(Id id) {
@Watched(prefix = "ramcache")
@Override
public Object getOrFetch(Id id, Function<Id, Object> fetcher) {
if (id == null) {
return null;
}
Object value = null;
if (this.map.size() <= this.halfCapacity || this.map.containsKey(id)) {
// Maybe the id removed by other threads and returned null value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ public void removeEdge(HugeEdge edge) {
}

public Iterator<Edge> queryEdgesByVertex(Id id) {
return queryEdges(constructEdgesQuery(id, Directions.BOTH));
return this.queryEdges(constructEdgesQuery(id, Directions.BOTH));
}

public Iterator<Edge> queryEdges(Object... edgeIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package com.baidu.hugegraph.testutil;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.List;

Expand All @@ -33,7 +31,6 @@
import com.baidu.hugegraph.testutil.FakeObjects.FakeEdge;
import com.baidu.hugegraph.testutil.FakeObjects.FakeVertex;
import com.baidu.hugegraph.util.DateUtil;
import com.baidu.hugegraph.util.E;

public class Utils {

Expand Down Expand Up @@ -83,29 +80,4 @@ public static boolean contains(List<Edge> edges, FakeEdge fakeEdge) {
public static Date date(String rawDate) {
return DateUtil.parse(rawDate);
}

public static <T> T invokeStatic(Class<?> clazz, String methodName,
Object... args) {
Class<?>[] classes = new Class<?>[args.length];
int i = 0;
for (Object arg : args) {
E.checkArgument(arg != null, "The argument can't be null");
classes[i++] = arg.getClass();
}
try {
Method method = clazz.getDeclaredMethod(methodName, classes);
method.setAccessible(true);
@SuppressWarnings("unchecked")
T result = (T) method.invoke(null, args);
return result;
} catch (NoSuchMethodException e) {
throw new RuntimeException(String.format(
"Can't find method '%s' of class '%s'",
methodName, clazz), e);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(String.format(
"Can't invoke method '%s' of class '%s'",
methodName, clazz), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
* under the License.
*/

package com.baidu.hugegraph.unit.core;
package com.baidu.hugegraph.unit;

import java.util.Collections;

import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.mockito.Mockito;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.schema.PropertyKey;
Expand All @@ -34,12 +39,24 @@
import com.baidu.hugegraph.type.define.IdStrategy;
import com.baidu.hugegraph.type.define.IndexType;

public final class FakeObject {
public final class FakeObjects {

private final HugeGraph graph;

public FakeObject() {
public FakeObjects() {
this.graph = Mockito.mock(HugeGraph.class);
Mockito.doReturn(newConfig()).when(this.graph).configuration();
}

public FakeObjects(String name) {
this();
Mockito.doReturn(name).when(this.graph).name();
}

public static HugeConfig newConfig() {
Configuration conf = Mockito.mock(PropertiesConfiguration.class);
Mockito.when(conf.getKeys()).thenReturn(Collections.emptyIterator());
return new HugeConfig(conf);
}

public HugeGraph graph() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.junit.runners.Suite;

import com.baidu.hugegraph.unit.cache.CacheManagerTest;
import com.baidu.hugegraph.unit.cache.CachedGraphTransactionTest;
import com.baidu.hugegraph.unit.cache.CachedSchemaTransactionTest;
import com.baidu.hugegraph.unit.cache.RamCacheTest;
import com.baidu.hugegraph.unit.core.AnalyzerTest;
import com.baidu.hugegraph.unit.core.BackendMutationTest;
Expand All @@ -37,6 +39,8 @@
@RunWith(Suite.class)
@Suite.SuiteClasses({
RamCacheTest.class,
CachedSchemaTransactionTest.class,
CachedGraphTransactionTest.class,
CacheManagerTest.class,

VersionTest.class,
Expand Down
Loading