Skip to content

Commit

Permalink
fix(interactive): Add Unit Tests for Graph Optimizer in Concurrent Sc…
Browse files Browse the repository at this point in the history
…enario (#4274)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?
1. Add `close()` interface in GraphOptimizer to shutdown
`CleanScheduler` (used to clean memory occupation by CBO optimizer
periodically) when compiler service stops.
2. Add `ConcurrentBITest` to run BI Tests in parallel, to verify the
optimizer's functionality in concurrent cases.

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes
  • Loading branch information
shirly121 authored Oct 18, 2024
1 parent 0052971 commit 542cb16
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ public void close() throws Exception {
if (!FrontendConfig.GREMLIN_SERVER_DISABLED.get(configs) && this.gremlinServer != null) {
this.gremlinServer.close();
}
if (this.optimizer != null) {
this.optimizer.close();
}
}

public static void main(String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@
import org.apache.calcite.tools.RelBuilderFactory;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Optimize graph relational tree which consists of match and other relational operators
*/
public class GraphRelOptimizer {
public class GraphRelOptimizer implements Closeable {
private final PlannerConfig config;
private final RelBuilderFactory relBuilderFactory;
private final GlogueHolder glogueHolder;
Expand Down Expand Up @@ -97,6 +98,13 @@ public RelNode optimize(RelNode before, GraphIOProcessor ioProcessor) {
return null;
}

@Override
public void close() {
if (this.plannerGroupManager != null) {
this.plannerGroupManager.close();
}
}

public static class MatchOptimizer extends GraphShuttle {
private final GraphIOProcessor ioProcessor;
private final RelOptPlanner matchPlanner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public abstract class PlannerGroupManager {
public abstract class PlannerGroupManager implements Closeable {
protected final PlannerConfig config;
protected final RelBuilderFactory relBuilderFactory;

Expand All @@ -40,6 +41,9 @@ public PlannerGroupManager(PlannerConfig config, RelBuilderFactory relBuilderFac
this.relBuilderFactory = relBuilderFactory;
}

@Override
public void close() {}

public abstract PlannerGroup getCurrentGroup();

public static class Static extends PlannerGroupManager {
Expand Down Expand Up @@ -103,5 +107,17 @@ public PlannerGroup getCurrentGroup() {
int groupId = (int) Thread.currentThread().getId() % plannerGroups.size();
return plannerGroups.get(groupId);
}

@Override
public void close() {
try {
if (this.clearScheduler != null) {
this.clearScheduler.shutdown();
this.clearScheduler.awaitTermination(10 * 1000, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
logger.error("failed to close planner group manager.", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
*
* * Copyright 2020 Alibaba Group Holding Limited.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.alibaba.graphscope.common;

import org.apache.tinkerpop.gremlin.process.GremlinProcessRunner;
import org.junit.runners.model.InitializationError;
import org.junit.runners.model.RunnerScheduler;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* A JUnit runner that runs tests concurrently, the default thread pool size is 4.
*/
public class ConcurrentProcessRunner extends GremlinProcessRunner {
private final ExecutorService executorService;

public ConcurrentProcessRunner(Class<?> klass) throws InitializationError {
super(klass);
this.executorService = Executors.newFixedThreadPool(4);
RunnerScheduler scheduler =
new RunnerScheduler() {
@Override
public void schedule(Runnable childStatement) {
executorService.submit(childStatement);
}

@Override
public void finished() {
try {
executorService.shutdown();
executorService.awaitTermination(120, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
setScheduler(scheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableMap;

import org.apache.calcite.rel.RelNode;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -56,6 +57,13 @@ public static void beforeClass() {
optimizer.getGlogueHolder());
}

@AfterClass
public static void afterClass() {
if (optimizer != null) {
optimizer.close();
}
}

@Test
public void bi1_test() {
GraphBuilder builder = Utils.mockGraphBuilder(optimizer, irMeta);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
*
* * Copyright 2020 Alibaba Group Holding Limited.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.alibaba.graphscope.common.ir.planner.cbo;

import com.alibaba.graphscope.common.ConcurrentProcessRunner;

import org.junit.runner.RunWith;

// run BI Tests in parallel, to check thread-safety of the CBO optimization
// ConcurrentProcessRunner is a custom runner that runs tests in parallel
@RunWith(ConcurrentProcessRunner.class)
public class ConcurrentBITest extends BITest {}

0 comments on commit 542cb16

Please sign in to comment.