From a14b394f2003520a873b42e12a1ac3946ff9f62c Mon Sep 17 00:00:00 2001 From: wucc <77946882+DanGuge@users.noreply.github.com> Date: Sun, 3 Sep 2023 16:09:33 +0800 Subject: [PATCH 01/11] feat: enhanced consumers --- .../org/apache/hugegraph/util/Consumers.java | 130 +++++++++++++----- 1 file changed, 98 insertions(+), 32 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java index 00689e0c5e..bf470df20e 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java @@ -27,16 +27,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import org.apache.hugegraph.config.CoreOptions; -import org.slf4j.Logger; - import org.apache.hugegraph.HugeException; +import org.apache.hugegraph.config.CoreOptions; import org.apache.hugegraph.task.TaskManager.ContextCallable; +import org.slf4j.Logger; public final class Consumers { @@ -49,13 +49,13 @@ public final class Consumers { private final ExecutorService executor; private final Consumer consumer; private final Runnable done; - + private final Consumer exceptionHandle; private final int workers; + private final List runnings; private final int queueSize; private final CountDownLatch latch; - private final BlockingQueue queue; - - private volatile boolean ending = false; + private final BlockingQueue> queue; + private final VWrapper queueEnd = new VWrapper(null); private volatile Throwable exception = null; public Consumers(ExecutorService executor, Consumer consumer) { @@ -64,22 +64,39 @@ public Consumers(ExecutorService executor, Consumer consumer) { public Consumers(ExecutorService executor, Consumer consumer, Runnable done) { + this(executor, consumer, done, QUEUE_WORKER_SIZE); + } + + public Consumers(ExecutorService executor, + Consumer consumer, + Runnable done, + int queueWorkerSize) { + this(executor, consumer, done, null, queueWorkerSize); + } + + public Consumers(ExecutorService executor, + Consumer consumer, + Runnable done, + Consumer handle, + int queueWorkerSize) { this.executor = executor; this.consumer = consumer; this.done = done; + this.exceptionHandle = handle; int workers = THREADS; if (this.executor instanceof ThreadPoolExecutor) { workers = ((ThreadPoolExecutor) this.executor).getCorePoolSize(); } this.workers = workers; - this.queueSize = QUEUE_WORKER_SIZE * workers; + + this.runnings = new ArrayList<>(workers); + this.queueSize = queueWorkerSize * workers + 1; this.latch = new CountDownLatch(workers); this.queue = new ArrayBlockingQueue<>(this.queueSize); } public void start(String name) { - this.ending = false; this.exception = null; if (this.executor == null) { return; @@ -87,7 +104,7 @@ public void start(String name) { LOG.info("Starting {} workers[{}] with queue size {}...", this.workers, name, this.queueSize); for (int i = 0; i < this.workers; i++) { - this.executor.submit(new ContextCallable<>(this::runAndDone)); + this.runnings.add(this.executor.submit(new ContextCallable<>(this::runAndDone))); } } @@ -95,11 +112,15 @@ private Void runAndDone() { try { this.run(); } catch (Throwable e) { - // Only the first exception of one thread can be stored - this.exception = e; - if (!(e instanceof StopExecution)) { + if (e instanceof StopExecution) { + this.queue.clear(); + putEnd(); + } else { + // Only the first exception of one thread can be stored + this.exception = e; LOG.error("Error when running task", e); } + exceptionHandle(e); } finally { this.done(); this.latch.countDown(); @@ -109,11 +130,7 @@ private Void runAndDone() { private void run() { LOG.debug("Start to work..."); - while (!this.ending) { - this.consume(); - } - assert this.ending; - while (this.consume()){ + while (this.consume()) { // ignore } @@ -121,21 +138,41 @@ private void run() { } private boolean consume() { - V elem; - try { - elem = this.queue.poll(CONSUMER_WAKE_PERIOD, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignore - return true; + VWrapper elem = null; + while (elem == null) { + try { + elem = this.queue.poll(CONSUMER_WAKE_PERIOD, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // ignore + return false; + } } - if (elem == null) { + + if (elem == queueEnd) { + putEnd(); return false; } // do job - this.consumer.accept(elem); + this.consumer.accept(elem.v); return true; } + private void exceptionHandle(Throwable e) { + if (this.exceptionHandle == null) { + return; + } + + try { + this.exceptionHandle.accept(e); + } catch (Throwable ex) { + if (this.exception == null) { + this.exception = ex; + } else { + LOG.warn("Error while calling exceptionHandle()", ex); + } + } + } + private void done() { if (this.done == null) { return; @@ -168,7 +205,17 @@ public void provide(V v) throws Throwable { throw this.throwException(); } else { try { - this.queue.put(v); + this.queue.put(new VWrapper<>(v)); + } catch (InterruptedException e) { + LOG.warn("Interrupted while enqueue", e); + } + } + } + + private void putEnd() { + if (this.executor != null) { + try { + this.queue.put(queueEnd); } catch (InterruptedException e) { LOG.warn("Interrupted while enqueue", e); } @@ -176,15 +223,18 @@ public void provide(V v) throws Throwable { } public void await() throws Throwable { - this.ending = true; if (this.executor == null) { // call done() directly if without thread pool this.done(); } else { try { + putEnd(); this.latch.await(); } catch (InterruptedException e) { String error = "Interrupted while waiting for consumers"; + for (Future f: this.runnings) { + f.cancel(true); + } this.exception = new HugeException(error, e); LOG.warn(error, e); } @@ -202,7 +252,7 @@ public ExecutorService executor() { public static void executeOncePerThread(ExecutorService executor, int totalThreads, Runnable callback) - throws InterruptedException { + throws InterruptedException { // Ensure callback execute at least once for every thread final Map threadsTimes = new ConcurrentHashMap<>(); final List> tasks = new ArrayList<>(); @@ -230,7 +280,7 @@ public static void executeOncePerThread(ExecutorService executor, for (int i = 0; i < totalThreads; i++) { tasks.add(task); } - executor.invokeAll(tasks); + executor.invokeAll(tasks, 5, TimeUnit.SECONDS); } public static ExecutorService newThreadPool(String prefix, int workers) { @@ -290,13 +340,21 @@ public synchronized ExecutorService getExecutor() { public synchronized void returnExecutor(ExecutorService executor) { E.checkNotNull(executor, "executor"); if (!this.executors.offer(executor)) { - executor.shutdown(); + try { + executor.shutdown(); + } catch (Exception e) { + LOG.warn("close ExecutorService with error:", e); + } } } public synchronized void destroy() { for (ExecutorService executor : this.executors) { - executor.shutdown(); + try { + executor.shutdownNow(); + } catch (Exception e) { + LOG.warn("close ExecutorService with error:", e); + } } this.executors.clear(); } @@ -314,4 +372,12 @@ public StopExecution(String message, Object... args) { super(message, args); } } + + public static class VWrapper { + public V v; + + public VWrapper(V v) { + this.v = v; + } + } } From eabe95a84af65447d77015473209db273a33af45 Mon Sep 17 00:00:00 2001 From: wucc <77946882+DanGuge@users.noreply.github.com> Date: Thu, 14 Sep 2023 16:52:56 +0800 Subject: [PATCH 02/11] add nested iterator && support batch execution --- .../backend/query/EdgesQueryIterator.java | 64 ++++++ .../traversal/algorithm/HugeTraverser.java | 38 +++ .../traversal/algorithm/OltpTraverser.java | 217 +++++++++++++++++- 3 files changed, 310 insertions(+), 9 deletions(-) create mode 100644 hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java new file mode 100644 index 0000000000..cf773b4474 --- /dev/null +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.query; + +import java.util.Iterator; +import java.util.List; + +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.tx.GraphTransaction; +import org.apache.hugegraph.type.define.Directions; + +public class EdgesQueryIterator implements Iterator { + private final List labels; + private final Directions directions; + private final long limit; + private final Iterator sources; + + public EdgesQueryIterator(Iterator sources, + Directions directions, + List labels, + long limit) { + this.sources = sources; + this.labels = labels; + this.directions = directions; + // Traverse NO_LIMIT 和 Query.NO_LIMIT 不同 + this.limit = limit < 0 ? Query.NO_LIMIT : limit; + } + + @Override + public boolean hasNext() { + return sources.hasNext(); + } + + @Override + public Query next() { + Id sourceId = this.sources.next(); + ConditionQuery query = GraphTransaction.constructEdgesQuery(sourceId, + this.directions, + this.labels); + if (this.limit != Query.NO_LIMIT) { + query.limit(this.limit); + query.capacity(this.limit); + } else { + query.capacity(Query.NO_CAPACITY); + } + return query; + } + +} diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java index f5415d9c51..f5df51ddff 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java @@ -17,6 +17,8 @@ package org.apache.hugegraph.traversal.algorithm; +import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -37,6 +39,7 @@ import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.backend.query.Aggregate; import org.apache.hugegraph.backend.query.ConditionQuery; +import org.apache.hugegraph.backend.query.EdgesQueryIterator; import org.apache.hugegraph.backend.query.Query; import org.apache.hugegraph.backend.query.QueryResults; import org.apache.hugegraph.backend.tx.GraphTransaction; @@ -66,6 +69,7 @@ import org.apache.hugegraph.util.collection.ObjectIntMapping; import org.apache.hugegraph.util.collection.ObjectIntMappingFactory; import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator; import org.slf4j.Logger; import com.google.common.collect.ImmutableList; @@ -465,6 +469,13 @@ private Iterator edgesOfVertex(Id source, EdgeStep edgeStep, return edgeStep.skipSuperNodeIfNeeded(edges); } + public EdgesIterator edgesOfVertices(Iterator sources, + Directions dir, + List labelIds, + long degree) { + return new EdgesIterator(new EdgesQueryIterator(sources, dir, labelIds, degree)); + } + public Iterator edgesOfVertex(Id source, Steps steps) { List edgeLabels = steps.edgeLabels(); ConditionQuery cq = GraphTransaction.constructEdgesQuery( @@ -1004,4 +1015,31 @@ public Set getEdges(Iterator vertexIter) { return edges; } } + + public class EdgesIterator implements Iterator>, Closeable { + private final Iterator> currentIt; + + public EdgesIterator(EdgesQueryIterator queryIterator) { + List> iteratorList = new ArrayList<>(); + while (queryIterator.hasNext()) { + iteratorList.add(graph().edges(queryIterator.next())); + } + this.currentIt = iteratorList.iterator(); + } + + @Override + public boolean hasNext() { + return this.currentIt.hasNext(); + } + + @Override + public Iterator next() { + return this.currentIt.next(); + } + + @Override + public void close() throws IOException { + CloseableIterator.closeIterator(currentIt); + } + } } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java index b05de24228..d3444fcb4b 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java @@ -17,27 +17,39 @@ package org.apache.hugegraph.traversal.algorithm; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import com.google.common.base.Objects; import org.apache.commons.lang3.tuple.Pair; import org.apache.hugegraph.HugeGraph; +import org.apache.hugegraph.backend.id.EdgeId; import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.query.EdgesQueryIterator; import org.apache.hugegraph.config.CoreOptions; +import org.apache.hugegraph.iterator.FilterIterator; +import org.apache.hugegraph.iterator.MapperIterator; +import org.apache.hugegraph.structure.HugeEdge; +import org.apache.hugegraph.traversal.algorithm.steps.Steps; +import org.apache.hugegraph.type.define.Directions; import org.apache.hugegraph.util.Consumers; +import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Property; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator; -import org.apache.hugegraph.iterator.FilterIterator; +import com.google.common.base.Objects; public abstract class OltpTraverser extends HugeTraverser - implements AutoCloseable { + implements AutoCloseable { private static final String EXECUTOR_NAME = "oltp"; private static Consumers.ExecutorPool executors; @@ -59,11 +71,6 @@ protected OltpTraverser(HugeGraph graph) { } } - @Override - public void close() { - // pass - } - public static void destroy() { synchronized (OltpTraverser.class) { if (executors != null) { @@ -73,6 +80,11 @@ public static void destroy() { } } + @Override + public void close() { + // pass + } + protected long traversePairs(Iterator> pairs, Consumer> consumer) { return this.traverse(pairs, consumer, "traverse-pairs"); @@ -129,6 +141,64 @@ protected long traverse(Iterator iterator, Consumer consumer, return total; } + protected long traverseBatch(Iterator> iterator, + Consumer> consumer, + String name, int queueWorkerSize) { + if (!iterator.hasNext()) { + return 0L; + } + AtomicBoolean done = new AtomicBoolean(false); + Consumers> consumers = null; + try { + consumers = getConsumers(consumer, queueWorkerSize, done, + executors.getExecutor()); + return consumersStart(iterator, name, done, consumers); + } finally { + assert consumers != null; + executors.returnExecutor(consumers.executor()); + } + } + + private long consumersStart(Iterator> iterator, String name, + AtomicBoolean done, + Consumers> consumers) { + long total = 0L; + try { + consumers.start(name); + while (iterator.hasNext() && !done.get()) { + total++; + Iterator v = iterator.next(); + consumers.provide(v); + } + } catch (Consumers.StopExecution e) { + // pass + } catch (Throwable e) { + throw Consumers.wrapException(e); + } finally { + try { + consumers.await(); + } catch (Throwable e) { + throw Consumers.wrapException(e); + } finally { + CloseableIterator.closeIterator(iterator); + } + } + return total; + } + + private Consumers> getConsumers(Consumer> consumer, + int queueWorkerSize, + AtomicBoolean done, + ExecutorService executor) { + Consumers> consumers; + consumers = new Consumers<>(executor, + consumer, + null, + e -> done.set(true), + queueWorkerSize); + return consumers; + } + protected Iterator filter(Iterator vertices, String key, Object value) { return new FilterIterator<>(vertices, vertex -> { @@ -144,8 +214,41 @@ protected boolean match(Element elem, String key, Object value) { return p.isPresent() && Objects.equal(p.value(), value); } + protected void bfsQuery(Iterator vertices, + Directions dir, + Id label, + long degree, + long capacity, + Consumer parseConsumer) { + List labels = + label == null ? Collections.emptyList() : Collections.singletonList(label); + CapacityConsumer consumer = new CapacityConsumer(parseConsumer, capacity); + + EdgesIterator edgeIts = edgesOfVertices(vertices, dir, labels, degree); + // 并行乱序处理 + this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1); + } + + protected void bfsQuery(Iterator vertices, + Steps steps, + long capacity, + Consumer parseConsumer) { + CapacityConsumerWithStep consumer = + new CapacityConsumerWithStep(parseConsumer, capacity, steps); + + EdgesQueryIterator queryIterator = + new EdgesQueryIterator(vertices, steps.direction(), steps.edgeLabels(), + steps.degree()); + + // 这里获取边数据,以便支持 step + EdgesIterator edgeIts = new EdgesIterator(queryIterator); + + // 并行乱序处理 + this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1); + } + public static class ConcurrentMultiValuedMap - extends ConcurrentHashMap> { + extends ConcurrentHashMap> { private static final long serialVersionUID = -7249946839643493614L; @@ -175,4 +278,100 @@ public List getValues(K key) { return values; } } + + public static class ConcurrentVerticesConsumer implements Consumer { + private final Id sourceV; + private final Set excluded; + private final Set neighbors; + private final long limit; + private final AtomicInteger count = new AtomicInteger(0); + + public ConcurrentVerticesConsumer(Id sourceV, Set excluded, long limit, + Set neighbors) { + this.sourceV = sourceV; + this.excluded = excluded; + this.limit = limit; + this.neighbors = neighbors; + } + + @Override + public void accept(EdgeId edgeId) { + if (limit != NO_LIMIT && count.get() >= limit) { + throw new Consumers.StopExecution("reach limit"); + } + + Id targetV = edgeId.otherVertexId(); + if (sourceV.equals(targetV)) { + return; + } + + if (excluded != null && excluded.contains(targetV)) { + return; + } + + if (neighbors.add(targetV)) { + if (limit != NO_LIMIT) { + count.getAndIncrement(); + } + } + } + } + + public abstract class EdgeItConsumer implements Consumer> { + private final Consumer parseConsumer; + private final long capacity; + + public EdgeItConsumer(Consumer parseConsumer, long capacity) { + this.parseConsumer = parseConsumer; + this.capacity = capacity; + } + + protected abstract Iterator prepare(Iterator it); + + @Override + public void accept(Iterator edges) { + Iterator ids = prepare(edges); + long counter = 0; + while (ids.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + LOG.warn("Consumer isInterrupted"); + break; + } + counter++; + parseConsumer.accept(ids.next()); + } + long total = edgeIterCounter.addAndGet(counter); + // 按批次检测 capacity,以提高性能 + if (this.capacity != NO_LIMIT && total >= capacity) { + throw new Consumers.StopExecution("reach capacity"); + } + } + } + + public class CapacityConsumer extends EdgeItConsumer { + public CapacityConsumer(Consumer parseConsumer, long capacity) { + super(parseConsumer, capacity); + } + + @Override + protected Iterator prepare(Iterator edges) { + return new MapperIterator<>(edges, (e) -> ((HugeEdge) e).id()); + } + } + + public class CapacityConsumerWithStep extends EdgeItConsumer { + private final Steps steps; + + public CapacityConsumerWithStep(Consumer parseConsumer, long capacity, + Steps steps) { + super(parseConsumer, capacity); + this.steps = steps; + } + + @Override + protected Iterator prepare(Iterator edges) { + return edgesOfVertexStep(edges, steps); + } + } + } From 0d433cadf7bb79b3ee50a80f3d654103fed13a84 Mon Sep 17 00:00:00 2001 From: wucc <77946882+DanGuge@users.noreply.github.com> Date: Thu, 14 Sep 2023 16:53:44 +0800 Subject: [PATCH 03/11] support thread parallel & batch execution in kout and kneighbor --- .../algorithm/KneighborTraverser.java | 58 +++++++------- .../traversal/algorithm/KoutTraverser.java | 77 ++++++++++--------- .../algorithm/records/KneighborRecords.java | 14 +++- 3 files changed, 84 insertions(+), 65 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java index 9f16f480b2..ad420fdc7d 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java @@ -17,11 +17,11 @@ package org.apache.hugegraph.traversal.algorithm; -import java.util.Iterator; import java.util.Set; import java.util.function.Consumer; import org.apache.hugegraph.HugeGraph; +import org.apache.hugegraph.backend.id.EdgeId; import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.structure.HugeEdge; import org.apache.hugegraph.traversal.algorithm.records.KneighborRecords; @@ -48,25 +48,28 @@ public Set kneighbor(Id sourceV, Directions dir, Id labelId = this.getEdgeLabelId(label); - Set latest = newSet(); - Set all = newSet(); + KneighborRecords records = new KneighborRecords(true, sourceV, true); - latest.add(sourceV); - this.vertexIterCounter.addAndGet(1L); + Consumer consumer = edgeId -> { + if (this.reachLimit(limit, records.size())) { + return; + } + records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId()); + this.edgeIterCounter.addAndGet(1L); + }; while (depth-- > 0) { - long remaining = limit == NO_LIMIT ? NO_LIMIT : limit - all.size(); - latest = this.adjacentVertices(sourceV, latest, dir, labelId, - all, degree, remaining); - all.addAll(latest); - this.vertexIterCounter.addAndGet(1L); - this.edgeIterCounter.addAndGet(latest.size()); - if (reachLimit(limit, all.size())) { + records.startOneLayer(true); + bfsQuery(records.keys(), dir, labelId, degree, NO_LIMIT, consumer); + records.finishOneLayer(); + if (reachLimit(limit, records.size())) { break; } } - return all; + this.vertexIterCounter.addAndGet(records.size()); + + return records.idSet(limit); } public KneighborRecords customizedKneighbor(Id source, Steps steps, @@ -76,33 +79,30 @@ public KneighborRecords customizedKneighbor(Id source, Steps steps, checkPositive(maxDepth, "k-neighbor max_depth"); checkLimit(limit); - boolean concurrent = maxDepth >= this.concurrentDepth(); - - KneighborRecords records = new KneighborRecords(concurrent, + KneighborRecords records = new KneighborRecords(true, source, true); - Consumer consumer = v -> { + Consumer consumer = edge -> { if (this.reachLimit(limit, records.size())) { return; } - Iterator edges = edgesOfVertex(v, steps); - this.vertexIterCounter.addAndGet(1L); - while (!this.reachLimit(limit, records.size()) && edges.hasNext()) { - HugeEdge edge = (HugeEdge) edges.next(); - Id target = edge.id().otherVertexId(); - records.addPath(v, target); - - records.edgeResults().addEdge(v, target, edge); - - this.edgeIterCounter.addAndGet(1L); - } + EdgeId edgeId = ((HugeEdge) edge).id(); + records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId()); + records.edgeResults().addEdge(edgeId.ownerVertexId(), edgeId.otherVertexId(), edge); + this.edgeIterCounter.addAndGet(1L); }; while (maxDepth-- > 0) { records.startOneLayer(true); - traverseIds(records.keys(), consumer, concurrent); + bfsQuery(records.keys(), steps, NO_LIMIT, consumer); records.finishOneLayer(); + if (this.reachLimit(limit, records.size())) { + break; + } } + + this.vertexIterCounter.addAndGet(records.size()); + return records; } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java index 9924c766c5..421899247f 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java @@ -18,12 +18,15 @@ package org.apache.hugegraph.traversal.algorithm; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.function.Consumer; import org.apache.hugegraph.HugeException; import org.apache.hugegraph.HugeGraph; +import org.apache.hugegraph.backend.id.EdgeId; import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.query.Query; import org.apache.hugegraph.structure.HugeEdge; import org.apache.hugegraph.traversal.algorithm.records.KoutRecords; import org.apache.hugegraph.traversal.algorithm.steps.Steps; @@ -57,34 +60,45 @@ public Set kout(Id sourceV, Directions dir, String label, Id labelId = this.getEdgeLabelId(label); - Set latest = newIdSet(); - latest.add(sourceV); + Set sources = newIdSet(); + Set neighbors = newIdSet(); + Set visited = nearest ? newIdSet() : null; - Set all = newIdSet(); - all.add(sourceV); + neighbors.add(sourceV); + + ConcurrentVerticesConsumer consumer; + + long remaining = capacity == NO_LIMIT ? NO_LIMIT : capacity - 1; - long remaining = capacity == NO_LIMIT ? - NO_LIMIT : capacity - latest.size(); - this.vertexIterCounter.addAndGet(1L); while (depth-- > 0) { // Just get limit nodes in last layer if limit < remaining capacity if (depth == 0 && limit != NO_LIMIT && (limit < remaining || remaining == NO_LIMIT)) { remaining = limit; } - if (nearest) { - latest = this.adjacentVertices(sourceV, latest, dir, labelId, - all, degree, remaining); - all.addAll(latest); - } else { - latest = this.adjacentVertices(sourceV, latest, dir, labelId, - null, degree, remaining); + + if (visited != null) { + visited.addAll(neighbors); } - this.vertexIterCounter.addAndGet(1L); - this.edgeIterCounter.addAndGet(latest.size()); + + // swap sources and neighbors + Set tmp = neighbors; + neighbors = sources; + sources = tmp; + + // start + consumer = new ConcurrentVerticesConsumer(sourceV, visited, remaining, neighbors); + + this.vertexIterCounter.addAndGet(sources.size()); + this.edgeIterCounter.addAndGet(neighbors.size()); + + bfsQuery(sources.iterator(), dir, labelId, degree, capacity, consumer); + + sources.clear(); + if (capacity != NO_LIMIT) { // Update 'remaining' value to record remaining capacity - remaining -= latest.size(); + remaining -= neighbors.size(); if (remaining <= 0 && depth > 0) { throw new HugeException( @@ -94,7 +108,7 @@ public Set kout(Id sourceV, Directions dir, String label, } } - return latest; + return neighbors; } public KoutRecords customizedKout(Id source, Steps steps, @@ -107,33 +121,26 @@ public KoutRecords customizedKout(Id source, Steps steps, checkLimit(limit); long[] depth = new long[1]; depth[0] = maxDepth; - boolean concurrent = maxDepth >= this.concurrentDepth(); - KoutRecords records = new KoutRecords(concurrent, source, nearest, 0); + KoutRecords records = new KoutRecords(true, source, nearest, 0); - Consumer consumer = v -> { + Consumer consumer = edge -> { if (this.reachLimit(limit, depth[0], records.size())) { return; } - Iterator edges = edgesOfVertex(v, steps); - this.vertexIterCounter.addAndGet(1L); - while (!this.reachLimit(limit, depth[0], records.size()) && - edges.hasNext()) { - HugeEdge edge = (HugeEdge) edges.next(); - Id target = edge.id().otherVertexId(); - records.addPath(v, target); - this.checkCapacity(capacity, records.accessed(), depth[0]); - - records.edgeResults().addEdge(v, target, edge); - - this.edgeIterCounter.addAndGet(1L); - } + EdgeId edgeId = ((HugeEdge) edge).id(); + records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId()); + records.edgeResults().addEdge(edgeId.ownerVertexId(), edgeId.otherVertexId(), edge); + this.edgeIterCounter.addAndGet(1L); }; while (depth[0]-- > 0) { + List sources = records.ids(Query.NO_LIMIT); records.startOneLayer(true); - this.traverseIds(records.keys(), consumer, concurrent); + bfsQuery(sources.iterator(), steps, capacity, consumer); + this.vertexIterCounter.addAndGet(sources.size()); records.finishOneLayer(); + checkCapacity(capacity, records.accessed(), depth[0]); } return records; } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java index 7e04a286c3..83decd21db 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java @@ -19,7 +19,9 @@ import static org.apache.hugegraph.traversal.algorithm.HugeTraverser.NO_LIMIT; +import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.Stack; import org.apache.hugegraph.backend.id.Id; @@ -45,6 +47,17 @@ public int size() { @Override public List ids(long limit) { List ids = CollectionFactory.newList(CollectionType.EC); + this.getRecords(limit, ids); + return ids; + } + + public Set idSet(long limit) { + Set ids = CollectionFactory.newSet(CollectionType.EC); + this.getRecords(limit, ids); + return ids; + } + + private void getRecords(long limit, Collection ids) { Stack records = this.records(); // Not include record(i=0) to ignore source vertex for (int i = 1; i < records.size(); i++) { @@ -54,7 +67,6 @@ public List ids(long limit) { limit--; } } - return ids; } @Override From c4a399db776e7dda11a5d9dba63d8be7320cb3d5 Mon Sep 17 00:00:00 2001 From: wucc <77946882+DanGuge@users.noreply.github.com> Date: Fri, 15 Sep 2023 17:17:54 +0800 Subject: [PATCH 04/11] fix: repeated addition of edgeIterCounter --- .../hugegraph/traversal/algorithm/KneighborTraverser.java | 2 -- .../org/apache/hugegraph/traversal/algorithm/KoutTraverser.java | 1 - .../org/apache/hugegraph/traversal/algorithm/OltpTraverser.java | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java index ad420fdc7d..1220f16ed7 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java @@ -55,7 +55,6 @@ public Set kneighbor(Id sourceV, Directions dir, return; } records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId()); - this.edgeIterCounter.addAndGet(1L); }; while (depth-- > 0) { @@ -89,7 +88,6 @@ public KneighborRecords customizedKneighbor(Id source, Steps steps, EdgeId edgeId = ((HugeEdge) edge).id(); records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId()); records.edgeResults().addEdge(edgeId.ownerVertexId(), edgeId.otherVertexId(), edge); - this.edgeIterCounter.addAndGet(1L); }; while (maxDepth-- > 0) { diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java index 421899247f..165e60188e 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java @@ -131,7 +131,6 @@ public KoutRecords customizedKout(Id source, Steps steps, EdgeId edgeId = ((HugeEdge) edge).id(); records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId()); records.edgeResults().addEdge(edgeId.ownerVertexId(), edgeId.otherVertexId(), edge); - this.edgeIterCounter.addAndGet(1L); }; while (depth[0]-- > 0) { diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java index d3444fcb4b..bea0d284d6 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java @@ -334,7 +334,7 @@ public void accept(Iterator edges) { long counter = 0; while (ids.hasNext()) { if (Thread.currentThread().isInterrupted()) { - LOG.warn("Consumer isInterrupted"); + LOG.warn("Consumer is Interrupted"); break; } counter++; From b7d4ec7c477654e0e17be60353017c2963b6e7f4 Mon Sep 17 00:00:00 2001 From: wucc <77946882+DanGuge@users.noreply.github.com> Date: Fri, 15 Sep 2023 17:23:42 +0800 Subject: [PATCH 05/11] fix: format code --- .../traversal/algorithm/OltpTraverser.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java index bea0d284d6..203752a5d1 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java @@ -49,7 +49,7 @@ import com.google.common.base.Objects; public abstract class OltpTraverser extends HugeTraverser - implements AutoCloseable { + implements AutoCloseable { private static final String EXECUTOR_NAME = "oltp"; private static Consumers.ExecutorPool executors; @@ -71,6 +71,11 @@ protected OltpTraverser(HugeGraph graph) { } } + @Override + public void close() { + // pass + } + public static void destroy() { synchronized (OltpTraverser.class) { if (executors != null) { @@ -80,11 +85,6 @@ public static void destroy() { } } - @Override - public void close() { - // pass - } - protected long traversePairs(Iterator> pairs, Consumer> consumer) { return this.traverse(pairs, consumer, "traverse-pairs"); @@ -248,7 +248,7 @@ protected void bfsQuery(Iterator vertices, } public static class ConcurrentMultiValuedMap - extends ConcurrentHashMap> { + extends ConcurrentHashMap> { private static final long serialVersionUID = -7249946839643493614L; From f5432239c1b7ce804bfc2004d90596d0038c5721 Mon Sep 17 00:00:00 2001 From: wucc <77946882+DanGuge@users.noreply.github.com> Date: Sat, 16 Sep 2023 15:23:32 +0800 Subject: [PATCH 06/11] fix: change param name style & format code --- .../apache/hugegraph/task/TaskManager.java | 21 ++- .../algorithm/KneighborTraverser.java | 6 +- .../traversal/algorithm/KoutTraverser.java | 4 +- .../traversal/algorithm/OltpTraverser.java | 173 +++++++++--------- .../algorithm/records/KneighborRecords.java | 2 +- .../org/apache/hugegraph/util/Consumers.java | 58 +++--- .../backend/store/rocksdb/RocksDBStore.java | 8 +- 7 files changed, 139 insertions(+), 133 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index 524a1f7593..0149da913e 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -26,16 +26,17 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.hugegraph.HugeException; +import org.apache.hugegraph.HugeGraphParams; +import org.apache.hugegraph.concurrent.PausableScheduledThreadPool; import org.apache.hugegraph.type.define.NodeRole; -import org.apache.hugegraph.util.*; import org.apache.hugegraph.util.Consumers; +import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.ExecutorUtil; import org.apache.hugegraph.util.LockUtil; +import org.apache.hugegraph.util.Log; import org.slf4j.Logger; -import org.apache.hugegraph.HugeException; -import org.apache.hugegraph.HugeGraphParams; -import org.apache.hugegraph.concurrent.PausableScheduledThreadPool; - public final class TaskManager { private static final Logger LOG = Log.logger(TaskManager.class); @@ -47,7 +48,7 @@ public final class TaskManager { "server-info-db-worker-%d"; public static final String TASK_SCHEDULER = "task-scheduler-%d"; - protected static final long SCHEDULE_PERIOD = 1000L; // unit ms + static final long SCHEDULE_PERIOD = 1000L; // unit ms private static final int THREADS = 4; private static final TaskManager MANAGER = new TaskManager(THREADS); @@ -134,7 +135,7 @@ private void closeTaskTx(HugeGraphParams graph) { graph.closeTx(); } else { Consumers.executeOncePerThread(this.taskExecutor, totalThreads, - graph::closeTx); + graph::closeTx, 5, TimeUnit.SECONDS); } } catch (Exception e) { throw new HugeException("Exception when closing task tx", e); @@ -242,7 +243,7 @@ public int pendingTasks() { return size; } - protected void notifyNewTask(HugeTask task) { + void notifyNewTask(HugeTask task) { Queue queue = ((ThreadPoolExecutor) this.schedulerExecutor) .getQueue(); if (queue.size() <= 1) { @@ -357,11 +358,11 @@ private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) { private static final ThreadLocal CONTEXTS = new ThreadLocal<>(); - protected static void setContext(String context) { + static void setContext(String context) { CONTEXTS.set(context); } - protected static void resetContext() { + static void resetContext() { CONTEXTS.remove(); } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java index 1220f16ed7..565d0af5f6 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java @@ -59,7 +59,7 @@ public Set kneighbor(Id sourceV, Directions dir, while (depth-- > 0) { records.startOneLayer(true); - bfsQuery(records.keys(), dir, labelId, degree, NO_LIMIT, consumer); + traverseIdsByBfs(records.keys(), dir, labelId, degree, NO_LIMIT, consumer); records.finishOneLayer(); if (reachLimit(limit, records.size())) { break; @@ -68,7 +68,7 @@ public Set kneighbor(Id sourceV, Directions dir, this.vertexIterCounter.addAndGet(records.size()); - return records.idSet(limit); + return records.idsBySet(limit); } public KneighborRecords customizedKneighbor(Id source, Steps steps, @@ -92,7 +92,7 @@ public KneighborRecords customizedKneighbor(Id source, Steps steps, while (maxDepth-- > 0) { records.startOneLayer(true); - bfsQuery(records.keys(), steps, NO_LIMIT, consumer); + traverseIdsByBfs(records.keys(), steps, NO_LIMIT, consumer); records.finishOneLayer(); if (this.reachLimit(limit, records.size())) { break; diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java index 165e60188e..c683694c14 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java @@ -92,7 +92,7 @@ public Set kout(Id sourceV, Directions dir, String label, this.vertexIterCounter.addAndGet(sources.size()); this.edgeIterCounter.addAndGet(neighbors.size()); - bfsQuery(sources.iterator(), dir, labelId, degree, capacity, consumer); + traverseIdsByBfs(sources.iterator(), dir, labelId, degree, capacity, consumer); sources.clear(); @@ -136,7 +136,7 @@ public KoutRecords customizedKout(Id source, Steps steps, while (depth[0]-- > 0) { List sources = records.ids(Query.NO_LIMIT); records.startOneLayer(true); - bfsQuery(sources.iterator(), steps, capacity, consumer); + traverseIdsByBfs(sources.iterator(), steps, capacity, consumer); this.vertexIterCounter.addAndGet(sources.size()); records.finishOneLayer(); checkCapacity(capacity, records.accessed(), depth[0]); diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java index 203752a5d1..b31ccbef49 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java @@ -87,7 +87,7 @@ public static void destroy() { protected long traversePairs(Iterator> pairs, Consumer> consumer) { - return this.traverse(pairs, consumer, "traverse-pairs"); + return this.traverseByOne(pairs, consumer, "traverse-pairs"); } protected long traverseIds(Iterator ids, Consumer consumer, @@ -105,18 +105,19 @@ protected long traverseIds(Iterator ids, Consumer consumer, } protected long traverseIds(Iterator ids, Consumer consumer) { - return this.traverse(ids, consumer, "traverse-ids"); + return this.traverseByOne(ids, consumer, "traverse-ids"); } - protected long traverse(Iterator iterator, Consumer consumer, - String name) { + protected long traverseByOne(Iterator iterator, + Consumer consumer, + String taskName) { if (!iterator.hasNext()) { return 0L; } Consumers consumers = new Consumers<>(executors.getExecutor(), consumer, null); - consumers.start(name); + consumers.start(taskName); long total = 0L; try { while (iterator.hasNext()) { @@ -141,33 +142,69 @@ protected long traverse(Iterator iterator, Consumer consumer, return total; } - protected long traverseBatch(Iterator> iterator, - Consumer> consumer, - String name, int queueWorkerSize) { - if (!iterator.hasNext()) { + protected void traverseIdsByBfs(Iterator vertices, + Directions dir, + Id label, + long degree, + long capacity, + Consumer consumer) { + List labels = + label == null ? Collections.emptyList() : Collections.singletonList(label); + OneStepEdgeIterConsumer edgeIterConsumer = new OneStepEdgeIterConsumer(consumer, capacity); + + EdgesIterator edgeIter = edgesOfVertices(vertices, dir, labels, degree); + + // parallel out-of-order execution + this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-ite-edge", 1); + } + + protected void traverseIdsByBfs(Iterator vertices, + Steps steps, + long capacity, + Consumer consumer) { + StepsEdgeIterConsumer edgeIterConsumer = + new StepsEdgeIterConsumer(consumer, capacity, steps); + + EdgesQueryIterator queryIterator = new EdgesQueryIterator(vertices, + steps.direction(), + steps.edgeLabels(), + steps.degree()); + + // get Iterator> from Iterator + EdgesIterator edgeIter = new EdgesIterator(queryIterator); + + // parallel out-of-order execution + this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-ite-edge", 1); + } + + protected long traverseByBatch(Iterator> sources, + Consumer> consumer, + String taskName, int concurrentWorkers) { + if (!sources.hasNext()) { return 0L; } AtomicBoolean done = new AtomicBoolean(false); Consumers> consumers = null; try { - consumers = getConsumers(consumer, queueWorkerSize, done, - executors.getExecutor()); - return consumersStart(iterator, name, done, consumers); + consumers = buildConsumers(consumer, concurrentWorkers, done, + executors.getExecutor()); + return startConsumers(sources, taskName, done, consumers); } finally { assert consumers != null; executors.returnExecutor(consumers.executor()); } } - private long consumersStart(Iterator> iterator, String name, + private long startConsumers(Iterator> sources, + String taskName, AtomicBoolean done, Consumers> consumers) { long total = 0L; try { - consumers.start(name); - while (iterator.hasNext() && !done.get()) { + consumers.start(taskName); + while (sources.hasNext() && !done.get()) { total++; - Iterator v = iterator.next(); + Iterator v = sources.next(); consumers.provide(v); } } catch (Consumers.StopExecution e) { @@ -180,30 +217,26 @@ private long consumersStart(Iterator> iterator, String name, } catch (Throwable e) { throw Consumers.wrapException(e); } finally { - CloseableIterator.closeIterator(iterator); + CloseableIterator.closeIterator(sources); } } return total; } - private Consumers> getConsumers(Consumer> consumer, - int queueWorkerSize, - AtomicBoolean done, - ExecutorService executor) { - Consumers> consumers; - consumers = new Consumers<>(executor, - consumer, - null, - e -> done.set(true), - queueWorkerSize); - return consumers; + private Consumers> buildConsumers(Consumer> consumer, + int queueWorkerSize, + AtomicBoolean done, + ExecutorService executor) { + return new Consumers<>(executor, + consumer, + null, + e -> done.set(true), + queueWorkerSize); } protected Iterator filter(Iterator vertices, String key, Object value) { - return new FilterIterator<>(vertices, vertex -> { - return match(vertex, key, value); - }); + return new FilterIterator<>(vertices, vertex -> match(vertex, key, value)); } protected boolean match(Element elem, String key, Object value) { @@ -214,39 +247,6 @@ protected boolean match(Element elem, String key, Object value) { return p.isPresent() && Objects.equal(p.value(), value); } - protected void bfsQuery(Iterator vertices, - Directions dir, - Id label, - long degree, - long capacity, - Consumer parseConsumer) { - List labels = - label == null ? Collections.emptyList() : Collections.singletonList(label); - CapacityConsumer consumer = new CapacityConsumer(parseConsumer, capacity); - - EdgesIterator edgeIts = edgesOfVertices(vertices, dir, labels, degree); - // 并行乱序处理 - this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1); - } - - protected void bfsQuery(Iterator vertices, - Steps steps, - long capacity, - Consumer parseConsumer) { - CapacityConsumerWithStep consumer = - new CapacityConsumerWithStep(parseConsumer, capacity, steps); - - EdgesQueryIterator queryIterator = - new EdgesQueryIterator(vertices, steps.direction(), steps.edgeLabels(), - steps.degree()); - - // 这里获取边数据,以便支持 step - EdgesIterator edgeIts = new EdgesIterator(queryIterator); - - // 并行乱序处理 - this.traverseBatch(edgeIts, consumer, "traverse-ite-edge", 1); - } - public static class ConcurrentMultiValuedMap extends ConcurrentHashMap> { @@ -280,6 +280,7 @@ public List getValues(K key) { } public static class ConcurrentVerticesConsumer implements Consumer { + private final Id sourceV; private final Set excluded; private final Set neighbors; @@ -317,20 +318,21 @@ public void accept(EdgeId edgeId) { } } - public abstract class EdgeItConsumer implements Consumer> { - private final Consumer parseConsumer; + public abstract class EdgesConsumer implements Consumer> { + + private final Consumer consumer; private final long capacity; - public EdgeItConsumer(Consumer parseConsumer, long capacity) { - this.parseConsumer = parseConsumer; + public EdgesConsumer(Consumer consumer, long capacity) { + this.consumer = consumer; this.capacity = capacity; } - protected abstract Iterator prepare(Iterator it); + protected abstract Iterator prepare(Iterator iter); @Override - public void accept(Iterator edges) { - Iterator ids = prepare(edges); + public void accept(Iterator edgeIter) { + Iterator ids = prepare(edgeIter); long counter = 0; while (ids.hasNext()) { if (Thread.currentThread().isInterrupted()) { @@ -338,40 +340,41 @@ public void accept(Iterator edges) { break; } counter++; - parseConsumer.accept(ids.next()); + consumer.accept(ids.next()); } long total = edgeIterCounter.addAndGet(counter); - // 按批次检测 capacity,以提高性能 + // traverse by batch & improve performance if (this.capacity != NO_LIMIT && total >= capacity) { throw new Consumers.StopExecution("reach capacity"); } } } - public class CapacityConsumer extends EdgeItConsumer { - public CapacityConsumer(Consumer parseConsumer, long capacity) { - super(parseConsumer, capacity); + public class OneStepEdgeIterConsumer extends EdgesConsumer { + + public OneStepEdgeIterConsumer(Consumer consumer, long capacity) { + super(consumer, capacity); } @Override - protected Iterator prepare(Iterator edges) { - return new MapperIterator<>(edges, (e) -> ((HugeEdge) e).id()); + protected Iterator prepare(Iterator edgeIter) { + return new MapperIterator<>(edgeIter, (e) -> ((HugeEdge) e).id()); } } - public class CapacityConsumerWithStep extends EdgeItConsumer { + public class StepsEdgeIterConsumer extends EdgesConsumer { + private final Steps steps; - public CapacityConsumerWithStep(Consumer parseConsumer, long capacity, - Steps steps) { - super(parseConsumer, capacity); + public StepsEdgeIterConsumer(Consumer consumer, long capacity, + Steps steps) { + super(consumer, capacity); this.steps = steps; } @Override - protected Iterator prepare(Iterator edges) { - return edgesOfVertexStep(edges, steps); + protected Iterator prepare(Iterator edgeIter) { + return edgesOfVertexStep(edgeIter, steps); } } - } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java index 83decd21db..649b1c2116 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java @@ -51,7 +51,7 @@ public List ids(long limit) { return ids; } - public Set idSet(long limit) { + public Set idsBySet(long limit) { Set ids = CollectionFactory.newSet(CollectionType.EC); this.getRecords(limit, ids); return ids; diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java index bf470df20e..e027c5e963 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java @@ -43,19 +43,19 @@ public final class Consumers { public static final int THREADS = 4 + CoreOptions.CPUS / 4; public static final int QUEUE_WORKER_SIZE = 1000; public static final long CONSUMER_WAKE_PERIOD = 1; + private static final Object QUEUE_END = new VWrapper(null); private static final Logger LOG = Log.logger(Consumers.class); private final ExecutorService executor; private final Consumer consumer; - private final Runnable done; + private final Runnable doneHandle; private final Consumer exceptionHandle; private final int workers; - private final List runnings; + private final List runningFutures; private final int queueSize; private final CountDownLatch latch; private final BlockingQueue> queue; - private final VWrapper queueEnd = new VWrapper(null); private volatile Throwable exception = null; public Consumers(ExecutorService executor, Consumer consumer) { @@ -63,26 +63,26 @@ public Consumers(ExecutorService executor, Consumer consumer) { } public Consumers(ExecutorService executor, - Consumer consumer, Runnable done) { - this(executor, consumer, done, QUEUE_WORKER_SIZE); + Consumer consumer, Runnable doneHandle) { + this(executor, consumer, doneHandle, QUEUE_WORKER_SIZE); } public Consumers(ExecutorService executor, Consumer consumer, - Runnable done, - int queueWorkerSize) { - this(executor, consumer, done, null, queueWorkerSize); + Runnable doneHandle, + int queueSizePerWorker) { + this(executor, consumer, doneHandle, null, queueSizePerWorker); } public Consumers(ExecutorService executor, Consumer consumer, - Runnable done, - Consumer handle, - int queueWorkerSize) { + Runnable doneHandle, + Consumer exceptionHandle, + int queueSizePerWorker) { this.executor = executor; this.consumer = consumer; - this.done = done; - this.exceptionHandle = handle; + this.doneHandle = doneHandle; + this.exceptionHandle = exceptionHandle; int workers = THREADS; if (this.executor instanceof ThreadPoolExecutor) { @@ -90,8 +90,8 @@ public Consumers(ExecutorService executor, } this.workers = workers; - this.runnings = new ArrayList<>(workers); - this.queueSize = queueWorkerSize * workers + 1; + this.runningFutures = new ArrayList<>(workers); + this.queueSize = queueSizePerWorker * workers + 1; this.latch = new CountDownLatch(workers); this.queue = new ArrayBlockingQueue<>(this.queueSize); } @@ -104,7 +104,7 @@ public void start(String name) { LOG.info("Starting {} workers[{}] with queue size {}...", this.workers, name, this.queueSize); for (int i = 0; i < this.workers; i++) { - this.runnings.add(this.executor.submit(new ContextCallable<>(this::runAndDone))); + this.runningFutures.add(this.executor.submit(new ContextCallable<>(this::runAndDone))); } } @@ -114,7 +114,7 @@ private Void runAndDone() { } catch (Throwable e) { if (e instanceof StopExecution) { this.queue.clear(); - putEnd(); + putQueueEnd(); } else { // Only the first exception of one thread can be stored this.exception = e; @@ -148,8 +148,8 @@ private boolean consume() { } } - if (elem == queueEnd) { - putEnd(); + if (elem == QUEUE_END) { + putQueueEnd(); return false; } // do job @@ -174,12 +174,12 @@ private void exceptionHandle(Throwable e) { } private void done() { - if (this.done == null) { + if (this.doneHandle == null) { return; } try { - this.done.run(); + this.doneHandle.run(); } catch (Throwable e) { if (this.exception == null) { this.exception = e; @@ -212,10 +212,10 @@ public void provide(V v) throws Throwable { } } - private void putEnd() { + private void putQueueEnd() { if (this.executor != null) { try { - this.queue.put(queueEnd); + this.queue.put((VWrapper) QUEUE_END); } catch (InterruptedException e) { LOG.warn("Interrupted while enqueue", e); } @@ -228,11 +228,11 @@ public void await() throws Throwable { this.done(); } else { try { - putEnd(); + putQueueEnd(); this.latch.await(); } catch (InterruptedException e) { String error = "Interrupted while waiting for consumers"; - for (Future f: this.runnings) { + for (Future f: this.runningFutures) { f.cancel(true); } this.exception = new HugeException(error, e); @@ -251,8 +251,10 @@ public ExecutorService executor() { public static void executeOncePerThread(ExecutorService executor, int totalThreads, - Runnable callback) - throws InterruptedException { + Runnable callback, + int invokeTimeout, + TimeUnit unit) + throws InterruptedException { // Ensure callback execute at least once for every thread final Map threadsTimes = new ConcurrentHashMap<>(); final List> tasks = new ArrayList<>(); @@ -280,7 +282,7 @@ public static void executeOncePerThread(ExecutorService executor, for (int i = 0; i < totalThreads; i++) { tasks.add(task); } - executor.invokeAll(tasks, 5, TimeUnit.SECONDS); + executor.invokeAll(tasks, invokeTimeout, unit); } public static ExecutorService newThreadPool(String prefix, int workers) { diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java index 2dba5fa766..b26fba78ba 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -44,9 +44,6 @@ import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; -import org.rocksdb.RocksDBException; -import org.slf4j.Logger; - import org.apache.hugegraph.HugeException; import org.apache.hugegraph.backend.BackendException; import org.apache.hugegraph.backend.id.Id; @@ -69,6 +66,9 @@ import org.apache.hugegraph.util.ExecutorUtil; import org.apache.hugegraph.util.InsertionOrderUtil; import org.apache.hugegraph.util.Log; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; + import com.google.common.collect.ImmutableList; public abstract class RocksDBStore extends AbstractBackendStore { @@ -279,7 +279,7 @@ private void shutdownOpenPool(ExecutorService openPool) { this.useSessions(); try { Consumers.executeOncePerThread(openPool, OPEN_POOL_THREADS, - this::closeSessions); + this::closeSessions, 5, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new BackendException("Failed to close session opened by " + "open-pool"); From 5187636b484279237f2b73555483d3c50cda354b Mon Sep 17 00:00:00 2001 From: wucc <77946882+DanGuge@users.noreply.github.com> Date: Sun, 17 Sep 2023 10:30:24 +0800 Subject: [PATCH 07/11] fix: code style --- .../apache/hugegraph/task/TaskManager.java | 11 ++++---- .../traversal/algorithm/OltpTraverser.java | 26 +++++++++---------- .../org/apache/hugegraph/util/Consumers.java | 9 +++---- .../backend/store/rocksdb/RocksDBStore.java | 3 ++- 4 files changed, 25 insertions(+), 24 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index 0149da913e..e85aace6cb 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -48,9 +48,10 @@ public final class TaskManager { "server-info-db-worker-%d"; public static final String TASK_SCHEDULER = "task-scheduler-%d"; - static final long SCHEDULE_PERIOD = 1000L; // unit ms + protected static final long SCHEDULE_PERIOD = 1000L; // unit ms private static final int THREADS = 4; + private static final int TX_CLOSE_TIMEOUT = 30; private static final TaskManager MANAGER = new TaskManager(THREADS); private final Map schedulers; @@ -135,7 +136,7 @@ private void closeTaskTx(HugeGraphParams graph) { graph.closeTx(); } else { Consumers.executeOncePerThread(this.taskExecutor, totalThreads, - graph::closeTx, 5, TimeUnit.SECONDS); + graph::closeTx, TX_CLOSE_TIMEOUT); } } catch (Exception e) { throw new HugeException("Exception when closing task tx", e); @@ -243,7 +244,7 @@ public int pendingTasks() { return size; } - void notifyNewTask(HugeTask task) { + protected void notifyNewTask(HugeTask task) { Queue queue = ((ThreadPoolExecutor) this.schedulerExecutor) .getQueue(); if (queue.size() <= 1) { @@ -358,11 +359,11 @@ private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) { private static final ThreadLocal CONTEXTS = new ThreadLocal<>(); - static void setContext(String context) { + protected static void setContext(String context) { CONTEXTS.set(context); } - static void resetContext() { + protected static void resetContext() { CONTEXTS.remove(); } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java index b31ccbef49..b566ad5c29 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java @@ -155,7 +155,7 @@ protected void traverseIdsByBfs(Iterator vertices, EdgesIterator edgeIter = edgesOfVertices(vertices, dir, labels, degree); // parallel out-of-order execution - this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-ite-edge", 1); + this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-step", 1); } protected void traverseIdsByBfs(Iterator vertices, @@ -174,7 +174,7 @@ protected void traverseIdsByBfs(Iterator vertices, EdgesIterator edgeIter = new EdgesIterator(queryIterator); // parallel out-of-order execution - this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-ite-edge", 1); + this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-steps", 1); } protected long traverseByBatch(Iterator> sources, @@ -224,14 +224,14 @@ private long startConsumers(Iterator> sources, } private Consumers> buildConsumers(Consumer> consumer, - int queueWorkerSize, + int queueSizePerWorker, AtomicBoolean done, ExecutorService executor) { return new Consumers<>(executor, consumer, null, e -> done.set(true), - queueWorkerSize); + queueSizePerWorker); } protected Iterator filter(Iterator vertices, @@ -297,22 +297,22 @@ public ConcurrentVerticesConsumer(Id sourceV, Set excluded, long limit, @Override public void accept(EdgeId edgeId) { - if (limit != NO_LIMIT && count.get() >= limit) { + if (this.limit != NO_LIMIT && count.get() >= this.limit) { throw new Consumers.StopExecution("reach limit"); } Id targetV = edgeId.otherVertexId(); - if (sourceV.equals(targetV)) { + if (this.sourceV.equals(targetV)) { return; } - if (excluded != null && excluded.contains(targetV)) { + if (this.excluded != null && this.excluded.contains(targetV)) { return; } - if (neighbors.add(targetV)) { - if (limit != NO_LIMIT) { - count.getAndIncrement(); + if (this.neighbors.add(targetV)) { + if (this.limit != NO_LIMIT) { + this.count.getAndIncrement(); } } } @@ -340,11 +340,11 @@ public void accept(Iterator edgeIter) { break; } counter++; - consumer.accept(ids.next()); + this.consumer.accept(ids.next()); } long total = edgeIterCounter.addAndGet(counter); // traverse by batch & improve performance - if (this.capacity != NO_LIMIT && total >= capacity) { + if (this.capacity != NO_LIMIT && total >= this.capacity) { throw new Consumers.StopExecution("reach capacity"); } } @@ -374,7 +374,7 @@ public StepsEdgeIterConsumer(Consumer consumer, long capacity, @Override protected Iterator prepare(Iterator edgeIter) { - return edgesOfVertexStep(edgeIter, steps); + return edgesOfVertexStep(edgeIter, this.steps); } } } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java index e027c5e963..8796730973 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java @@ -43,7 +43,7 @@ public final class Consumers { public static final int THREADS = 4 + CoreOptions.CPUS / 4; public static final int QUEUE_WORKER_SIZE = 1000; public static final long CONSUMER_WAKE_PERIOD = 1; - private static final Object QUEUE_END = new VWrapper(null); + private static final Object QUEUE_END = new Object(); private static final Logger LOG = Log.logger(Consumers.class); @@ -232,7 +232,7 @@ public void await() throws Throwable { this.latch.await(); } catch (InterruptedException e) { String error = "Interrupted while waiting for consumers"; - for (Future f: this.runningFutures) { + for (Future f : this.runningFutures) { f.cancel(true); } this.exception = new HugeException(error, e); @@ -252,8 +252,7 @@ public ExecutorService executor() { public static void executeOncePerThread(ExecutorService executor, int totalThreads, Runnable callback, - int invokeTimeout, - TimeUnit unit) + int invokeTimeout) throws InterruptedException { // Ensure callback execute at least once for every thread final Map threadsTimes = new ConcurrentHashMap<>(); @@ -282,7 +281,7 @@ public static void executeOncePerThread(ExecutorService executor, for (int i = 0; i < totalThreads; i++) { tasks.add(task); } - executor.invokeAll(tasks, invokeTimeout, unit); + executor.invokeAll(tasks, invokeTimeout, TimeUnit.SECONDS); } public static ExecutorService newThreadPool(String prefix, int workers) { diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java index b26fba78ba..8e847995f9 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -94,6 +94,7 @@ public abstract class RocksDBStore extends AbstractBackendStore Date: Sun, 17 Sep 2023 11:46:59 +0800 Subject: [PATCH 08/11] fix: change the type of 'QUEUE_END' to 'VWrapper' --- .../src/main/java/org/apache/hugegraph/util/Consumers.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java index 8796730973..029780ac86 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java @@ -43,7 +43,7 @@ public final class Consumers { public static final int THREADS = 4 + CoreOptions.CPUS / 4; public static final int QUEUE_WORKER_SIZE = 1000; public static final long CONSUMER_WAKE_PERIOD = 1; - private static final Object QUEUE_END = new Object(); + private static final Object QUEUE_END = new VWrapper<>(null); private static final Logger LOG = Log.logger(Consumers.class); From d0b96c584c37e13a99eac0ea8671ef804bb3d571 Mon Sep 17 00:00:00 2001 From: wucc <77946882+DanGuge@users.noreply.github.com> Date: Sun, 17 Sep 2023 16:02:29 +0800 Subject: [PATCH 09/11] fix: code style & change BlockingQueue element type to V --- .../backend/query/EdgesQueryIterator.java | 2 +- .../apache/hugegraph/task/TaskManager.java | 3 +-- .../traversal/algorithm/HugeTraverser.java | 18 +++++++------ .../traversal/algorithm/OltpTraverser.java | 7 ++--- .../org/apache/hugegraph/util/Consumers.java | 27 +++++++------------ .../backend/store/rocksdb/RocksDBStore.java | 8 +++--- 6 files changed, 30 insertions(+), 35 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java index cf773b4474..4ab9a8859a 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java @@ -25,6 +25,7 @@ import org.apache.hugegraph.type.define.Directions; public class EdgesQueryIterator implements Iterator { + private final List labels; private final Directions directions; private final long limit; @@ -60,5 +61,4 @@ public Query next() { } return query; } - } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index e85aace6cb..0ad96f443c 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -49,9 +49,8 @@ public final class TaskManager { public static final String TASK_SCHEDULER = "task-scheduler-%d"; protected static final long SCHEDULE_PERIOD = 1000L; // unit ms - + private static final long TX_CLOSE_TIMEOUT = 30L; // unit s private static final int THREADS = 4; - private static final int TX_CLOSE_TIMEOUT = 30; private static final TaskManager MANAGER = new TaskManager(THREADS); private final Map schedulers; diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java index f5df51ddff..60b4ed50df 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java @@ -1017,29 +1017,31 @@ public Set getEdges(Iterator vertexIter) { } public class EdgesIterator implements Iterator>, Closeable { - private final Iterator> currentIt; - public EdgesIterator(EdgesQueryIterator queryIterator) { + private final Iterator> currentIter; + + public EdgesIterator(EdgesQueryIterator queries) { List> iteratorList = new ArrayList<>(); - while (queryIterator.hasNext()) { - iteratorList.add(graph().edges(queryIterator.next())); + while (queries.hasNext()) { + iteratorList.add( + graph().edges(queries.next())); } - this.currentIt = iteratorList.iterator(); + this.currentIter = iteratorList.iterator(); } @Override public boolean hasNext() { - return this.currentIt.hasNext(); + return this.currentIter.hasNext(); } @Override public Iterator next() { - return this.currentIt.next(); + return this.currentIter.next(); } @Override public void close() throws IOException { - CloseableIterator.closeIterator(currentIt); + CloseableIterator.closeIterator(currentIter); } } } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java index b566ad5c29..0149c9796b 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java @@ -148,8 +148,8 @@ protected void traverseIdsByBfs(Iterator vertices, long degree, long capacity, Consumer consumer) { - List labels = - label == null ? Collections.emptyList() : Collections.singletonList(label); + List labels = label == null ? Collections.emptyList() : + Collections.singletonList(label); OneStepEdgeIterConsumer edgeIterConsumer = new OneStepEdgeIterConsumer(consumer, capacity); EdgesIterator edgeIter = edgesOfVertices(vertices, dir, labels, degree); @@ -285,7 +285,6 @@ public static class ConcurrentVerticesConsumer implements Consumer { private final Set excluded; private final Set neighbors; private final long limit; - private final AtomicInteger count = new AtomicInteger(0); public ConcurrentVerticesConsumer(Id sourceV, Set excluded, long limit, Set neighbors) { @@ -295,6 +294,8 @@ public ConcurrentVerticesConsumer(Id sourceV, Set excluded, long limit, this.neighbors = neighbors; } + private final AtomicInteger count = new AtomicInteger(0); + @Override public void accept(EdgeId edgeId) { if (this.limit != NO_LIMIT && count.get() >= this.limit) { diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java index 029780ac86..5e1bcd74fc 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java @@ -43,10 +43,10 @@ public final class Consumers { public static final int THREADS = 4 + CoreOptions.CPUS / 4; public static final int QUEUE_WORKER_SIZE = 1000; public static final long CONSUMER_WAKE_PERIOD = 1; - private static final Object QUEUE_END = new VWrapper<>(null); private static final Logger LOG = Log.logger(Consumers.class); + private final V QUEUE_END = (V) new Object(); private final ExecutorService executor; private final Consumer consumer; private final Runnable doneHandle; @@ -55,7 +55,7 @@ public final class Consumers { private final List runningFutures; private final int queueSize; private final CountDownLatch latch; - private final BlockingQueue> queue; + private final BlockingQueue queue; private volatile Throwable exception = null; public Consumers(ExecutorService executor, Consumer consumer) { @@ -104,7 +104,8 @@ public void start(String name) { LOG.info("Starting {} workers[{}] with queue size {}...", this.workers, name, this.queueSize); for (int i = 0; i < this.workers; i++) { - this.runningFutures.add(this.executor.submit(new ContextCallable<>(this::runAndDone))); + this.runningFutures.add( + this.executor.submit(new ContextCallable<>(this::runAndDone))); } } @@ -116,7 +117,7 @@ private Void runAndDone() { this.queue.clear(); putQueueEnd(); } else { - // Only the first exception of one thread can be stored + // Only the first exception to one thread can be stored this.exception = e; LOG.error("Error when running task", e); } @@ -138,7 +139,7 @@ private void run() { } private boolean consume() { - VWrapper elem = null; + V elem = null; while (elem == null) { try { elem = this.queue.poll(CONSUMER_WAKE_PERIOD, TimeUnit.MILLISECONDS); @@ -153,7 +154,7 @@ private boolean consume() { return false; } // do job - this.consumer.accept(elem.v); + this.consumer.accept(elem); return true; } @@ -205,7 +206,7 @@ public void provide(V v) throws Throwable { throw this.throwException(); } else { try { - this.queue.put(new VWrapper<>(v)); + this.queue.put(v); } catch (InterruptedException e) { LOG.warn("Interrupted while enqueue", e); } @@ -215,7 +216,7 @@ public void provide(V v) throws Throwable { private void putQueueEnd() { if (this.executor != null) { try { - this.queue.put((VWrapper) QUEUE_END); + this.queue.put(QUEUE_END); } catch (InterruptedException e) { LOG.warn("Interrupted while enqueue", e); } @@ -252,7 +253,7 @@ public ExecutorService executor() { public static void executeOncePerThread(ExecutorService executor, int totalThreads, Runnable callback, - int invokeTimeout) + long invokeTimeout) throws InterruptedException { // Ensure callback execute at least once for every thread final Map threadsTimes = new ConcurrentHashMap<>(); @@ -373,12 +374,4 @@ public StopExecution(String message, Object... args) { super(message, args); } } - - public static class VWrapper { - public V v; - - public VWrapper(V v) { - this.v = v; - } - } } diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java index 8e847995f9..283baa622a 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -93,8 +93,8 @@ public abstract class RocksDBStore extends AbstractBackendStore Date: Tue, 19 Sep 2023 16:22:57 +0800 Subject: [PATCH 10/11] fix: bug when edge_steps is empty, we should filter nothing --- .../apache/hugegraph/traversal/algorithm/HugeTraverser.java | 5 +++++ .../apache/hugegraph/traversal/algorithm/steps/Steps.java | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java index 60b4ed50df..9740811cc7 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java @@ -485,6 +485,11 @@ public Iterator edgesOfVertex(Id source, Steps steps) { cq.limit(steps.limit()); } + if (steps.isEdgeEmpty()) { + Iterator edges = this.graph().edges(cq); + return edgesOfVertexStep(edges, steps); + } + Map edgeConditions = getFilterQueryConditions(steps.edgeSteps(), HugeType.EDGE); diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java index d1a9238be1..c2a1a7e1e1 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java @@ -138,6 +138,10 @@ public List edgeLabels() { return new ArrayList<>(this.edgeSteps.keySet()); } + public boolean isEdgeEmpty() { + return this.edgeSteps.isEmpty(); + } + public boolean isVertexEmpty() { return this.vertexSteps.isEmpty(); } From 23f48caa9196e1156374ece5d12de2f9793773fb Mon Sep 17 00:00:00 2001 From: wucc <77946882+DanGuge@users.noreply.github.com> Date: Thu, 21 Sep 2023 15:29:16 +0800 Subject: [PATCH 11/11] fix: code style --- .../apache/hugegraph/traversal/algorithm/HugeTraverser.java | 4 ++-- .../apache/hugegraph/traversal/algorithm/OltpTraverser.java | 4 ++-- .../src/main/java/org/apache/hugegraph/util/Consumers.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java index 9740811cc7..194576e857 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java @@ -1028,8 +1028,8 @@ public class EdgesIterator implements Iterator>, Closeable { public EdgesIterator(EdgesQueryIterator queries) { List> iteratorList = new ArrayList<>(); while (queries.hasNext()) { - iteratorList.add( - graph().edges(queries.next())); + Iterator edges = graph.edges(queries.next()); + iteratorList.add(edges); } this.currentIter = iteratorList.iterator(); } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java index 0149c9796b..c05d8f89f4 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java @@ -285,6 +285,7 @@ public static class ConcurrentVerticesConsumer implements Consumer { private final Set excluded; private final Set neighbors; private final long limit; + private final AtomicInteger count; public ConcurrentVerticesConsumer(Id sourceV, Set excluded, long limit, Set neighbors) { @@ -292,10 +293,9 @@ public ConcurrentVerticesConsumer(Id sourceV, Set excluded, long limit, this.excluded = excluded; this.limit = limit; this.neighbors = neighbors; + this.count = new AtomicInteger(0); } - private final AtomicInteger count = new AtomicInteger(0); - @Override public void accept(EdgeId edgeId) { if (this.limit != NO_LIMIT && count.get() >= this.limit) { diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java index 5e1bcd74fc..06e678fd98 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java @@ -208,7 +208,7 @@ public void provide(V v) throws Throwable { try { this.queue.put(v); } catch (InterruptedException e) { - LOG.warn("Interrupted while enqueue", e); + LOG.warn("Interrupt while queuing QUEUE_END", e); } } }