Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and tests related to the Indexer process. #10631

Merged
merged 17 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 87 additions & 34 deletions .travis.yml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
Expand Down Expand Up @@ -82,7 +83,7 @@
/**
* Abstract class for batch tasks like {@link IndexTask}.
* Provides some methods such as {@link #determineSegmentGranularity}, {@link #findInputSegments},
* and {@link #determineLockGranularityandTryLock} for easily acquiring task locks.
* and {@link #determineLockGranularityAndTryLock} for easily acquiring task locks.
*/
public abstract class AbstractBatchIndexTask extends AbstractTask
{
Expand Down Expand Up @@ -122,6 +123,17 @@ protected AbstractBatchIndexTask(
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
if (taskLockHelper == null) {
// Subclasses generally use "isReady" to initialize the taskLockHelper. It's not guaranteed to be called before
// "run", and so we call it here to ensure it happens.
//
// We're only really calling it for its side effects, and we expect it to return "true". If it doesn't, something
// strange is going on, so bail out.
if (!isReady(toolbox.getTaskActionClient())) {
throw new ISE("Cannot start; not ready!");
}
}

synchronized (this) {
if (stopped) {
return TaskStatus.failure(getId());
Expand Down Expand Up @@ -251,9 +263,17 @@ public TaskLockHelper getTaskLockHelper()
}

/**
* Determine lockGranularity to use and try to acquire necessary locks.
* This method respects the value of 'forceTimeChunkLock' in task context.
* If it's set to false or missing, this method checks if this task can use segmentLock.
* Attempts to acquire a lock that covers the intervals specified in a certain granularitySpec.
*
* This method uses {@link GranularitySpec#bucketIntervals()} to get the list of intervals to lock, and passes them
* to {@link #determineLockGranularityAndTryLock(TaskActionClient, List)}.
*
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
*
* If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method
* will initialize {@link #taskLockHelper} as a side effect.
*
* @return whether the lock was acquired
*/
protected boolean determineLockGranularityAndTryLock(
TaskActionClient client,
Expand All @@ -263,10 +283,20 @@ protected boolean determineLockGranularityAndTryLock(
final List<Interval> intervals = granularitySpec.bucketIntervals().isPresent()
? new ArrayList<>(granularitySpec.bucketIntervals().get())
: Collections.emptyList();
return determineLockGranularityandTryLock(client, intervals);
return determineLockGranularityAndTryLock(client, intervals);
}

boolean determineLockGranularityandTryLock(TaskActionClient client, List<Interval> intervals) throws IOException
/**
* Attempts to acquire a lock that covers certain intervals.
*
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
*
* If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method
* will initialize {@link #taskLockHelper} as a side effect.
*
* @return whether the lock was acquired
*/
boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals) throws IOException
{
final boolean forceTimeChunkLock = getContextValue(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
Expand All @@ -287,12 +317,22 @@ boolean determineLockGranularityandTryLock(TaskActionClient client, List<Interva
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT);
return tryLockWithDetermineResult(client, result);
} else {
// This branch is the only one that will not initialize taskLockHelper.
return true;
}
}
}

boolean determineLockGranularityandTryLockWithSegments(
/**
* Attempts to acquire a lock that covers certain segments.
*
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
*
* This method will initialize {@link #taskLockHelper} as a side effect.
*
* @return whether the lock was acquired
*/
boolean determineLockGranularityAndTryLockWithSegments(
TaskActionClient client,
List<DataSegment> segments,
BiConsumer<LockGranularity, List<DataSegment>> segmentCheckFunction
Expand Down Expand Up @@ -396,7 +436,8 @@ private LockGranularityDetermineResult determineSegmentGranularity(List<DataSegm
if (granularityFromSegments == null
|| segmentGranularityFromSpec != null
&& (!granularityFromSegments.equals(segmentGranularityFromSpec)
|| segments.stream().anyMatch(segment -> !segmentGranularityFromSpec.isAligned(segment.getInterval())))) {
|| segments.stream()
.anyMatch(segment -> !segmentGranularityFromSpec.isAligned(segment.getInterval())))) {
// This case is one of the followings:
// 1) Segments have different granularities.
// 2) Segment granularity in ingestion spec is different from the one of existig segments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public int getPriority()
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
final List<DataSegment> segments = segmentProvider.findSegments(taskActionClient);
return determineLockGranularityandTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
return determineLockGranularityAndTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
final List<Interval> allocateIntervals = new ArrayList<>(partitionAnalysis.getAllIntervalsToIndex());
final DataSchema dataSchema;
if (determineIntervals) {
if (!determineLockGranularityandTryLock(toolbox.getTaskActionClient(), allocateIntervals)) {
if (!determineLockGranularityAndTryLock(toolbox.getTaskActionClient(), allocateIntervals)) {
throw new ISE("Failed to get locks for intervals[%s]", allocateIntervals);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,11 @@ default int getPriority()
/**
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
* coordinator. If this method throws an exception, the task should be considered a failure.
* <p/>
* Overlord. If this method throws an exception, the task should be considered a failure.
*
* This method will not necessarily be executed before {@link #run(TaskToolbox)}, since this task readiness check
* may be done on a different machine from the one that actually runs the task.
Comment on lines +165 to +166
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, how can this happen? I thought Task.isReady() is always called before Task.run() because a task can be scheduled only when Task.isReady() returns true. Task.run() will be called after the task is scheduled in some indexer or middleManager.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't necessarily be called on that same Task object (it might be called on a different instance that represents the same actual task).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. I think it happens only in indexers because peon calls isReady() before it runs its task.

*
* This method must be idempotent, as it may be run multiple times per task.
*
* @param taskActionClient action client for this task (not the full toolbox)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* - {@link #verifyAndLockExistingSegments} is to verify the granularity of existing segments and lock them.
* This method must be called before the task starts indexing.
* - Tells the task what {@link LockGranularity} it should use. Note that the LockGranularity is determined in
* {@link AbstractBatchIndexTask#determineLockGranularityandTryLock}.
* {@link AbstractBatchIndexTask#determineLockGranularityAndTryLock}.
* - Provides some util methods for {@link LockGranularity#SEGMENT}. Also caches the information of locked segments when
* - the SEGMENt lock granularity is used.
*/
Expand Down
61 changes: 39 additions & 22 deletions integration-tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Integration Testing Using Docker
Before starting, if you don't already have docker on your machine, install it as described on
[Docker installation instructions](https://docs.docker.com/install/). Ensure that you
have at least 4GB of memory allocated to the docker engine. (You can verify it
under Preferences > Advanced.)
under Preferences > Resources > Advanced.)

Also set the `DOCKER_IP`
environment variable to localhost on your system, as follows:
Expand Down Expand Up @@ -72,31 +72,40 @@ Druid routers for security group integration test (permissive tls, no client aut

## Docker compose

Docker compose yamls located in "docker" folder
There are a few different Docker compose yamls located in "docker" folder. Before you can run any of these, you must
build the Docker images. See "Manually bringing up Docker containers and running tests" below.

docker-compose.base.yml - Base file that defines all containers for integration test

docker-compose.yml - Defines Druid cluster with default configuration that is used for running integration tests in Travis CI.

docker-compose -f docker-compose.yml up
// DRUID_INTEGRATION_TEST_GROUP - this variable is used in Druid docker container for "security" and "query" test group. Use next docker-compose if you want to run security/query tests.
DRUID_INTEGRATION_TEST_GROUP=security docker-compose -f docker-compose.yml up

```
docker-compose -f docker-compose.yml up
// DRUID_INTEGRATION_TEST_GROUP - this variable is used in Druid docker container for "security" and "query" test group. Use next docker-compose if you want to run security/query tests.
DRUID_INTEGRATION_TEST_GROUP=security docker-compose -f docker-compose.yml up
```

docker-compose.override-env.yml - Defines Druid cluster with default configuration plus any additional and/or overriden configurations from override-env file.

// OVERRIDE_ENV - variable that must contains path to Druid configuration file
OVERRIDE_ENV=./environment-configs/override-examples/s3 docker-compose -f docker-compose.override-env.yml up

```
// OVERRIDE_ENV - variable that must contains path to Druid configuration file
OVERRIDE_ENV=./environment-configs/override-examples/s3 docker-compose -f docker-compose.override-env.yml up
```

docker-compose.security.yml - Defines three additional Druid router services with permissive tls, no client auth tls, and custom check tls respectively.
This is meant to be use together with docker-compose.yml or docker-compose.override-env.yml and is only needed for the "security" group integration test.

docker-compose -f docker-compose.yml -f docker-compose.security.yml up

```
docker-compose -f docker-compose.yml -f docker-compose.security.yml up
```

docker-compose.druid-hadoop.yml - for starting Apache Hadoop 2.8.5 cluster with the same setup as the Druid tutorial

docker-compose -f docker-compose.druid-hadoop.yml up
```
docker-compose -f docker-compose.druid-hadoop.yml up
```

## Manual bringing up docker containers and running tests
## Manually bringing up Docker containers and running tests

1. Build druid-cluster, druid-hadoop docker images. From root module run maven command:
```
Expand All @@ -106,30 +115,38 @@ mvn clean install -pl integration-tests -P integration-tests -Ddocker.run.skip=t
2. Run druid cluster by docker-compose:

```
- Basic Druid cluster (skip this if running Druid cluster with override configs):
# Basic Druid cluster (skip this if running Druid cluster with override configs):
docker-compose -f integration-tests/docker/docker-compose.yml up
- Druid cluster with override configs (skip this if running Basic Druid cluster):
# Druid cluster with override configs (skip this if running Basic Druid cluster):
OVERRIDE_ENV=<PATH_TO_ENV> docker-compose -f ${DOCKERDIR}/docker-compose.override-env.yml up
- Druid hadoop (if needed):
# Druid hadoop (if needed):
docker-compose -f ${DOCKERDIR}/docker-compose.druid-hadoop.yml up
- Druid routers for security group integration test (if needed):
docker-compose -f ${DOCKERDIR}/docker-compose.security.yml up
# Druid routers for security group integration test (if needed):
docker-compose -f ${DOCKERDIR}/docker-compose.security.yml up
```

3. Run maven command to execute tests with -Ddocker.build.skip=true -Ddocker.run.skip=true

For example:

```
mvn verify -P integration-tests -pl integration-tests -Dit.test=ITIndexerTest -Ddocker.build.skip=true -Ddocker.run.skip=true
```

## Tips & tricks for debugging and developing integration tests

### Useful mvn command flags

- -Ddocker.build.skip=true to skip build druid containers.
If you do not apply any change to druid then you can do not rebuild druid.
This can save ~4 minutes to build druid cluster and druid hadoop.
- -Ddocker.build.skip=true to skip building the containers.
If you do not apply any change to Druid then you skip rebuilding the containers. This can save ~4 minutes.
You need to build druid containers only once, after you can skip docker build step.
- -Ddocker.run.skip=true to skip starting docker containers. This can save ~3 minutes by skipping building and bringing
up the docker containers (Druid, Kafka, Hadoop, MYSQL, zookeeper, etc). Please make sure that you actually do have
these containers already running if using this flag. Additionally, please make sure that the running containers
are in the same state that the setup script (run_cluster.sh) would have brought it up in.
are in the same state that the setup script (run_cluster.sh) would have brought it up in.

### Debugging Druid while running tests

Expand Down
2 changes: 2 additions & 0 deletions integration-tests/build_run_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

set -e

echo $DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH

export DIR=$(cd $(dirname $0) && pwd)
Expand Down
7 changes: 4 additions & 3 deletions integration-tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ EXPOSE 8300 8301 8302 8303 8304 8305
EXPOSE 9092 9093

WORKDIR /var/lib/druid
ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \
ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \
&& . /druid.sh \
# Create druid service config files with all the config variables
&& . /druid.sh; setupConfig \
&& setupConfig \
# Some test groups require pre-existing data to be setup
&& . /druid.sh; setupData \
&& setupData \
# Export the service config file path to use in supervisord conf file
&& export DRUID_COMMON_CONF_DIR="$(. /druid.sh; getConfPath ${DRUID_SERVICE})" \
# Export the common config file path to use in supervisord conf file
Expand Down
31 changes: 25 additions & 6 deletions integration-tests/docker/docker-compose.base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ services:
- 51111:51111
networks:
druid-it-net:
ipv4_address: 172.172.172.13
ipv4_address: 172.172.172.15
privileged: true
volumes:
- ${HOME}/shared:/shared
Expand Down Expand Up @@ -173,12 +173,31 @@ services:
- ./environment-configs/common
- ./environment-configs/middlemanager

druid-indexer:
image: druid/cluster
container_name: druid-indexer
networks:
druid-it-net:
ipv4_address: 172.172.172.8
ports:
- 5008:5008
- 8091:8091
- 8291:8291
privileged: true
volumes:
- ./../src/test/resources:/resources
- ${HOME}/shared:/shared
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
env_file:
- ./environment-configs/common
- ./environment-configs/indexer

druid-broker:
image: druid/cluster
container_name: druid-broker
networks:
druid-it-net:
ipv4_address: 172.172.172.8
ipv4_address: 172.172.172.9
ports:
- 5005:5005
- 8082:8082
Expand All @@ -196,7 +215,7 @@ services:
container_name: druid-router
networks:
druid-it-net:
ipv4_address: 172.172.172.9
ipv4_address: 172.172.172.10
ports:
- 5004:5004
- 8888:8888
Expand All @@ -214,7 +233,7 @@ services:
container_name: druid-router-permissive-tls
networks:
druid-it-net:
ipv4_address: 172.172.172.10
ipv4_address: 172.172.172.11
ports:
- 5001:5001
- 8889:8889
Expand All @@ -232,7 +251,7 @@ services:
container_name: druid-router-no-client-auth-tls
networks:
druid-it-net:
ipv4_address: 172.172.172.11
ipv4_address: 172.172.172.12
ports:
- 5002:5002
- 8890:8890
Expand All @@ -250,7 +269,7 @@ services:
container_name: druid-router-custom-check-tls
networks:
druid-it-net:
ipv4_address: 172.172.172.12
ipv4_address: 172.172.172.13
ports:
- 5003:5003
- 8891:8891
Expand Down
Loading