Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optionally disable all of hardcoded zookeeper use #9507

Merged
merged 6 commits into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer

// ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed.
private static final Joiner JOINER = Joiner.on("/");

@Nullable // Null, if zk is disabled
private final CuratorFramework cf;

@Nullable // Null, if zk is disabled
private final ScheduledExecutorService zkCleanupExec;
private final IndexerZkConfig indexerZkConfig;

Expand All @@ -192,7 +196,7 @@ public HttpRemoteTaskRunner(
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
TaskStorage taskStorage,
CuratorFramework cf,
@Nullable CuratorFramework cf,
IndexerZkConfig indexerZkConfig
)
{
Expand All @@ -217,12 +221,18 @@ public HttpRemoteTaskRunner(
ScheduledExecutors.fixed(1, "HttpRemoteTaskRunner-Worker-Cleanup-%d")
);

this.cf = cf;
if (cf != null) {
this.cf = cf;
this.zkCleanupExec = ScheduledExecutors.fixed(
1,
"HttpRemoteTaskRunner-zk-cleanup-%d"
);
} else {
this.cf = null;
this.zkCleanupExec = null;
}

this.indexerZkConfig = indexerZkConfig;
this.zkCleanupExec = ScheduledExecutors.fixed(
1,
"HttpRemoteTaskRunner-zk-cleanup-%d"
);

this.provisioningStrategy = provisioningStrategy;
}
Expand Down Expand Up @@ -269,6 +279,10 @@ public void start()

private void scheduleCompletedTaskStatusCleanupFromZk()
{
if (cf == null) {
return;
}

zkCleanupExec.scheduleAtFixedRate(
() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Smile;
Expand All @@ -36,6 +38,8 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;

import javax.annotation.Nullable;

/**
*/
public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory<HttpRemoteTaskRunner>
Expand All @@ -52,6 +56,7 @@ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory<HttpRemote
private final TaskStorage taskStorage;

// ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed.
@Nullable //Null if zk is disabled
private final CuratorFramework cf;
private final IndexerZkConfig indexerZkConfig;

Expand All @@ -65,8 +70,9 @@ public HttpRemoteTaskRunnerFactory(
final ProvisioningStrategy provisioningStrategy,
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
final TaskStorage taskStorage,
final CuratorFramework cf,
final IndexerZkConfig indexerZkConfig
final Provider<CuratorFramework> cfProvider,
final IndexerZkConfig indexerZkConfig,
final ZkEnablementConfig zkEnablementConfig
)
{
this.smileMapper = smileMapper;
Expand All @@ -77,8 +83,13 @@ public HttpRemoteTaskRunnerFactory(
this.provisioningStrategy = provisioningStrategy;
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.taskStorage = taskStorage;
this.cf = cf;
this.indexerZkConfig = indexerZkConfig;

if (zkEnablementConfig.isEnabled()) {
this.cf = cfProvider.get();
} else {
this.cf = null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
* starts running and completed task on disk is deleted based on a periodic schedule where overlord is asked for
* active tasks to see which completed tasks are safe to delete.
*/
public abstract class WorkerTaskManager
public class WorkerTaskManager
{
private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class);

Expand Down Expand Up @@ -596,6 +596,12 @@ public void workerDisabled()
}
}

public boolean isWorkerEnabled()
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), "not started");
return !disabled.get();
}

