From 746ffeb5504c2126094a24728d0aca8a420868de Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sun, 8 Mar 2020 15:22:11 -0700 Subject: [PATCH 1/4] optionally disable all of hardcoded zookeeper use --- .../overlord/hrtr/HttpRemoteTaskRunner.java | 26 ++++++--- .../hrtr/HttpRemoteTaskRunnerFactory.java | 17 ++++-- .../indexing/worker/WorkerTaskManager.java | 18 +++++-- .../worker/http/TaskManagementResource.java | 15 +++--- .../indexing/worker/http/WorkerResource.java | 52 +++++++++++------- .../worker/http/WorkerResourceTest.java | 6 ++- .../apache/druid/curator/CuratorModule.java | 7 ++- .../druid/curator/ZkEnablementConfig.java | 53 +++++++++++++++++++ .../curator/discovery/DiscoveryModule.java | 27 ++++++++-- .../curator/discovery/ServiceAnnouncer.java | 16 ++++++ .../apache/druid/guice/AnnouncerModule.java | 19 ++++++- .../BatchDataSegmentAnnouncer.java | 39 +++++++++++--- .../DataSegmentServerAnnouncer.java | 16 ++++++ .../server/coordinator/DruidCoordinator.java | 28 +++++++--- .../coordinator/LoadQueueTaskMaster.java | 9 ++-- .../druid/server/http/HistoricalResource.java | 12 ++--- .../CuratorDruidCoordinatorTest.java | 11 ++-- .../coordinator/DruidCoordinatorTest.java | 6 ++- .../org/apache/druid/cli/CliCoordinator.java | 5 +- .../org/apache/druid/cli/CliHistorical.java | 18 ++++++- .../java/org/apache/druid/cli/CliIndexer.java | 12 ++++- .../apache/druid/cli/CliMiddleManager.java | 28 ++++++++-- 22 files changed, 349 insertions(+), 91 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/curator/ZkEnablementConfig.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index da83b966606f..82cf612c2f85 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -180,7 +180,11 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer // ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed. private static final Joiner JOINER = Joiner.on("/"); + + @Nullable // Null, if zk is disabled private final CuratorFramework cf; + + @Nullable // Null, if zk is disabled private final ScheduledExecutorService zkCleanupExec; private final IndexerZkConfig indexerZkConfig; @@ -192,7 +196,7 @@ public HttpRemoteTaskRunner( ProvisioningStrategy provisioningStrategy, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, TaskStorage taskStorage, - CuratorFramework cf, + @Nullable CuratorFramework cf, IndexerZkConfig indexerZkConfig ) { @@ -217,12 +221,18 @@ public HttpRemoteTaskRunner( ScheduledExecutors.fixed(1, "HttpRemoteTaskRunner-Worker-Cleanup-%d") ); - this.cf = cf; + if (cf != null) { + this.cf = cf; + this.zkCleanupExec = ScheduledExecutors.fixed( + 1, + "HttpRemoteTaskRunner-zk-cleanup-%d" + ); + } else { + this.cf = null; + this.zkCleanupExec = null; + } + this.indexerZkConfig = indexerZkConfig; - this.zkCleanupExec = ScheduledExecutors.fixed( - 1, - "HttpRemoteTaskRunner-zk-cleanup-%d" - ); this.provisioningStrategy = provisioningStrategy; } @@ -269,6 +279,10 @@ public void start() private void scheduleCompletedTaskStatusCleanupFromZk() { + if (cf == null) { + return; + } + zkCleanupExec.scheduleAtFixedRate( () -> { try { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java index 99f6f6607dc0..6b81bb0782f0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.inject.Inject; +import com.google.inject.Provider; import org.apache.curator.framework.CuratorFramework; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Smile; @@ -36,6 +38,8 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.server.initialization.IndexerZkConfig; +import javax.annotation.Nullable; + /** */ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory @@ -52,6 +56,7 @@ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory cfProvider, + final IndexerZkConfig indexerZkConfig, + final ZkEnablementConfig zkEnablementConfig ) { this.smileMapper = smileMapper; @@ -77,8 +83,13 @@ public HttpRemoteTaskRunnerFactory( this.provisioningStrategy = provisioningStrategy; this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; this.taskStorage = taskStorage; - this.cf = cf; this.indexerZkConfig = indexerZkConfig; + + if (zkEnablementConfig.isEnabled()) { + this.cf = cfProvider.get(); + } else { + this.cf = null; + } } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 6855d08b5b5d..7d8d4614d8e3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -76,7 +76,7 @@ * starts running and completed task on disk is deleted based on a periodic schedule where overlord is asked for * active tasks to see which completed tasks are safe to delete. */ -public abstract class WorkerTaskManager +public class WorkerTaskManager { private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class); @@ -596,6 +596,12 @@ public void workerDisabled() } } + public boolean isWorkerEnabled() + { + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), "not started"); + return !disabled.get(); + } + private static class TaskDetails { private final Task task; @@ -776,7 +782,13 @@ public void handle() //watches task assignments and updates task statuses inside Zookeeper. When the transition to HTTP is complete //in Overlord as well as MiddleManagers then WorkerTaskMonitor should be deleted, this class should no longer be abstract //and the methods below should be removed. - protected abstract void taskStarted(String taskId); + protected void taskStarted(String taskId) + { - protected abstract void taskAnnouncementChanged(TaskAnnouncement announcement); + } + + protected void taskAnnouncementChanged(TaskAnnouncement announcement) + { + + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java index 7b4b2b38f960..57046ff80a03 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java @@ -31,11 +31,10 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.hrtr.WorkerHolder; import org.apache.druid.indexing.worker.WorkerHistoryItem; -import org.apache.druid.indexing.worker.WorkerTaskMonitor; +import org.apache.druid.indexing.worker.WorkerTaskManager; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; -import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.http.security.StateResourceFilter; import javax.servlet.AsyncContext; @@ -61,22 +60,22 @@ @ResourceFilters(StateResourceFilter.class) public class TaskManagementResource { - protected static final EmittingLogger log = new EmittingLogger(SegmentListerResource.class); + protected static final EmittingLogger log = new EmittingLogger(TaskManagementResource.class); protected final ObjectMapper jsonMapper; protected final ObjectMapper smileMapper; - private final WorkerTaskMonitor workerTaskMonitor; + private final WorkerTaskManager workerTaskManager; @Inject public TaskManagementResource( @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, - WorkerTaskMonitor workerTaskMonitor + WorkerTaskManager workerTaskManager ) { this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; - this.workerTaskMonitor = workerTaskMonitor; + this.workerTaskManager = workerTaskManager; } /** @@ -119,7 +118,7 @@ public Void getWorkerState( final ResponseContext context = createContext(req.getHeader("Accept")); - final ListenableFuture> future = workerTaskMonitor.getChangesSince( + final ListenableFuture> future = workerTaskManager.getChangesSince( new ChangeRequestHistory.Counter( counter, hash @@ -205,7 +204,7 @@ public void onFailure(Throwable th) public Response assignTask(Task task) { try { - workerTaskMonitor.assignTask(task); + workerTaskManager.assignTask(task); return Response.ok().build(); } catch (RuntimeException ex) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java index cafabc9e525f..dd3861791507 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java @@ -26,12 +26,14 @@ import com.google.common.collect.Lists; import com.google.common.io.ByteSource; import com.google.inject.Inject; +import com.google.inject.Provider; import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.WorkerCuratorCoordinator; -import org.apache.druid.indexing.worker.WorkerTaskMonitor; +import org.apache.druid.indexing.worker.WorkerTaskManager; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.http.HttpMediaType; @@ -39,6 +41,7 @@ import org.apache.druid.server.http.security.StateResourceFilter; import org.apache.druid.tasklogs.TaskLogStreamer; +import javax.annotation.Nullable; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -59,23 +62,32 @@ public class WorkerResource private static String DISABLED_VERSION = ""; private final Worker enabledWorker; + + @Nullable // Null, if zk is disabled private final WorkerCuratorCoordinator curatorCoordinator; + private final TaskRunner taskRunner; - private final WorkerTaskMonitor workerTaskManager; + private final WorkerTaskManager workerTaskManager; @Inject public WorkerResource( Worker worker, - WorkerCuratorCoordinator curatorCoordinator, + Provider curatorCoordinatorProvider, TaskRunner taskRunner, - WorkerTaskMonitor workerTaskManager + WorkerTaskManager workerTaskManager, + ZkEnablementConfig zkEnablementConfig ) { this.enabledWorker = worker; - this.curatorCoordinator = curatorCoordinator; this.taskRunner = taskRunner; this.workerTaskManager = workerTaskManager; + + if (zkEnablementConfig.isEnabled()) { + this.curatorCoordinator = curatorCoordinatorProvider.get(); + } else { + this.curatorCoordinator = null; + } } @@ -86,17 +98,19 @@ public WorkerResource( public Response doDisable() { try { - final Worker disabledWorker = new Worker( - enabledWorker.getScheme(), - enabledWorker.getHost(), - enabledWorker.getIp(), - enabledWorker.getCapacity(), - DISABLED_VERSION, - enabledWorker.getCategory() - ); - curatorCoordinator.updateWorkerAnnouncement(disabledWorker); + if (curatorCoordinator != null) { + final Worker disabledWorker = new Worker( + enabledWorker.getScheme(), + enabledWorker.getHost(), + enabledWorker.getIp(), + enabledWorker.getCapacity(), + DISABLED_VERSION, + enabledWorker.getCategory() + ); + curatorCoordinator.updateWorkerAnnouncement(disabledWorker); + } workerTaskManager.workerDisabled(); - return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build(); + return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "disabled")).build(); } catch (Exception e) { return Response.serverError().build(); @@ -110,7 +124,9 @@ public Response doDisable() public Response doEnable() { try { - curatorCoordinator.updateWorkerAnnouncement(enabledWorker); + if (curatorCoordinator != null) { + curatorCoordinator.updateWorkerAnnouncement(enabledWorker); + } workerTaskManager.workerEnabled(); return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "enabled")).build(); } @@ -126,9 +142,7 @@ public Response doEnable() public Response isEnabled() { try { - final Worker theWorker = curatorCoordinator.getWorker(); - final boolean enabled = !theWorker.getVersion().equalsIgnoreCase(DISABLED_VERSION); - return Response.ok(ImmutableMap.of(theWorker.getHost(), enabled)).build(); + return Response.ok(ImmutableMap.of(enabledWorker.getHost(), workerTaskManager.isWorkerEnabled())).build(); } catch (Exception e) { return Response.serverError().build(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java index 3667ef74e27a..0ad900dd8c44 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java @@ -25,6 +25,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.WorkerCuratorCoordinator; @@ -100,9 +101,10 @@ public String getBase() workerResource = new WorkerResource( worker, - curatorCoordinator, + () -> curatorCoordinator, null, - EasyMock.createNiceMock(WorkerTaskMonitor.class) + EasyMock.createNiceMock(WorkerTaskMonitor.class), + ZkEnablementConfig.ENABLED ); } diff --git a/server/src/main/java/org/apache/druid/curator/CuratorModule.java b/server/src/main/java/org/apache/druid/curator/CuratorModule.java index d5f516b898b3..093265ea6482 100644 --- a/server/src/main/java/org/apache/druid/curator/CuratorModule.java +++ b/server/src/main/java/org/apache/druid/curator/CuratorModule.java @@ -63,6 +63,7 @@ public class CuratorModule implements Module @Override public void configure(Binder binder) { + JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, ZkEnablementConfig.class); JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, CuratorConfig.class); JsonConfigProvider.bind(binder, EXHIBITOR_CONFIG_PREFIX, ExhibitorConfig.class); } @@ -70,8 +71,12 @@ public void configure(Binder binder) @Provides @LazySingleton @SuppressForbidden(reason = "System#err") - public CuratorFramework makeCurator(CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle) + public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle) { + if (!zkEnablementConfig.isEnabled()) { + throw new RuntimeException("Zookeeper is disabled, Can't create CuratorFramework."); + } + final CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); if (!Strings.isNullOrEmpty(config.getZkUser()) && !Strings.isNullOrEmpty(config.getZkPwd())) { builder.authorization( diff --git a/server/src/main/java/org/apache/druid/curator/ZkEnablementConfig.java b/server/src/main/java/org/apache/druid/curator/ZkEnablementConfig.java new file mode 100644 index 000000000000..8553509ecbe0 --- /dev/null +++ b/server/src/main/java/org/apache/druid/curator/ZkEnablementConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.curator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.StringUtils; + +import java.util.Properties; + +public class ZkEnablementConfig +{ + private static final String PROP_KEY_ENABLED = StringUtils.format("%s.enabled", CuratorModule.CURATOR_CONFIG_PREFIX); + + public static final ZkEnablementConfig ENABLED = new ZkEnablementConfig(true); + + @JsonProperty + private final boolean enabled; + + @JsonCreator + public ZkEnablementConfig(@JsonProperty("enabled") Boolean enabled) + { + this.enabled = enabled == null ? true : enabled.booleanValue(); + } + + public boolean isEnabled() + { + return enabled; + } + + public static boolean isEnabled(Properties properties) + { + String value = properties.getProperty(PROP_KEY_ENABLED); + return value == null ? true : Boolean.parseBoolean(value); + } +} diff --git a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java index fd899640c67f..a7345b045c30 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java @@ -46,6 +46,7 @@ import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; @@ -66,6 +67,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -86,6 +88,14 @@ public class DiscoveryModule implements Module private static final String INTERNAL_DISCOVERY_PROP = "druid.discovery.type"; private static final String CURATOR_KEY = "curator"; + private boolean isZkEnabled = true; + + @Inject + public void configure(Properties properties) + { + isZkEnabled = ZkEnablementConfig.isEnabled(properties); + } + /** * Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle. * @@ -155,9 +165,16 @@ public void configure(Binder binder) // Build the binder so that it will at a minimum inject an empty set. DruidBinders.discoveryAnnouncementBinder(binder); - binder.bind(ServiceAnnouncer.class) - .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) - .in(LazySingleton.class); + if (isZkEnabled) { + binder.bind(ServiceAnnouncer.class) + .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) + .in(LazySingleton.class); + } else { + binder.bind(Key.get(ServiceAnnouncer.Noop.class, Names.named(NAME))).toInstance(new ServiceAnnouncer.Noop()); + binder.bind(ServiceAnnouncer.class) + .to(Key.get(ServiceAnnouncer.Noop.class, Names.named(NAME))) + .in(LazySingleton.class); + } // internal discovery bindings. PolyBind.createChoiceWithDefault(binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidNodeAnnouncer.class), CURATOR_KEY); @@ -520,7 +537,7 @@ public void close() private static class DruidLeaderSelectorProvider implements Provider { @Inject - private CuratorFramework curatorFramework; + private Provider curatorFramework; @Inject @Self @@ -540,7 +557,7 @@ private static class DruidLeaderSelectorProvider implements Provider announcerProvider, + ObjectMapper jsonMapper, + ZkEnablementConfig zkEnablementConfig ) { this.config = config; - this.announcer = announcer; this.jsonMapper = jsonMapper; this.server = server; @@ -99,13 +107,28 @@ public BatchDataSegmentAnnouncer( return rv; }; - if (this.config.isSkipSegmentAnnouncementOnZk()) { + isSkipSegmentAnnouncementOnZk = !zkEnablementConfig.isEnabled() || config.isSkipSegmentAnnouncementOnZk(); + if (isSkipSegmentAnnouncementOnZk) { dummyZnode = new SegmentZNode("PLACE_HOLDER_ONLY"); + this.announcer = null; } else { dummyZnode = null; + this.announcer = announcerProvider.get(); } } + @VisibleForTesting + public BatchDataSegmentAnnouncer( + DruidServerMetadata server, + final BatchDataSegmentAnnouncerConfig config, + ZkPathsConfig zkPaths, + Announcer announcer, + ObjectMapper jsonMapper + ) + { + this(server, config, zkPaths, () -> announcer, jsonMapper, ZkEnablementConfig.ENABLED); + } + @Override public void announceSegment(DataSegment segment) throws IOException { @@ -124,7 +147,7 @@ public void announceSegment(DataSegment segment) throws IOException changes.addChangeRequest(new SegmentChangeRequestLoad(toAnnounce)); - if (config.isSkipSegmentAnnouncementOnZk()) { + if (isSkipSegmentAnnouncementOnZk) { segmentLookup.put(segment, dummyZnode); return; } @@ -192,7 +215,7 @@ public void unannounceSegment(DataSegment segment) changes.addChangeRequest(new SegmentChangeRequestDrop(segment)); - if (config.isSkipSegmentAnnouncementOnZk()) { + if (isSkipSegmentAnnouncementOnZk) { return; } @@ -231,7 +254,7 @@ public void announceSegments(Iterable segments) throws IOException changesBatch.add(new SegmentChangeRequestLoad(segment)); - if (config.isSkipSegmentAnnouncementOnZk()) { + if (isSkipSegmentAnnouncementOnZk) { segmentLookup.put(segment, dummyZnode); continue; } @@ -262,7 +285,7 @@ public void announceSegments(Iterable segments) throws IOException changes.addChangeRequests(changesBatch); - if (!config.isSkipSegmentAnnouncementOnZk()) { + if (!isSkipSegmentAnnouncementOnZk) { segmentZNode.addSegments(batch); announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes()); } diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java index 6d1497831c4b..b7ad83cfa436 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java @@ -27,4 +27,20 @@ public interface DataSegmentServerAnnouncer { void announce(); void unannounce(); + + class Noop implements DataSegmentServerAnnouncer + { + + @Override + public void announce() + { + + } + + @Override + public void unannounce() + { + + } + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 72c648d84881..87a2de901f8f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; +import com.google.inject.Provider; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntMaps; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; @@ -41,6 +42,7 @@ import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.curator.discovery.ServiceAnnouncer; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.guice.ManageLifecycle; @@ -132,7 +134,10 @@ public class DruidCoordinator private final SegmentsMetadataManager segmentsMetadataManager; private final ServerInventoryView serverInventoryView; private final MetadataRuleManager metadataRuleManager; + + @Nullable // Null if zk is disabled private final CuratorFramework curator; + private final ServiceEmitter emitter; private final IndexingServiceClient indexingServiceClient; private final ScheduledExecutorService exec; @@ -158,7 +163,7 @@ public DruidCoordinator( SegmentsMetadataManager segmentsMetadataManager, ServerInventoryView serverInventoryView, MetadataRuleManager metadataRuleManager, - CuratorFramework curator, + Provider curatorProvider, ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, @@ -169,7 +174,8 @@ public DruidCoordinator( BalancerStrategyFactory factory, LookupCoordinatorManager lookupCoordinatorManager, @Coordinator DruidLeaderSelector coordLeaderSelector, - CompactSegments compactSegments + CompactSegments compactSegments, + ZkEnablementConfig zkEnablementConfig ) { this( @@ -179,7 +185,7 @@ public DruidCoordinator( segmentsMetadataManager, serverInventoryView, metadataRuleManager, - curator, + curatorProvider, emitter, scheduledExecutorFactory, indexingServiceClient, @@ -191,7 +197,8 @@ public DruidCoordinator( factory, lookupCoordinatorManager, coordLeaderSelector, - compactSegments + compactSegments, + zkEnablementConfig ); } @@ -202,7 +209,7 @@ public DruidCoordinator( SegmentsMetadataManager segmentsMetadataManager, ServerInventoryView serverInventoryView, MetadataRuleManager metadataRuleManager, - CuratorFramework curator, + Provider curatorProvider, ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, @@ -214,7 +221,8 @@ public DruidCoordinator( BalancerStrategyFactory factory, LookupCoordinatorManager lookupCoordinatorManager, DruidLeaderSelector coordLeaderSelector, - CompactSegments compactSegments + CompactSegments compactSegments, + ZkEnablementConfig zkEnablementConfig ) { this.config = config; @@ -224,7 +232,11 @@ public DruidCoordinator( this.segmentsMetadataManager = segmentsMetadataManager; this.serverInventoryView = serverInventoryView; this.metadataRuleManager = metadataRuleManager; - this.curator = curator; + if (zkEnablementConfig.isEnabled()) { + this.curator = curatorProvider.get(); + } else { + this.curator = null; + } this.emitter = emitter; this.indexingServiceClient = indexingServiceClient; this.taskMaster = taskMaster; @@ -443,7 +455,7 @@ public void moveSegment( () -> { try { if (serverInventoryView.isSegmentLoadedByServer(toServer.getName(), segment) && - curator.checkExists().forPath(toLoadQueueSegPath) == null && + (curator == null || curator.checkExists().forPath(toLoadQueueSegPath) == null) && !dropPeon.getSegmentsToDrop().contains(segment)) { dropPeon.dropSegment(segment, loadPeonCallback); } else { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java b/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java index 30263094a3d8..e4f22832e9a1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Provider; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.ZKPaths; import org.apache.druid.client.ImmutableDruidServer; @@ -34,7 +35,7 @@ */ public class LoadQueueTaskMaster { - private final CuratorFramework curator; + private final Provider curatorFrameworkProvider; private final ObjectMapper jsonMapper; private final ScheduledExecutorService peonExec; private final ExecutorService callbackExec; @@ -43,7 +44,7 @@ public class LoadQueueTaskMaster private final ZkPathsConfig zkPaths; public LoadQueueTaskMaster( - CuratorFramework curator, + Provider curatorFrameworkProvider, ObjectMapper jsonMapper, ScheduledExecutorService peonExec, ExecutorService callbackExec, @@ -52,7 +53,7 @@ public LoadQueueTaskMaster( ZkPathsConfig zkPaths ) { - this.curator = curator; + this.curatorFrameworkProvider = curatorFrameworkProvider; this.jsonMapper = jsonMapper; this.peonExec = peonExec; this.callbackExec = callbackExec; @@ -67,7 +68,7 @@ public LoadQueuePeon giveMePeon(ImmutableDruidServer server) return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, config, peonExec, callbackExec); } else { return new CuratorLoadQueuePeon( - curator, + curatorFrameworkProvider.get(), ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName()), jsonMapper, peonExec, diff --git a/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java b/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java index c338854dca20..4bc48f444df1 100644 --- a/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java +++ b/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java @@ -21,7 +21,7 @@ import com.google.common.collect.ImmutableMap; import com.sun.jersey.spi.container.ResourceFilters; -import org.apache.druid.server.coordination.ZkCoordinator; +import org.apache.druid.server.coordination.SegmentLoadDropHandler; import org.apache.druid.server.http.security.StateResourceFilter; import javax.inject.Inject; @@ -34,14 +34,14 @@ @Path("/druid/historical/v1") public class HistoricalResource { - private final ZkCoordinator coordinator; + private final SegmentLoadDropHandler segmentLoadDropHandler; @Inject public HistoricalResource( - ZkCoordinator coordinator + SegmentLoadDropHandler segmentLoadDropHandler ) { - this.coordinator = coordinator; + this.segmentLoadDropHandler = segmentLoadDropHandler; } @GET @@ -50,14 +50,14 @@ public HistoricalResource( @Produces(MediaType.APPLICATION_JSON) public Response getLoadStatus() { - return Response.ok(ImmutableMap.of("cacheInitialized", coordinator.isStarted())).build(); + return Response.ok(ImmutableMap.of("cacheInitialized", segmentLoadDropHandler.isStarted())).build(); } @GET @Path("/readiness") public Response getReadiness() { - if (coordinator.isStarted()) { + if (segmentLoadDropHandler.isStarted()) { return Response.ok().build(); } else { return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index c44c0d8d3fb3..dd382e31a65f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -38,6 +38,7 @@ import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.curator.CuratorTestBase; import org.apache.druid.curator.CuratorUtils; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.curator.discovery.NoopServiceAnnouncer; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.jackson.DefaultObjectMapper; @@ -223,7 +224,7 @@ public String getBase() segmentsMetadataManager, baseView, metadataRuleManager, - curator, + () -> curator, new NoopServiceEmitter(), scheduledExecutorFactory, null, @@ -249,7 +250,8 @@ public void unannounce(DruidNode node) new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), - null + null, + ZkEnablementConfig.ENABLED ); } @@ -521,7 +523,7 @@ public String getBase() segmentsMetadataManager, baseView, metadataRuleManager, - curator, + () -> curator, new NoopServiceEmitter(), scheduledExecutorFactory, null, @@ -547,7 +549,8 @@ public void unannounce(DruidNode node) new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), - null + null, + ZkEnablementConfig.ENABLED ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index e48bf9430211..adf013b4463c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -39,6 +39,7 @@ import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.curator.CuratorTestBase; import org.apache.druid.curator.CuratorUtils; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.curator.discovery.NoopServiceAnnouncer; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.jackson.DefaultObjectMapper; @@ -188,7 +189,7 @@ public String getBase() segmentsMetadataManager, serverInventoryView, metadataRuleManager, - curator, + () -> curator, serviceEmitter, scheduledExecutorFactory, null, @@ -214,7 +215,8 @@ public void unannounce(DruidNode node) new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), - null + null, + ZkEnablementConfig.ENABLED ); } diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 8d1938dc09b2..4a0ed6f8283e 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -25,6 +25,7 @@ import com.google.inject.Inject; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.name.Names; import io.airlift.airline.Command; @@ -253,7 +254,7 @@ public void configure(Binder binder) @Provides @LazySingleton public LoadQueueTaskMaster getLoadQueueTaskMaster( - CuratorFramework curator, + Provider curatorFrameworkProvider, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidCoordinatorConfig config, @@ -274,7 +275,7 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster( } ExecutorServices.manageLifecycle(lifecycle, callBackExec); return new LoadQueueTaskMaster( - curator, + curatorFrameworkProvider, jsonMapper, factory.create(1, "Master-PeonExec--%d"), callBackExec, diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java index 23c771cf3019..98af00cc83ec 100644 --- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java +++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java @@ -20,11 +20,13 @@ package org.apache.druid.cli; import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.name.Names; import io.airlift.airline.Command; import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.NodeRole; @@ -56,6 +58,7 @@ import org.eclipse.jetty.server.Server; import java.util.List; +import java.util.Properties; @Command( name = "historical", @@ -65,11 +68,19 @@ public class CliHistorical extends ServerRunnable { private static final Logger log = new Logger(CliHistorical.class); + private boolean isZkEnabled = true; + public CliHistorical() { super(log); } + @Inject + public void configure(Properties properties) + { + isZkEnabled = ZkEnablementConfig.isEnabled(properties); + } + @Override protected List getModules() { @@ -95,10 +106,13 @@ protected List getModules() binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); binder.bind(QueryCountStatsProvider.class).to(QueryResource.class); Jerseys.addResource(binder, QueryResource.class); - Jerseys.addResource(binder, HistoricalResource.class); Jerseys.addResource(binder, SegmentListerResource.class); + Jerseys.addResource(binder, HistoricalResource.class); LifecycleModule.register(binder, QueryResource.class); - LifecycleModule.register(binder, ZkCoordinator.class); + + if (isZkEnabled) { + LifecycleModule.register(binder, ZkCoordinator.class); + } JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class); binder.install(new CacheModule()); diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 3b32e9dd66b9..9577c0a46fb8 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -28,6 +28,7 @@ import com.google.inject.util.Providers; import io.airlift.airline.Command; import org.apache.druid.client.DruidServer; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.NodeRole; @@ -85,14 +86,21 @@ public class CliIndexer extends ServerRunnable { private static final Logger log = new Logger(CliIndexer.class); - @Inject private Properties properties; + private boolean isZkEnabled = true; public CliIndexer() { super(log); } + @Inject + public void configure(Properties properties) + { + this.properties = properties; + isZkEnabled = ZkEnablementConfig.isEnabled(properties); + } + @Override protected List getModules() { @@ -128,7 +136,7 @@ public void configure(Binder binder) CliPeon.bindPeonDataSegmentHandlers(binder); CliPeon.bindRealtimeCache(binder); CliPeon.bindCoordinatorHandoffNotiferAndClient(binder); - CliMiddleManager.bindWorkerManagementClasses(binder); + CliMiddleManager.bindWorkerManagementClasses(binder, isZkEnabled); binder.bind(AppenderatorsManager.class) .to(UnifiedIndexerAppenderatorsManager.class) diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index d273c16ad331..37e23901a4bf 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Inject; import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; @@ -31,6 +32,7 @@ import io.airlift.airline.Command; import org.apache.druid.client.indexing.HttpIndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.NodeRole; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.guice.IndexingServiceFirehoseModule; @@ -55,6 +57,7 @@ import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.WorkerCuratorCoordinator; +import org.apache.druid.indexing.worker.WorkerTaskManager; import org.apache.druid.indexing.worker.WorkerTaskMonitor; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.http.ShuffleResource; @@ -72,6 +75,7 @@ import org.eclipse.jetty.server.Server; import java.util.List; +import java.util.Properties; /** * @@ -84,11 +88,19 @@ public class CliMiddleManager extends ServerRunnable { private static final Logger log = new Logger(CliMiddleManager.class); + private boolean isZkEnabled = true; + public CliMiddleManager() { super(log); } + @Inject + public void configure(Properties properties) + { + isZkEnabled = ZkEnablementConfig.isEnabled(properties); + } + @Override protected List getModules() { @@ -130,7 +142,7 @@ public void configure(Binder binder) .in(LazySingleton.class); binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); - bindWorkerManagementClasses(binder); + bindWorkerManagementClasses(binder, isZkEnabled); binder.bind(JettyServerInitializer.class) .to(MiddleManagerJettyServerInitializer.class) @@ -190,11 +202,17 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) ); } - public static void bindWorkerManagementClasses(Binder binder) + public static void bindWorkerManagementClasses(Binder binder, boolean isZkEnabled) { - binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); - binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); - LifecycleModule.register(binder, WorkerTaskMonitor.class); + if (isZkEnabled) { + binder.bind(WorkerTaskManager.class).to(WorkerTaskMonitor.class); + binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); + binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); + LifecycleModule.register(binder, WorkerTaskMonitor.class); + } else { + binder.bind(WorkerTaskManager.class).in(ManageLifecycle.class); + } + Jerseys.addResource(binder, WorkerResource.class); Jerseys.addResource(binder, TaskManagementResource.class); } From 2382d1d351ec9ed3ebe03e39f678b9ade28584f4 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 26 Oct 2020 12:12:00 -0700 Subject: [PATCH 2/4] fix DruidCoordinatorTest compilation --- .../apache/druid/server/coordinator/DruidCoordinatorTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 7cdfc6dc2420..8ba4b0f20946 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -705,7 +705,8 @@ public void testBalancerThreadNumber() null, null, null, - null + null, + ZkEnablementConfig.ENABLED ); DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0); From 2ac97849bfe22d017e964b28a807ef1da034a690 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 26 Oct 2020 14:44:48 -0700 Subject: [PATCH 3/4] fix test in DruidCoordinatorTest --- .../apache/druid/server/coordinator/DruidCoordinatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 8ba4b0f20946..34fe944f61e4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -694,7 +694,7 @@ public void testBalancerThreadNumber() null, null, null, - null, + () -> null, null, scheduledExecutorFactory, null, From 02badd876adbe766d8701be5ed1fb62542a85e2f Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 26 Oct 2020 15:12:14 -0700 Subject: [PATCH 4/4] fix strict compilation --- .../org/apache/druid/cli/CliHistoricalForQueryRetryTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java b/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java index 6663e99ceb58..6ef34b11de96 100644 --- a/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java +++ b/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java @@ -43,6 +43,7 @@ public CliHistoricalForQueryRetryTest() } @Inject + @Override public void configure(Properties properties) { log.info("Historical is configured for testing query retry on missing segments");