Skip to content

Commit

Permalink
merge traverser logic no matter if supporting paging
Browse files Browse the repository at this point in the history
Change-Id: I265f6f226dfe19f6ab66d5eaae6d5f969f591949
  • Loading branch information
zhoney committed Apr 3, 2019
1 parent 1340de8 commit 4546c6b
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

public final class IdHolderList extends ArrayList<IdHolder> {

private static final long serialVersionUID = -738694176552424990L;

private final boolean paging;

public IdHolderList(boolean paging) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;

import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.exception.NotSupportException;
import com.baidu.hugegraph.iterator.Metadatable;
Expand All @@ -44,7 +45,7 @@ public PageEntryIterator(QueryList queries, long pageSize) {
}

private PageState parsePageState() {
String page = this.queries.parent().page();
String page = this.queries.parent().pageWithoutCheck();
PageState pageState = PageState.fromString(page);
E.checkState(pageState.offset() < this.queries.total(),
"Invalid page '%s' with an offset '%s' exceeds " +
Expand All @@ -61,13 +62,15 @@ public boolean hasNext() {
}

private boolean fetch() {
if (this.remaining <= 0 ||
if ((this.remaining != Query.NO_LIMIT && this.remaining <= 0L) ||
this.pageState.offset() >= this.queries.total()) {
return false;
}

long pageSize = this.remaining < this.pageSize ?
this.remaining : this.pageSize;
long pageSize = this.pageSize;
if (this.remaining != Query.NO_LIMIT && this.remaining < pageSize) {
pageSize = this.remaining;
}
this.results = this.queries.fetchNext(this.pageState, pageSize);
assert this.results != null;

Expand All @@ -90,7 +93,10 @@ public BackendEntry next() {
throw new NoSuchElementException();
}
BackendEntry entry = this.results.iterator().next();
this.remaining--;
if (this.remaining != Query.NO_LIMIT) {
// Assume one result in each entry (just for index query)
this.remaining--;
}
return entry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public boolean empty() {
}

public Iterator<BackendEntry> fetch() {
assert !queries.isEmpty();
assert !this.queries.isEmpty();
if (this.parent.paging()) {
int pageSize = this.graph.configuration()
.get(CoreOptions.QUERY_PAGE_SIZE);
Expand Down Expand Up @@ -122,6 +122,7 @@ protected PageIterator fetchNext(PageState pageState, long pageSize) {
return query.iterator(offset - current, pageState.page(), pageSize);
}

@SuppressWarnings("unused")
private static Set<Id> limit(Set<Id> ids, Query query) {
long fromIndex = query.offset();
E.checkArgument(fromIndex <= Integer.MAX_VALUE,
Expand Down Expand Up @@ -167,7 +168,7 @@ private interface QueryHolder {
}

/**
* Generate queries from index.optimizeQuery()
* Generate queries from tx.optimizeQuery()
*/
private class OptimizedQuery implements QueryHolder {

Expand All @@ -183,9 +184,14 @@ public Iterator<BackendEntry> iterator() {

public PageIterator iterator(int index, String page, long pageSize) {
assert index == 0;
this.query.page(page);
Iterator<BackendEntry> iterator = fetcher.apply(this.query);
// Must iterate all entries before getting the next page info
Query query = this.query.copy();
query.page(page);
// Not set limit to pageSize due to PageEntryIterator.remaining
if (this.query.limit() == Query.NO_LIMIT) {
query.limit(pageSize);
}
Iterator<BackendEntry> iterator = fetcher.apply(query);
// Must iterate all entries before get the next page
List<BackendEntry> results = IteratorUtils.list(iterator);
return new PageIterator(results.iterator(),
PageState.page(iterator));
Expand All @@ -197,7 +203,7 @@ public int total() {
}

/**
* Generate queries from index.indexQuery()
* Generate queries from tx.indexQuery()
*/
private class IndexQuery implements QueryHolder {

Expand All @@ -214,10 +220,15 @@ public Iterator<BackendEntry> iterator() {
return null;
}
Set<Id> ids = holder.ids();
if (ids.size() > parent.limit()) {
ids = limit(ids, parent);
if (parent.limit() != Query.NO_LIMIT &&
ids.size() > parent.limit()) {
/*
* Avoid too many ids in one time query,
* Assume it will get one result by each id
*/
ids = CollectionUtil.subSet(ids, 0, (int) parent.limit());
}
IdQuery query = new IdQuery(parent.resultType(), ids);
IdQuery query = new IdQuery(parent, ids);
return fetcher.apply(query);
});
}
Expand All @@ -228,7 +239,7 @@ public PageIterator iterator(int index, String page, long pageSize) {
if (pageIds.empty()) {
return PageIterator.EMPTY;
}
IdQuery query = new IdQuery(parent.resultType(), pageIds.ids());
IdQuery query = new IdQuery(parent, pageIds.ids());
return new PageIterator(fetcher.apply(query), pageIds.page());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class Query implements Cloneable {
private String page;
private long capacity;
private boolean showHidden;
private boolean includeDeleting;
private boolean showDeleting;

private Query originQuery;

Expand All @@ -71,7 +71,7 @@ public Query(HugeType resultType, Query originQuery) {
this.capacity = defaultCapacity();

this.showHidden = false;
this.includeDeleting = false;
this.showDeleting = false;
}

public HugeType resultType() {
Expand Down Expand Up @@ -176,6 +176,10 @@ public String page() {
return this.page;
}

public String pageWithoutCheck() {
return this.page;
}

public void page(String page) {
this.page = page;
}
Expand Down Expand Up @@ -214,12 +218,12 @@ public void showHidden(boolean showHidden) {
this.showHidden = showHidden;
}

public boolean includeDeleting() {
return this.includeDeleting;
public boolean showDeleting() {
return this.showDeleting;
}

public void includeDeleting(boolean includeDeleting) {
this.includeDeleting = includeDeleting;
public void showDeleting(boolean showDeleting) {
this.showDeleting = showDeleting;
}

public Set<Id> ids() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ private List<IdHolder> queryByLabel(ConditionQuery query) {
indexQuery.eq(HugeKeys.INDEX_LABEL_ID, il.id());
indexQuery.eq(HugeKeys.FIELD_VALUES, label);
// Set offset and limit to avoid redundant element ids
indexQuery.page(query.page());
indexQuery.page(query.pageWithoutCheck());
indexQuery.limit(query.limit());
indexQuery.offset(query.offset());
indexQuery.capacity(query.capacity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.SplicingIdGenerator;
import com.baidu.hugegraph.backend.page.IdHolder;
import com.baidu.hugegraph.backend.page.PageState;
import com.baidu.hugegraph.backend.page.QueryList;
import com.baidu.hugegraph.backend.query.Condition;
import com.baidu.hugegraph.backend.query.ConditionQuery;
Expand Down Expand Up @@ -485,7 +486,7 @@ public Iterator<Vertex> queryVertices(Query query) {
}
// Filter vertices of deleting vertex label
if (vertex.schemaLabel().status() == SchemaStatus.DELETING &&
!query.includeDeleting()) {
!query.showDeleting()) {
return false;
}
// Process results that query from left index or primary-key
Expand Down Expand Up @@ -616,7 +617,7 @@ public Iterator<Edge> queryEdges(Query query) {
}
// Filter edges of deleting edge label
if (edge.schemaLabel().status() == SchemaStatus.DELETING &&
!query.includeDeleting()) {
!query.showDeleting()) {
return false;
}
// Process results that query from left index
Expand Down Expand Up @@ -1411,131 +1412,59 @@ public void removeEdges(EdgeLabel edgeLabel) {

public void traverseVerticesByLabel(VertexLabel label,
Consumer<Vertex> consumer,
boolean remove) {
this.traverseByLabel(label, this::queryVertices, consumer, remove);
boolean deleting) {
this.traverseByLabel(label, this::queryVertices, consumer, deleting);
}

public void traverseEdgesByLabel(EdgeLabel label, Consumer<Edge> consumer,
boolean remove) {
this.traverseByLabel(label, this::queryEdges, consumer, remove);
boolean deleting) {
this.traverseByLabel(label, this::queryEdges, consumer, deleting);
}

private <T> void traverseByLabel(SchemaLabel label,
Function<Query, Iterator<T>> fetcher,
Consumer<T> consumer, boolean remove) {
if (!this.store().features().supportsQueryByPage()) {
this.traverseByLabelWithoutPage(label, fetcher, consumer, remove);
return;
}
if (!label.enableLabelIndex()) {
this.traverseByLabelWithoutIndexInPage(label, fetcher,
consumer, remove);
} else {
this.traverseByLabelWithIndexInPage(label, fetcher,
consumer, remove);
}
}

private <T> void traverseByLabelWithoutPage(
SchemaLabel label, Function<Query, Iterator<T>> fetcher,
Consumer<T> consumer, boolean remove) {
Consumer<T> consumer, boolean deleting) {
HugeType type = label.type() == HugeType.VERTEX_LABEL ?
HugeType.VERTEX : HugeType.EDGE;
ConditionQuery query = new ConditionQuery(type);
// Whether query system vertices
if (label.hidden()) {
query.showHidden(true);
}
query.includeDeleting(remove);
// Not support label index, query all and filter by label
if (!label.enableLabelIndex()) {
query.capacity(Query.NO_CAPACITY);
Iterator<T> itor = fetcher.apply(query);
while (itor.hasNext()) {
T e = itor.next();
SchemaLabel elemLabel = ((HugeElement) e).schemaLabel();
if (label.equals(elemLabel)) {
consumer.accept(e);
}
}
return;
}

/*
* Support label index, query by label. Set limit&capacity to
* Query.DEFAULT_CAPACITY to limit elements number per pass
*/
query.limit(Query.DEFAULT_CAPACITY);
Query query = label.enableLabelIndex() ?
new ConditionQuery(type) :
new Query(type);
query.capacity(Query.NO_CAPACITY);
query.eq(HugeKeys.LABEL, label.id());
int pass = 0;
int counter;
do {
if (!remove) {
query.offset(pass++ * Query.DEFAULT_CAPACITY);
}
// Process every element in current batch
Iterator<T> itor = fetcher.apply(query);
for (counter = 0; itor.hasNext(); ++counter) {
consumer.accept(itor.next());
}
assert counter <= Query.DEFAULT_CAPACITY;
} while (counter == Query.DEFAULT_CAPACITY); // If not, means finish
}

private <T> void traverseByLabelWithIndexInPage(
SchemaLabel label, Function<Query, Iterator<T>> fetcher,
Consumer<T> consumer, boolean remove) {
HugeType type = label.type() == HugeType.VERTEX_LABEL ?
HugeType.VERTEX : HugeType.EDGE;
ConditionQuery query = new ConditionQuery(type);
query.eq(HugeKeys.LABEL, label.id());
query.limit(TRAVERSE_BATCH);
// Whether query system vertices
query.limit(Query.NO_LIMIT);
if (this.store().features().supportsQueryByPage()) {
query.page(PageState.PAGE_NONE);
}
if (label.hidden()) {
query.showHidden(true);
}
query.includeDeleting(remove);
query.showDeleting(deleting);

Iterator<T> itor;
String page = "";
while (page != null) {
query.page(page);
itor = fetcher.apply(query);
if (label.enableLabelIndex()) {
// Support label index, query by label index
((ConditionQuery) query).eq(HugeKeys.LABEL, label.id());
Iterator<T> itor = fetcher.apply(query);
while (itor.hasNext()) {
consumer.accept(itor.next());
}
page = (String) ((Metadatable) itor).metadata("page");
}
}

private <T> void traverseByLabelWithoutIndexInPage(
SchemaLabel label, Function<Query, Iterator<T>> fetcher,
Consumer<T> consumer, boolean remove) {
HugeType type = label.type() == HugeType.VERTEX_LABEL ?
HugeType.VERTEX : HugeType.EDGE;
Query query = new Query(type);
query.limit(TRAVERSE_BATCH);
// Whether query system vertices
if (label.hidden()) {
query.showHidden(true);
}
query.includeDeleting(remove);

Iterator<T> itor;
// Not support label index, query all and filter by label
String page = "";
while (page != null) {
query.page(page);
itor = fetcher.apply(query);
while (itor.hasNext()) {
T e = itor.next();
SchemaLabel elemLabel = ((HugeElement) e).schemaLabel();
if (label.equals(elemLabel)) {
consumer.accept(e);
}
} else {
// Not support label index, query all and filter by label
if (query.paging()) {
query.limit(TRAVERSE_BATCH);
}
page = (String) ((Metadatable) itor).metadata("page");
String page = null;
do {
Iterator<T> itor = fetcher.apply(query);
while (itor.hasNext()) {
T e = itor.next();
SchemaLabel elemLabel = ((HugeElement) e).schemaLabel();
if (label.equals(elemLabel)) {
consumer.accept(e);
}
}
if (query.paging()) {
page = (String) ((Metadatable) itor).metadata("page");
}
} while (page != null);
}
}
}

0 comments on commit 4546c6b

Please sign in to comment.