private static class TaskDetails
{
private final Task task;
Expand Down Expand Up @@ -776,7 +782,13 @@ public void handle()
//watches task assignments and updates task statuses inside Zookeeper. When the transition to HTTP is complete
//in Overlord as well as MiddleManagers then WorkerTaskMonitor should be deleted, this class should no longer be abstract
//and the methods below should be removed.
protected abstract void taskStarted(String taskId);
protected void taskStarted(String taskId)
{

protected abstract void taskAnnouncementChanged(TaskAnnouncement announcement);
}

protected void taskAnnouncementChanged(TaskAnnouncement announcement)
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.hrtr.WorkerHolder;
import org.apache.druid.indexing.worker.WorkerHistoryItem;
import org.apache.druid.indexing.worker.WorkerTaskMonitor;
import org.apache.druid.indexing.worker.WorkerTaskManager;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.http.security.StateResourceFilter;

import javax.servlet.AsyncContext;
Expand All @@ -61,22 +60,22 @@
@ResourceFilters(StateResourceFilter.class)
public class TaskManagementResource
{
protected static final EmittingLogger log = new EmittingLogger(SegmentListerResource.class);
protected static final EmittingLogger log = new EmittingLogger(TaskManagementResource.class);

protected final ObjectMapper jsonMapper;
protected final ObjectMapper smileMapper;
private final WorkerTaskMonitor workerTaskMonitor;
private final WorkerTaskManager workerTaskManager;

@Inject
public TaskManagementResource(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
WorkerTaskMonitor workerTaskMonitor
WorkerTaskManager workerTaskManager
)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.workerTaskMonitor = workerTaskMonitor;
this.workerTaskManager = workerTaskManager;
}

/**
Expand Down Expand Up @@ -119,7 +118,7 @@ public Void getWorkerState(

final ResponseContext context = createContext(req.getHeader("Accept"));

final ListenableFuture<ChangeRequestsSnapshot<WorkerHistoryItem>> future = workerTaskMonitor.getChangesSince(
final ListenableFuture<ChangeRequestsSnapshot<WorkerHistoryItem>> future = workerTaskManager.getChangesSince(
new ChangeRequestHistory.Counter(
counter,
hash
Expand Down Expand Up @@ -205,7 +204,7 @@ public void onFailure(Throwable th)
public Response assignTask(Task task)
{
try {
workerTaskMonitor.assignTask(task);
workerTaskManager.assignTask(task);
return Response.ok().build();
}
catch (RuntimeException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@
import com.google.common.collect.Lists;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
import org.apache.druid.indexing.worker.WorkerTaskMonitor;
import org.apache.druid.indexing.worker.WorkerTaskManager;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.http.HttpMediaType;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.http.security.StateResourceFilter;
import org.apache.druid.tasklogs.TaskLogStreamer;

import javax.annotation.Nullable;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand All @@ -59,23 +62,32 @@ public class WorkerResource
private static String DISABLED_VERSION = "";

private final Worker enabledWorker;

@Nullable // Null, if zk is disabled
private final WorkerCuratorCoordinator curatorCoordinator;

private final TaskRunner taskRunner;
private final WorkerTaskMonitor workerTaskManager;
private final WorkerTaskManager workerTaskManager;

@Inject
public WorkerResource(
Worker worker,
WorkerCuratorCoordinator curatorCoordinator,
Provider<WorkerCuratorCoordinator> curatorCoordinatorProvider,
TaskRunner taskRunner,
WorkerTaskMonitor workerTaskManager
WorkerTaskManager workerTaskManager,
ZkEnablementConfig zkEnablementConfig

)
{
this.enabledWorker = worker;
this.curatorCoordinator = curatorCoordinator;
this.taskRunner = taskRunner;
this.workerTaskManager = workerTaskManager;

if (zkEnablementConfig.isEnabled()) {
this.curatorCoordinator = curatorCoordinatorProvider.get();
} else {
this.curatorCoordinator = null;
}
}


Expand All @@ -86,17 +98,19 @@ public WorkerResource(
public Response doDisable()
{
try {
final Worker disabledWorker = new Worker(
enabledWorker.getScheme(),
enabledWorker.getHost(),
enabledWorker.getIp(),
enabledWorker.getCapacity(),
DISABLED_VERSION,
enabledWorker.getCategory()
);
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
if (curatorCoordinator != null) {
final Worker disabledWorker = new Worker(
enabledWorker.getScheme(),
enabledWorker.getHost(),
enabledWorker.getIp(),
enabledWorker.getCapacity(),
DISABLED_VERSION,
enabledWorker.getCategory()
);
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
}
workerTaskManager.workerDisabled();
return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build();
return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "disabled")).build();
}
catch (Exception e) {
return Response.serverError().build();
Expand All @@ -110,7 +124,9 @@ public Response doDisable()
public Response doEnable()
{
try {
curatorCoordinator.updateWorkerAnnouncement(enabledWorker);
if (curatorCoordinator != null) {
curatorCoordinator.updateWorkerAnnouncement(enabledWorker);
}
workerTaskManager.workerEnabled();
return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "enabled")).build();
}
Expand All @@ -126,9 +142,7 @@ public Response doEnable()
public Response isEnabled()
{
try {
final Worker theWorker = curatorCoordinator.getWorker();
final boolean enabled = !theWorker.getVersion().equalsIgnoreCase(DISABLED_VERSION);
return Response.ok(ImmutableMap.of(theWorker.getHost(), enabled)).build();
return Response.ok(ImmutableMap.of(enabledWorker.getHost(), workerTaskManager.isWorkerEnabled())).build();
}
catch (Exception e) {
return Response.serverError().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
Expand Down Expand Up @@ -100,9 +101,10 @@ public String getBase()

workerResource = new WorkerResource(
worker,
curatorCoordinator,
() -> curatorCoordinator,
null,
EasyMock.createNiceMock(WorkerTaskMonitor.class)
EasyMock.createNiceMock(WorkerTaskMonitor.class),
ZkEnablementConfig.ENABLED
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,20 @@ public class CuratorModule implements Module
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, ZkEnablementConfig.class);
JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, CuratorConfig.class);
JsonConfigProvider.bind(binder, EXHIBITOR_CONFIG_PREFIX, ExhibitorConfig.class);
}

@Provides
@LazySingleton
@SuppressForbidden(reason = "System#err")
public CuratorFramework makeCurator(CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle)
public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked too closely yet, but I wonder if this would be better if this was marked as @Nullable and returned null instead of throwing the runtime exception, and shift the burden of validating that curator is available to the settings that do require it, such as inventory, segment loading, and task management? The other stuff might be able to be simplified a bit and not have to care about having the setting, and could probably avoid having some of the signature changes to use providers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main reason for failing loudly here is so that if I forgot to disable zk in some code path, then this method would immediately fail on node start with guice injection errors leading to quick discovery of exactly what is missed.
this helped me catch quite a few places that I missed.

{
if (!zkEnablementConfig.isEnabled()) {
throw new RuntimeException("Zookeeper is disabled, Can't create CuratorFramework.");
}

final CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
if (!Strings.isNullOrEmpty(config.getZkUser()) && !Strings.isNullOrEmpty(config.getZkPwd())) {
builder.authorization(
Expand Down
Loading