Skip to content

Commit

Permalink
remove left index in async mode
Browse files Browse the repository at this point in the history
implemented: #247

Change-Id: Ie1baacb93f18d66ed7169161f15f398701e955a3
  • Loading branch information
zhoney committed Dec 26, 2018
1 parent 2480347 commit d2ed391
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraph;
Expand All @@ -45,6 +49,8 @@
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.exception.NoIndexException;
import com.baidu.hugegraph.job.EphemeralJob;
import com.baidu.hugegraph.job.EphemeralJobBuilder;
import com.baidu.hugegraph.perf.PerfUtil.Watched;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.schema.PropertyKey;
Expand All @@ -54,6 +60,7 @@
import com.baidu.hugegraph.structure.HugeIndex;
import com.baidu.hugegraph.structure.HugeProperty;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.task.HugeTask;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.type.define.IndexType;
Expand All @@ -79,6 +86,18 @@ public GraphIndexTransaction(HugeGraph graph, BackendStore store) {
assert this.textAnalyzer != null;
}

/**
 * Schedule the removal of left index of an element as a background task
 * instead of doing it in the calling thread.
 *
 * The work is wrapped into a {@link RemoveLeftIndexCallable} and handed to
 * an {@link EphemeralJobBuilder}; the task is named after the element.
 *
 * @param query   the condition query which hit the left index
 * @param element the element the left index points to
 * @return the id of the scheduled task
 */
protected Id asyncRemoveIndexLeft(ConditionQuery query,
                                  HugeElement element) {
    EphemeralJob<Object> job = new RemoveLeftIndexCallable(query, element);
    HugeTask<?> scheduled = EphemeralJobBuilder.<Object>of(this.graph())
                                               .name(element.name())
                                               .job(job)
                                               .schedule();
    return scheduled.id();
}

protected void removeIndexLeft(ConditionQuery query, HugeElement element) {
if (element.type() != HugeType.VERTEX &&
element.type() != HugeType.EDGE_OUT &&
Expand All @@ -88,7 +107,6 @@ protected void removeIndexLeft(ConditionQuery query, HugeElement element) {
element.type());
}

// TODO: remove left index in async thread
for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) {
// Process range index
this.processRangeIndexLeft(cq, element);
Expand Down Expand Up @@ -124,37 +142,33 @@ private void processRangeIndexLeft(ConditionQuery query,
index.resetElementIds();
index.elementIds(element.id());
this.doEliminate(this.serializer.writeIndex(index));
this.commit();
// If deleted by error, re-add deleted index again
if (this.deletedByError(q, element)) {
this.doAppend(this.serializer.writeIndex(index));
this.commit();
}
}
}
}
}

