Fixes and tests related to the Indexer process. (apache#10631)
* Fixes and tests related to the Indexer process.

Three bugs fixed:

1) Indexers would not announce themselves as segment servers if they
   did not have storage locations defined. This used to work, but was
   broken in apache#9971. Fixed this by adding an "isSegmentServer" method
   to ServerType and updating SegmentLoadDropHandler to always announce
   if this method returns true. (A sketch of this fix follows the list.)

2) Certain batch task types were written in a way that assumed "isReady"
   would be called before "run", which is not guaranteed. In particular,
   they relied on it in order to initialize "taskLockHelper". Fixed this
   by updating AbstractBatchIndexTask to ensure "isReady" is called
   before "run" for these tasks.

3) UnifiedIndexerAppenderatorsManager did not properly handle complex
   datasources. Introduced DataSourceAnalysis in order to fix this. (A
   sketch follows the list.)
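
Sketches of fixes 1 and 3 follow. These are illustrative only: the names
"isSegmentServer", "ServerType", "SegmentLoadDropHandler", and
"DataSourceAnalysis" come from this commit, but the bodies shown are a
reconstruction, not Druid's exact source.

```java
// Fix 1 (sketch): announce based on server type, not on whether any
// storage locations happen to be configured.
public enum ServerType
{
  HISTORICAL,
  INDEXER_EXECUTOR, // reported by Indexer and MiddleManager peon processes
  REALTIME,
  BROKER; // ...other values elided

  /** Whether processes of this type serve segments and must announce themselves. */
  public boolean isSegmentServer()
  {
    return this == HISTORICAL || this == INDEXER_EXECUTOR || this == REALTIME;
  }
}
```

SegmentLoadDropHandler then announces whenever this method returns true, even
when no storage locations are defined (again a sketch):

```java
if (serverType.isSegmentServer()) {
  serverAnnouncer.announce(); // previously skipped when no locations existed
}
```

For fix 3, DataSourceAnalysis resolves the base table of a possibly complex
datasource before appenderators are looked up. A minimal sketch, assuming the
forDataSource/getBaseTableDataSource API and an ISE on unsupported inputs:

```java
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final TableDataSource table = analysis.getBaseTableDataSource()
    .orElseThrow(() -> new ISE("Cannot handle datasource: %s", query.getDataSource()));
```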

Test changes:

1) Add a new "docker-compose.cli-indexer.yml" config that spins up an
   Indexer instead of a MiddleManager.

2) Introduce a "USE_INDEXER" environment variable that determines if
   docker-compose will start up an Indexer or a MiddleManager.

3) Duplicate all the jdk8 tests and run them in both MiddleManager and
   Indexer mode.

4) Various adjustments to encourage fail-fast errors in the Docker
   build scripts.

5) Various adjustments to speed up integration tests and reduce memory
   usage.

6) Add another Mac-specific approach to determining a machine's own IP.
   This was useful on my development machine.

7) Update segment-count check in ITCompactionTaskTest to eliminate a
   race condition (it was looking for 6 segments, which only exist
   together briefly, until the older 4 are marked unused).

Javadoc updates:

1) AbstractBatchIndexTask: Added javadocs to determineLockGranularityXXX
   that make it clear when taskLockHelper will be initialized as a side
   effect. (Related to the second bug above.)

2) Task: Clarified that "isReady" is not guaranteed to be called before
   "run". It was already implied, but now it's explicit.

3) ZkCoordinator: Clarified deprecation message.

4) DataSegmentServerAnnouncer: Clarified deprecation message.

* Fix stop_cluster script.

* Fix sanity check in script.

* Fix hashbang lines.

* Test and doc adjustments.

* Additional tests, and adjustments for tests.

* Split ITs back out.

* Revert change to druid_coordinator_period_indexingPeriod.

* Set Indexer capacity to match MM.

* Bump up Historical memory.

* Bump down coordinator, overlord memory.

* Bump up Broker memory.
gianm authored and JulianJaffePinterest committed Jan 22, 2021
1 parent 936239f commit bf8dd07
Showing 45 changed files with 692 additions and 147 deletions.
113 changes: 79 additions & 34 deletions .travis.yml

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions docs/design/indexer.md
@@ -89,6 +89,8 @@ By default, the number of concurrent persist/merge operations is limited to (`dr

Separate task logs are not currently supported when using the Indexer; all task log messages will instead be logged in the Indexer process log.

The Indexer currently imposes an identical memory limit on each task. In later releases, the per-task memory limit will be removed and only the global limit will apply. The limit on concurrent merges will also be removed.

+The Indexer does not work properly with [`index_realtime`](../ingestion/tasks.md#index_realtime) task types. Therefore, it is not compatible with [Tranquility](../ingestion/tranquility.md). If you are using Tranquility, consider migrating to Druid's builtin [Apache Kafka](../development/extensions-core/kafka-ingestion.md) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md) ingestion options.
+
In later releases, per-task memory usage will be dynamically managed. Please see https://github.com/apache/druid/issues/7900 for details on future enhancements to the Indexer.
AbstractBatchIndexTask.java

@@ -44,6 +44,7 @@
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -82,7 +83,7 @@
/**
* Abstract class for batch tasks like {@link IndexTask}.
* Provides some methods such as {@link #determineSegmentGranularity}, {@link #findInputSegments},
-* and {@link #determineLockGranularityandTryLock} for easily acquiring task locks.
+* and {@link #determineLockGranularityAndTryLock} for easily acquiring task locks.
*/
public abstract class AbstractBatchIndexTask extends AbstractTask
{
@@ -122,6 +123,17 @@ protected AbstractBatchIndexTask(
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
+if (taskLockHelper == null) {
+  // Subclasses generally use "isReady" to initialize the taskLockHelper. It's not guaranteed to be called before
+  // "run", and so we call it here to ensure it happens.
+  //
+  // We're only really calling it for its side effects, and we expect it to return "true". If it doesn't, something
+  // strange is going on, so bail out.
+  if (!isReady(toolbox.getTaskActionClient())) {
+    throw new ISE("Cannot start; not ready!");
+  }
+}
+
synchronized (this) {
if (stopped) {
return TaskStatus.failure(getId());
@@ -251,9 +263,17 @@ public TaskLockHelper getTaskLockHelper()
}

/**
-* Determine lockGranularity to use and try to acquire necessary locks.
-* This method respects the value of 'forceTimeChunkLock' in task context.
-* If it's set to false or missing, this method checks if this task can use segmentLock.
+* Attempts to acquire a lock that covers the intervals specified in a certain granularitySpec.
+*
+* This method uses {@link GranularitySpec#bucketIntervals()} to get the list of intervals to lock, and passes them
+* to {@link #determineLockGranularityAndTryLock(TaskActionClient, List)}.
+*
+* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
+*
+* If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method
+* will initialize {@link #taskLockHelper} as a side effect.
+*
+* @return whether the lock was acquired
*/
protected boolean determineLockGranularityAndTryLock(
TaskActionClient client,
@@ -263,10 +283,20 @@ protected boolean determineLockGranularityAndTryLock(
final List<Interval> intervals = granularitySpec.bucketIntervals().isPresent()
? new ArrayList<>(granularitySpec.bucketIntervals().get())
: Collections.emptyList();
-return determineLockGranularityandTryLock(client, intervals);
+return determineLockGranularityAndTryLock(client, intervals);
}

-boolean determineLockGranularityandTryLock(TaskActionClient client, List<Interval> intervals) throws IOException
+/**
+ * Attempts to acquire a lock that covers certain intervals.
+ *
+ * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
+ *
+ * If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method
+ * will initialize {@link #taskLockHelper} as a side effect.
+ *
+ * @return whether the lock was acquired
+ */
+boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals) throws IOException
{
final boolean forceTimeChunkLock = getContextValue(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
@@ -287,12 +317,22 @@ boolean determineLockGranularityandTryLockWithSegments(
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT);
return tryLockWithDetermineResult(client, result);
} else {
+// This branch is the only one that will not initialize taskLockHelper.
return true;
}
}
}

-boolean determineLockGranularityandTryLockWithSegments(
+/**
+ * Attempts to acquire a lock that covers certain segments.
+ *
+ * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
+ *
+ * This method will initialize {@link #taskLockHelper} as a side effect.
+ *
+ * @return whether the lock was acquired
+ */
+boolean determineLockGranularityAndTryLockWithSegments(
TaskActionClient client,
List<DataSegment> segments,
BiConsumer<LockGranularity, List<DataSegment>> segmentCheckFunction
@@ -396,7 +436,8 @@ private LockGranularityDetermineResult determineSegmentGranularity(List<DataSegm
if (granularityFromSegments == null
|| segmentGranularityFromSpec != null
&& (!granularityFromSegments.equals(segmentGranularityFromSpec)
-|| segments.stream().anyMatch(segment -> !segmentGranularityFromSpec.isAligned(segment.getInterval())))) {
+|| segments.stream()
+    .anyMatch(segment -> !segmentGranularityFromSpec.isAligned(segment.getInterval())))) {
// This case is one of the followings:
// 1) Segments have different granularities.
// 2) Segment granularity in ingestion spec is different from the one of existig segments.
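
Both lock paths above consult the task context. For reference, a minimal
sketch of how a task context can opt in to segment locking; the key
`Tasks.FORCE_TIME_CHUNK_LOCK_KEY` ("forceTimeChunkLock") is from the code
above, while the class and method here are purely illustrative, and the
import path for Tasks is an assumption:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.druid.indexing.common.task.Tasks; // assumed location

class LockContextExample
{
  // Build a task context that lets determineLockGranularityAndTryLock
  // consider segment locks instead of forcing time chunk locks.
  static Map<String, Object> segmentLockFriendlyContext()
  {
    final Map<String, Object> context = new HashMap<>();
    context.put(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false);
    return context;
  }
}
```
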
CompactionTask.java

@@ -312,7 +312,7 @@ public int getPriority()
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
final List<DataSegment> segments = segmentProvider.findSegments(taskActionClient);
-return determineLockGranularityandTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
+return determineLockGranularityAndTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
}

@Override
IndexTask.java

@@ -476,7 +476,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
final List<Interval> allocateIntervals = new ArrayList<>(partitionAnalysis.getAllIntervalsToIndex());
final DataSchema dataSchema;
if (determineIntervals) {
-if (!determineLockGranularityandTryLock(toolbox.getTaskActionClient(), allocateIntervals)) {
+if (!determineLockGranularityAndTryLock(toolbox.getTaskActionClient(), allocateIntervals)) {
throw new ISE("Failed to get locks for intervals[%s]", allocateIntervals);
}

Task.java

@@ -160,8 +160,11 @@ default int getPriority()
/**
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
-coordinator. If this method throws an exception, the task should be considered a failure.
-<p/>
+Overlord. If this method throws an exception, the task should be considered a failure.
+*
+This method will not necessarily be executed before {@link #run(TaskToolbox)}, since this task readiness check
+may be done on a different machine from the one that actually runs the task.
+*
+This method must be idempotent, as it may be run multiple times per task.
*
* @param taskActionClient action client for this task (not the full toolbox)
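
Given that contract, isReady() implementations should be idempotent and safe
to skip entirely. A sketch of the pattern for a task taking a time chunk lock
(a fragment of a Task implementation; the action name and signature reflect
Druid's task-actions API as I understand it, and the `interval` field is an
assumed member of the enclosing task):

```java
// Idempotent readiness check: retrying the same lock acquisition is harmless,
// and run() must not assume this method was ever invoked.
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
  return taskActionClient.submit(
      new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)
  ) != null;
}
```
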
TaskLockHelper.java

@@ -51,7 +51,7 @@
* - {@link #verifyAndLockExistingSegments} is to verify the granularity of existing segments and lock them.
* This method must be called before the task starts indexing.
* - Tells the task what {@link LockGranularity} it should use. Note that the LockGranularity is determined in
-* {@link AbstractBatchIndexTask#determineLockGranularityandTryLock}.
+* {@link AbstractBatchIndexTask#determineLockGranularityAndTryLock}.
* - Provides some util methods for {@link LockGranularity#SEGMENT}. Also caches the information of locked segments when
* - the SEGMENt lock granularity is used.
*/
61 changes: 39 additions & 22 deletions integration-tests/README.md
@@ -37,7 +37,7 @@ Integration Testing Using Docker
Before starting, if you don't already have docker on your machine, install it as described on
[Docker installation instructions](https://docs.docker.com/install/). Ensure that you
have at least 4GB of memory allocated to the docker engine. (You can verify it
-under Preferences > Advanced.)
+under Preferences > Resources > Advanced.)

Also set the `DOCKER_IP`
environment variable to localhost on your system, as follows:
@@ -72,31 +72,40 @@ Druid routers for security group integration test (permissive tls, no client aut

## Docker compose

-Docker compose yamls located in "docker" folder
+There are a few different Docker compose yamls located in "docker" folder. Before you can run any of these, you must
+build the Docker images. See "Manually bringing up Docker containers and running tests" below.

docker-compose.base.yml - Base file that defines all containers for integration test

docker-compose.yml - Defines Druid cluster with default configuration that is used for running integration tests in Travis CI.

-docker-compose -f docker-compose.yml up
-// DRUID_INTEGRATION_TEST_GROUP - this variable is used in Druid docker container for "security" and "query" test group. Use next docker-compose if you want to run security/query tests.
-DRUID_INTEGRATION_TEST_GROUP=security docker-compose -f docker-compose.yml up
+```
+docker-compose -f docker-compose.yml up
+// DRUID_INTEGRATION_TEST_GROUP - this variable is used in Druid docker container for "security" and "query" test group. Use next docker-compose if you want to run security/query tests.
+DRUID_INTEGRATION_TEST_GROUP=security docker-compose -f docker-compose.yml up
+```

docker-compose.override-env.yml - Defines Druid cluster with default configuration plus any additional and/or overriden configurations from override-env file.

-// OVERRIDE_ENV - variable that must contains path to Druid configuration file
-OVERRIDE_ENV=./environment-configs/override-examples/s3 docker-compose -f docker-compose.override-env.yml up
+```
+// OVERRIDE_ENV - variable that must contains path to Druid configuration file
+OVERRIDE_ENV=./environment-configs/override-examples/s3 docker-compose -f docker-compose.override-env.yml up
+```

docker-compose.security.yml - Defines three additional Druid router services with permissive tls, no client auth tls, and custom check tls respectively.
This is meant to be use together with docker-compose.yml or docker-compose.override-env.yml and is only needed for the "security" group integration test.

-docker-compose -f docker-compose.yml -f docker-compose.security.yml up
+```
+docker-compose -f docker-compose.yml -f docker-compose.security.yml up
+```

docker-compose.druid-hadoop.yml - for starting Apache Hadoop 2.8.5 cluster with the same setup as the Druid tutorial

-docker-compose -f docker-compose.druid-hadoop.yml up
+```
+docker-compose -f docker-compose.druid-hadoop.yml up
+```

-## Manual bringing up docker containers and running tests
+## Manually bringing up Docker containers and running tests

1. Build druid-cluster, druid-hadoop docker images. From root module run maven command:
```
@@ -106,30 +115,38 @@ mvn clean install -pl integration-tests -P integration-tests -Ddocker.run.skip=t
2. Run druid cluster by docker-compose:

```
-- Basic Druid cluster (skip this if running Druid cluster with override configs):
+# Basic Druid cluster (skip this if running Druid cluster with override configs):
docker-compose -f integration-tests/docker/docker-compose.yml up
-- Druid cluster with override configs (skip this if running Basic Druid cluster):
+# Druid cluster with override configs (skip this if running Basic Druid cluster):
OVERRIDE_ENV=<PATH_TO_ENV> docker-compose -f ${DOCKERDIR}/docker-compose.override-env.yml up
-- Druid hadoop (if needed):
+# Druid hadoop (if needed):
docker-compose -f ${DOCKERDIR}/docker-compose.druid-hadoop.yml up
-- Druid routers for security group integration test (if needed):
-docker-compose -f ${DOCKERDIR}/docker-compose.security.yml up
+# Druid routers for security group integration test (if needed):
+docker-compose -f ${DOCKERDIR}/docker-compose.security.yml up
```

3. Run maven command to execute tests with -Ddocker.build.skip=true -Ddocker.run.skip=true

+For example:
+
+```
+mvn verify -P integration-tests -pl integration-tests -Dit.test=ITIndexerTest -Ddocker.build.skip=true -Ddocker.run.skip=true
+```

## Tips & tricks for debugging and developing integration tests

### Useful mvn command flags

-- -Ddocker.build.skip=true to skip build druid containers.
-If you do not apply any change to druid then you can do not rebuild druid.
-This can save ~4 minutes to build druid cluster and druid hadoop.
+- -Ddocker.build.skip=true to skip building the containers.
+If you do not apply any change to Druid then you skip rebuilding the containers. This can save ~4 minutes.
You need to build druid containers only once, after you can skip docker build step.
- -Ddocker.run.skip=true to skip starting docker containers. This can save ~3 minutes by skipping building and bringing
up the docker containers (Druid, Kafka, Hadoop, MYSQL, zookeeper, etc). Please make sure that you actually do have
these containers already running if using this flag. Additionally, please make sure that the running containers
are in the same state that the setup script (run_cluster.sh) would have brought it up in.

### Debugging Druid while running tests

2 changes: 2 additions & 0 deletions integration-tests/build_run_cluster.sh
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+set -e
+
echo $DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH

export DIR=$(cd $(dirname $0) && pwd)
7 changes: 4 additions & 3 deletions integration-tests/docker/Dockerfile
@@ -94,11 +94,12 @@ EXPOSE 8300 8301 8302 8303 8304 8305
EXPOSE 9092 9093

WORKDIR /var/lib/druid
-ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \
+ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \
+    && . /druid.sh \
     # Create druid service config files with all the config variables
-    && . /druid.sh; setupConfig \
+    && setupConfig \
     # Some test groups require pre-existing data to be setup
-    && . /druid.sh; setupData \
+    && setupData \
# Export the service config file path to use in supervisord conf file
&& export DRUID_COMMON_CONF_DIR="$(. /druid.sh; getConfPath ${DRUID_SERVICE})" \
# Export the common config file path to use in supervisord conf file
31 changes: 25 additions & 6 deletions integration-tests/docker/docker-compose.base.yml
@@ -45,7 +45,7 @@ services:
- 51111:51111
networks:
druid-it-net:
-ipv4_address: 172.172.172.13
+ipv4_address: 172.172.172.15
privileged: true
volumes:
- ${HOME}/shared:/shared
@@ -173,12 +173,31 @@ services:
- ./environment-configs/common
- ./environment-configs/middlemanager

+  druid-indexer:
+    image: druid/cluster
+    container_name: druid-indexer
+    networks:
+      druid-it-net:
+        ipv4_address: 172.172.172.8
+    ports:
+      - 5008:5008
+      - 8091:8091
+      - 8291:8291
+    privileged: true
+    volumes:
+      - ./../src/test/resources:/resources
+      - ${HOME}/shared:/shared
+      - ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
+    env_file:
+      - ./environment-configs/common
+      - ./environment-configs/indexer
+
druid-broker:
image: druid/cluster
container_name: druid-broker
networks:
druid-it-net:
-ipv4_address: 172.172.172.8
+ipv4_address: 172.172.172.9
ports:
- 5005:5005
- 8082:8082
@@ -196,7 +215,7 @@
container_name: druid-router
networks:
druid-it-net:
-ipv4_address: 172.172.172.9
+ipv4_address: 172.172.172.10
ports:
- 5004:5004
- 8888:8888
@@ -214,7 +233,7 @@
container_name: druid-router-permissive-tls
networks:
druid-it-net:
-ipv4_address: 172.172.172.10
+ipv4_address: 172.172.172.11
ports:
- 5001:5001
- 8889:8889
@@ -232,7 +251,7 @@
container_name: druid-router-no-client-auth-tls
networks:
druid-it-net:
-ipv4_address: 172.172.172.11
+ipv4_address: 172.172.172.12
ports:
- 5002:5002
- 8890:8890
@@ -250,7 +269,7 @@
container_name: druid-router-custom-check-tls
networks:
druid-it-net:
-ipv4_address: 172.172.172.12
+ipv4_address: 172.172.172.13
ports:
- 5003:5003
- 8891:8891