Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integration test for coordinator and overlord leadership client #10680

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -483,13 +483,19 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) other integration tests"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags

- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer"
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'

- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'

# END - Integration tests for Compile with Java 8 and Run with Java 8

# START - Integration tests for Compile with Java 8 and Run with Java 11
Expand Down Expand Up @@ -546,7 +552,12 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'

- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
# END - Integration tests for Compile with Java 8 and Run with Java 11

- name: "security vulnerabilities"
Expand Down
1 change: 1 addition & 0 deletions docs/querying/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,7 @@ Servers table lists all discovered servers in the cluster.
|tier|STRING|Distribution tier see [druid.server.tier](../configuration/index.html#historical-general-configuration). Only valid for HISTORICAL type, for other types it's null|
|current_size|LONG|Current size of segments in bytes on this server. Only valid for HISTORICAL type, for other types it's 0|
|max_size|LONG|Max size in bytes this server recommends to assign to segments see [druid.server.maxSize](../configuration/index.html#historical-general-configuration). Only valid for HISTORICAL type, for other types it's 0|
|is_leader|LONG|1 if the server is currently the 'leader' (for services which have the concept of leadership), otherwise 0 if the server is not the leader, or the default long value (0 or null depending on `druid.generic.useDefaultValueForNull`) if the server type does not have the concept of leadership|

To retrieve information about all servers, use the query:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.security.basic.authentication.entity.BasicAuthenticatorUser;
Expand Down Expand Up @@ -66,6 +68,9 @@ public class BasicAuthUtils
public static final int DEFAULT_CREDENTIAL_CACHE_SIZE = 100;
public static final int KEY_LENGTH = 512;
public static final String ALGORITHM = "PBKDF2WithHmacSHA512";
public static final int MAX_INIT_RETRIES = 2;
public static final Predicate<Throwable> SHOULD_RETRY_INIT =
(throwable) -> throwable instanceof BasicSecurityDBResourceException;

public static final TypeReference<Map<String, BasicAuthenticatorUser>> AUTHENTICATOR_USER_MAP_TYPE_REFERENCE =
new TypeReference<Map<String, BasicAuthenticatorUser>>()
Expand Down Expand Up @@ -277,4 +282,14 @@ public static byte[] serializeAuthorizerRoleMap(ObjectMapper objectMapper, Map<S
throw new ISE(ioe, "Couldn't serialize authorizer roleMap!");
}
}

public static void maybeInitialize(final RetryUtils.Task<?> task)
{
try {
RetryUtils.retry(task, SHOULD_RETRY_INIT, MAX_INIT_RETRIES);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,46 +119,50 @@ public void start()

try {
LOG.info("Starting CoordinatorBasicAuthenticatorMetadataStorageUpdater.");
for (Map.Entry<String, Authenticator> entry : authenticatorMapper.getAuthenticatorMap().entrySet()) {
Authenticator authenticator = entry.getValue();
if (authenticator instanceof BasicHTTPAuthenticator) {
String authenticatorName = entry.getKey();
authenticatorPrefixes.add(authenticatorName);
BasicHTTPAuthenticator basicHTTPAuthenticator = (BasicHTTPAuthenticator) authenticator;
BasicAuthDBConfig dbConfig = basicHTTPAuthenticator.getDbConfig();
byte[] userMapBytes = getCurrentUserMapBytes(authenticatorName);
Map<String, BasicAuthenticatorUser> userMap = BasicAuthUtils.deserializeAuthenticatorUserMap(
objectMapper,
userMapBytes
);
cachedUserMaps.put(authenticatorName, new BasicAuthenticatorUserMapBundle(userMap, userMapBytes));

if (dbConfig.getInitialAdminPassword() != null && !userMap.containsKey(BasicAuthUtils.ADMIN_NAME)) {
createUserInternal(authenticatorName, BasicAuthUtils.ADMIN_NAME);
setUserCredentialsInternal(
authenticatorName,
BasicAuthUtils.ADMIN_NAME,
new BasicAuthenticatorCredentialUpdate(
dbConfig.getInitialAdminPassword().getPassword(),
BasicAuthUtils.DEFAULT_KEY_ITERATIONS
)
);
}
BasicAuthUtils.maybeInitialize(
() -> {
for (Map.Entry<String, Authenticator> entry : authenticatorMapper.getAuthenticatorMap().entrySet()) {
Authenticator authenticator = entry.getValue();
if (authenticator instanceof BasicHTTPAuthenticator) {
String authenticatorName = entry.getKey();
authenticatorPrefixes.add(authenticatorName);
BasicHTTPAuthenticator basicHTTPAuthenticator = (BasicHTTPAuthenticator) authenticator;
BasicAuthDBConfig dbConfig = basicHTTPAuthenticator.getDbConfig();
byte[] userMapBytes = getCurrentUserMapBytes(authenticatorName);
Map<String, BasicAuthenticatorUser> userMap = BasicAuthUtils.deserializeAuthenticatorUserMap(
objectMapper,
userMapBytes
);
cachedUserMaps.put(authenticatorName, new BasicAuthenticatorUserMapBundle(userMap, userMapBytes));

if (dbConfig.getInitialAdminPassword() != null && !userMap.containsKey(BasicAuthUtils.ADMIN_NAME)) {
createUserInternal(authenticatorName, BasicAuthUtils.ADMIN_NAME);
setUserCredentialsInternal(
authenticatorName,
BasicAuthUtils.ADMIN_NAME,
new BasicAuthenticatorCredentialUpdate(
dbConfig.getInitialAdminPassword().getPassword(),
BasicAuthUtils.DEFAULT_KEY_ITERATIONS
)
);
}

if (dbConfig.getInitialInternalClientPassword() != null
&& !userMap.containsKey(BasicAuthUtils.INTERNAL_USER_NAME)) {
createUserInternal(authenticatorName, BasicAuthUtils.INTERNAL_USER_NAME);
setUserCredentialsInternal(
authenticatorName,
BasicAuthUtils.INTERNAL_USER_NAME,
new BasicAuthenticatorCredentialUpdate(
dbConfig.getInitialInternalClientPassword().getPassword(),
BasicAuthUtils.DEFAULT_KEY_ITERATIONS
)
);
}
}
}
if (dbConfig.getInitialInternalClientPassword() != null
&& !userMap.containsKey(BasicAuthUtils.INTERNAL_USER_NAME)) {
createUserInternal(authenticatorName, BasicAuthUtils.INTERNAL_USER_NAME);
setUserCredentialsInternal(
authenticatorName,
BasicAuthUtils.INTERNAL_USER_NAME,
new BasicAuthenticatorCredentialUpdate(
dbConfig.getInitialInternalClientPassword().getPassword(),
BasicAuthUtils.DEFAULT_KEY_ITERATIONS
)
);
}
}
}
return true;
});

ScheduledExecutors.scheduleWithFixedDelay(
exec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,42 +143,52 @@ public void start()

try {
LOG.info("Starting CoordinatorBasicAuthorizerMetadataStorageUpdater");
for (Map.Entry<String, Authorizer> entry : authorizerMapper.getAuthorizerMap().entrySet()) {
Authorizer authorizer = entry.getValue();
if (authorizer instanceof BasicRoleBasedAuthorizer) {
BasicRoleBasedAuthorizer basicRoleBasedAuthorizer = (BasicRoleBasedAuthorizer) authorizer;
BasicAuthDBConfig dbConfig = basicRoleBasedAuthorizer.getDbConfig();
String authorizerName = entry.getKey();
authorizerNames.add(authorizerName);

byte[] userMapBytes = getCurrentUserMapBytes(authorizerName);
Map<String, BasicAuthorizerUser> userMap = BasicAuthUtils.deserializeAuthorizerUserMap(
objectMapper,
userMapBytes
);
cachedUserMaps.put(authorizerName, new BasicAuthorizerUserMapBundle(userMap, userMapBytes));
BasicAuthUtils.maybeInitialize(
() -> {
for (Map.Entry<String, Authorizer> entry : authorizerMapper.getAuthorizerMap().entrySet()) {
Authorizer authorizer = entry.getValue();
if (authorizer instanceof BasicRoleBasedAuthorizer) {
BasicRoleBasedAuthorizer basicRoleBasedAuthorizer = (BasicRoleBasedAuthorizer) authorizer;
BasicAuthDBConfig dbConfig = basicRoleBasedAuthorizer.getDbConfig();
String authorizerName = entry.getKey();
authorizerNames.add(authorizerName);

byte[] groupMappingMapBytes = getCurrentGroupMappingMapBytes(authorizerName);
Map<String, BasicAuthorizerGroupMapping> groupMappingMap = BasicAuthUtils.deserializeAuthorizerGroupMappingMap(
objectMapper,
groupMappingMapBytes
);
cachedGroupMappingMaps.put(authorizerName, new BasicAuthorizerGroupMappingMapBundle(groupMappingMap, groupMappingMapBytes));
byte[] userMapBytes = getCurrentUserMapBytes(authorizerName);
Map<String, BasicAuthorizerUser> userMap = BasicAuthUtils.deserializeAuthorizerUserMap(
objectMapper,
userMapBytes
);
cachedUserMaps.put(authorizerName, new BasicAuthorizerUserMapBundle(userMap, userMapBytes));

byte[] roleMapBytes = getCurrentRoleMapBytes(authorizerName);
Map<String, BasicAuthorizerRole> roleMap = BasicAuthUtils.deserializeAuthorizerRoleMap(
objectMapper,
roleMapBytes
);
cachedRoleMaps.put(authorizerName, new BasicAuthorizerRoleMapBundle(roleMap, roleMapBytes));
byte[] groupMappingMapBytes = getCurrentGroupMappingMapBytes(authorizerName);
Map<String, BasicAuthorizerGroupMapping> groupMappingMap = BasicAuthUtils.deserializeAuthorizerGroupMappingMap(
objectMapper,
groupMappingMapBytes
);
cachedGroupMappingMaps.put(
authorizerName,
new BasicAuthorizerGroupMappingMapBundle(
groupMappingMap,
groupMappingMapBytes
)
);

initSuperUsersAndGroupMapping(authorizerName, userMap, roleMap, groupMappingMap,
dbConfig.getInitialAdminUser(),
dbConfig.getInitialAdminRole(),
dbConfig.getInitialAdminGroupMapping()
);
}
}
byte[] roleMapBytes = getCurrentRoleMapBytes(authorizerName);
Map<String, BasicAuthorizerRole> roleMap = BasicAuthUtils.deserializeAuthorizerRoleMap(
objectMapper,
roleMapBytes
);
cachedRoleMaps.put(authorizerName, new BasicAuthorizerRoleMapBundle(roleMap, roleMapBytes));

initSuperUsersAndGroupMapping(authorizerName, userMap, roleMap, groupMappingMap,
dbConfig.getInitialAdminUser(),
dbConfig.getInitialAdminRole(),
dbConfig.getInitialAdminGroupMapping()
);
}
}
return true;
});

ScheduledExecutors.scheduleWithFixedDelay(
exec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ public TaskStatusResponse getTaskStatus(String taskId)
final String groupId = task.isPresent() ? task.get().getGroupId() : null;
final String taskType = task.isPresent() ? task.get().getType() : null;
final TaskStatus taskStatus = taskRunner.getStatus(taskId);

if (taskStatus != null) {
return new TaskStatusResponse(
taskId,
Expand Down
Loading