private void processSecondaryOrSearchIndexLeft(ConditionQuery query,
HugeElement element) {
HugeElement deletion = element.copyAsFresh();
Set<Id> propKeys = query.userpropKeys();
Set<Id> incorrectPKs = InsertionOrderUtil.newSet();
for (Id key : propKeys) {
Set<Object> conditionValues = query.userpropValues(key);
E.checkState(!conditionValues.isEmpty(),
"Expect user property values for key '%s', " +
"but got none", key);
if (conditionValues.size() > 1) {
// It's inside/between Query (processed in range index)
return;
}
Object propValue = deletion.getProperty(key).value();
Object conditionValue = conditionValues.iterator().next();
if (!propValue.equals(conditionValue)) {
PropertyKey pkey = this.graph().propertyKey(key);
deletion.addProperty(pkey, conditionValue);
incorrectPKs.add(key);
}
Map<PropertyKey, Object> incorrectPKs = InsertionOrderUtil.newMap();
HugeElement deletion = this.constructErrorElem(query, element,
incorrectPKs);
if (deletion == null) {
return;
}

// Delete unused index
for (IndexLabel il : relatedIndexLabels(deletion)) {
if (!CollectionUtil.hasIntersection(il.indexFields(), incorrectPKs)) {
Set<Id> incorrectPkIds = incorrectPKs.keySet().stream()
.map(PropertyKey::id)
.collect(Collectors.toSet());
Collection<Id> incorrectIndexFields = CollectionUtil.intersect(
il.indexFields(), incorrectPkIds);
if (incorrectIndexFields.isEmpty()) {
continue;
}
// Skip if search index is not wrong
Expand Down Expand Up @@ -182,6 +196,12 @@ private void processSecondaryOrSearchIndexLeft(ConditionQuery query,
*/
this.updateIndex(il.id(), element, false);
}
this.commit();
if (this.deletedByError(element, incorrectIndexFields,
incorrectPKs)) {
this.updateIndex(il.id(), deletion, false);
this.commit();
}
}
}

Expand Down Expand Up @@ -305,6 +325,70 @@ private void updateIndex(IndexLabel indexLabel, Object propValue,
}
}

/**
 * Build a fresh copy of the element in which every property that does not
 * match the query is overwritten with the value the query asked for.
 *
 * For each user property key of the query, the single condition value is
 * compared with the element's current value; on mismatch the copy gets the
 * condition value and the pair is recorded in {@code incorrectPKs}.
 *
 * @param query        the condition query to take the expected values from
 * @param element      the element to copy (via {@code copyAsFresh()})
 * @param incorrectPKs output map, filled with the mismatching property
 *                     keys and the value the query expected for them
 * @return the modified copy, or null if any key has more than one
 *         condition value (inside/between query, processed in range index)
 */
private HugeElement constructErrorElem(
        ConditionQuery query, HugeElement element,
        Map<PropertyKey, Object> incorrectPKs) {
    HugeElement copy = element.copyAsFresh();
    for (Id propKey : query.userpropKeys()) {
        Set<Object> expectedValues = query.userpropValues(propKey);
        E.checkState(!expectedValues.isEmpty(),
                     "Expect user property values for key '%s', " +
                     "but got none", propKey);
        if (expectedValues.size() > 1) {
            // It's inside/between Query (processed in range index)
            return null;
        }
        Object actualValue = copy.getProperty(propKey).value();
        Object expectedValue = expectedValues.iterator().next();
        if (actualValue.equals(expectedValue)) {
            // Property agrees with the query, nothing to correct
            continue;
        }
        PropertyKey pkey = this.graph().propertyKey(propKey);
        copy.addProperty(pkey, expectedValue);
        incorrectPKs.put(pkey, expectedValue);
    }
    return copy;
}

/**
 * Check whether an index was removed by mistake: re-read the element and
 * test it against the query again.
 *
 * @param query   the query the removed index belonged to
 * @param element the element whose index was removed
 * @return true if the element still exists and matches the query,
 *         false if it matches no longer or can't be found
 */
private boolean deletedByError(ConditionQuery query, HugeElement element) {
    HugeElement latest = this.newestElement(element);
    return latest != null && query.test(latest);
}

/**
 * Check whether an index was removed by mistake: re-read the element and
 * look for an index field whose current value equals the value that was
 * considered incorrect.
 *
 * @param element      the element whose index was removed
 * @param ilFields     ids of the index fields to take into account
 * @param incorrectPKs property keys mapped to the value the removed index
 *                     was built from
 * @return true if any of the given index fields currently holds the
 *         recorded value, false otherwise (also if the element can't be
 *         found anymore)
 */
private boolean deletedByError(HugeElement element, Collection<Id> ilFields,
                               Map<PropertyKey, Object> incorrectPKs) {
    HugeElement elem = this.newestElement(element);
    if (elem == null) {
        /*
         * newestElement() returns null if the element can't be found,
         * in that case there is nothing an index could rightly point to.
         */
        return false;
    }
    for (Map.Entry<PropertyKey, Object> e : incorrectPKs.entrySet()) {
        PropertyKey pk = e.getKey();
        Object value = e.getValue();
        if (ilFields.contains(pk.id()) &&
            value.equals(elem.getPropertyValue(pk.id()))) {
            return true;
        }
    }
    return false;
}

/**
 * Re-read an element from the graph by its id.
 *
 * @param element a vertex or an edge
 * @return the first element returned by the graph for that id,
 *         or null if there is none
 */
private HugeElement newestElement(HugeElement element) {
    if (element instanceof HugeVertex) {
        Iterator<Vertex> vertices = this.graph().vertices(element.id());
        return vertices.hasNext() ? (HugeVertex) vertices.next() : null;
    }
    assert element instanceof HugeEdge;
    Iterator<Edge> edges = this.graph().edges(element.id());
    return edges.hasNext() ? (HugeEdge) edges.next() : null;
}

/**
* Composite index, an index involving multiple columns.
* Single index, an index involving only one column.
Expand Down Expand Up @@ -1090,10 +1174,39 @@ public static IndexQueries of(IndexLabel il, ConditionQuery query) {
}
}

public static enum OptimizedType {
/**
 * The ways a query can be optimized.
 * NOTE: the ordinal() of these constants is compared with the optimized()
 * value of a query (e.g. INDEX.ordinal()), so keep the declaration order.
 */
public enum OptimizedType {
NONE,
PRIMARY_KEY,
SORT_KEY,
INDEX
}

/**
 * Job which removes the left index of an element for a given query, by
 * calling removeIndexLeft() of the index transaction of the graph.
 */
public static class RemoveLeftIndexCallable extends EphemeralJob<Object> {

    private static final String REMOVE_LEFT_INDEX = "remove_left_index";

    private ConditionQuery query;
    private HugeElement element;

    /**
     * @param query   the query which hit the left index, can't be null
     * @param element the element the left index points to, can't be null
     */
    private RemoveLeftIndexCallable(ConditionQuery query,
                                    HugeElement element) {
        E.checkArgumentNotNull(query, "query");
        E.checkArgumentNotNull(element, "element");
        this.query = query;
        this.element = element;
    }

    /**
     * Remove the left index through the graph's index transaction.
     *
     * @return always null, the job has no result
     */
    @Override
    public Object execute() {
        GraphIndexTransaction indexTx = this.graph().graphTransaction()
                                            .indexTransaction();
        indexTx.removeIndexLeft(this.query, this.element);
        return null;
    }

    @Override
    public String type() {
        return REMOVE_LEFT_INDEX;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected void reset() {
}

@Override
protected AbstractTransaction indexTransaction() {
protected GraphIndexTransaction indexTransaction() {
return this.indexTx;
}

Expand Down Expand Up @@ -1184,7 +1184,7 @@ private boolean filterResultFromIndexQuery(Query query, HugeElement elem) {

if (cq.optimized() == OptimizedType.INDEX.ordinal()) {
LOG.info("Remove left index: {}, query: {}", elem, cq);
this.indexTx.removeIndexLeft(cq, elem);
this.indexTx.asyncRemoveIndexLeft(cq, elem);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.job;

import com.baidu.hugegraph.task.TaskCallable;

/**
 * A task callable whose work is defined by {@link #execute()} and which
 * reports a type name through {@link #type()}.
 *
 * @param <T> the result type of the job
 */
public abstract class EphemeralJob<T> extends TaskCallable<T> {

    /**
     * Run the job by delegating to {@link #execute()}.
     *
     * @return the result of {@link #execute()}
     * @throws Exception whatever {@link #execute()} throws
     */
    @Override
    public T call() throws Exception {
        return this.execute();
    }

    /**
     * @return the type name of this job
     */
    public abstract String type();

    /**
     * Do the actual work of the job.
     *
     * @return the result of the job
     * @throws Exception if the job fails
     */
    public abstract T execute() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.job;

import java.util.concurrent.atomic.AtomicInteger;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.task.HugeTask;
import com.baidu.hugegraph.task.TaskScheduler;
import com.baidu.hugegraph.util.E;

/**
 * Builder which wraps an {@link EphemeralJob} into a {@link HugeTask} and
 * hands it to the task scheduler of the graph.
 *
 * The task gets a temporary id taken from a counter shared by all
 * builders; the counter counts down from 0, so the ids are negative.
 *
 * @param <T> the result type of the job
 */
public class EphemeralJobBuilder<T> {

    /*
     * Shared by all builders and decremented atomically: builders may be
     * used from several threads at once, a plain "--counter" on a static
     * int could hand out the same id twice.
     */
    private static final AtomicInteger TEMP_TASK_ID = new AtomicInteger(0);

    private final HugeGraph graph;

    private String name;
    private String input;
    private EphemeralJob<T> job;

    /**
     * Create a builder for the given graph.
     *
     * @param graph the graph whose task scheduler will run the job
     * @param <T>   the result type of the job
     * @return a new builder
     */
    public static <T> EphemeralJobBuilder<T> of(final HugeGraph graph) {
        return new EphemeralJobBuilder<>(graph);
    }

    public EphemeralJobBuilder(final HugeGraph graph) {
        this.graph = graph;
    }

    /**
     * @param name the name of the task, required by {@link #schedule()}
     * @return this builder
     */
    public EphemeralJobBuilder<T> name(String name) {
        this.name = name;
        return this;
    }

    /**
     * @param input the input of the task, optional
     * @return this builder
     */
    public EphemeralJobBuilder<T> input(String input) {
        this.input = input;
        return this;
    }

    /**
     * @param job the job to run, required by {@link #schedule()}
     * @return this builder
     */
    public EphemeralJobBuilder<T> job(EphemeralJob<T> job) {
        this.job = job;
        return this;
    }

    /**
     * Create a task for the job (with a generated temporary id and no
     * parent) and pass it to the task scheduler of the graph.
     *
     * @return the scheduled task
     * @throws IllegalArgumentException presumably, if name or job is not
     *         set (thrown by E.checkArgumentNotNull, to be confirmed)
     */
    public HugeTask<T> schedule() {
        E.checkArgumentNotNull(this.name, "Job name can't be null");
        E.checkArgumentNotNull(this.job, "Job can't be null");

        HugeTask<T> task = new HugeTask<>(this.genTaskId(), null, this.job);
        task.type(this.job.type());
        task.name(this.name);
        if (this.input != null) {
            task.input(this.input);
        }

        TaskScheduler scheduler = this.graph.taskScheduler();
        scheduler.schedule(task);

        return task;
    }

    /**
     * @return the next temporary task id: -1, -2, ...
     */
    private Id genTaskId() {
        return IdGenerator.of(TEMP_TASK_ID.decrementAndGet());
    }
}

0 comments on commit d2ed391

Please sign in to comment.