Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply committed Aug 29, 2023
2 parents be06d35 + 8885805 commit 12bd9fa
Showing 64 changed files with 1,535 additions and 2,046 deletions.
45 changes: 23 additions & 22 deletions docs/configuration/index.md

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions docs/data-management/automatic-compaction.md
@@ -23,6 +23,9 @@ title: "Automatic compaction"
-->

In Apache Druid, compaction is a special type of ingestion task that reads data from a Druid datasource and writes it back into the same datasource. A common use case for this is to [optimally size segments](../operations/segment-optimization.md) after ingestion to improve query performance. Automatic compaction, or auto-compaction, refers to the system for automatic execution of compaction tasks managed by the [Druid Coordinator](../design/coordinator.md).
This topic guides you through setting up automatic compaction for your Druid cluster. See the [examples](#examples) for common use cases for automatic compaction.

## How Druid manages automatic compaction

The Coordinator [indexing period](../configuration/index.md#coordinator-operation), `druid.coordinator.period.indexingPeriod`, controls the frequency of compaction tasks.
The default indexing period is 30 minutes, meaning that the Coordinator first checks for segments to compact at most 30 minutes from when auto-compaction is enabled.
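The period is expressed as an ISO-8601 duration. As an illustrative sketch (not part of this change), a Coordinator `runtime.properties` entry that checks for compactable segments every 10 minutes might look like:

```properties
# Illustrative value; the default indexing period is 30 minutes.
druid.coordinator.period.indexingPeriod=PT10M
```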
@@ -33,9 +36,12 @@ At every invocation of auto-compaction, the Coordinator initiates a [segment sea
When there are eligible segments to compact, the Coordinator issues compaction tasks based on available worker capacity.
If a compaction task takes longer than the indexing period, the Coordinator waits for it to finish before resuming the period for segment search.

:::info
Auto-compaction skips datasources that have a segment granularity of `ALL`.
:::

As a best practice, you should set up auto-compaction for all Druid datasources. You can run compaction tasks manually for cases where you want to allocate more system resources. For example, you may choose to run multiple compaction tasks in parallel to compact an existing datasource for the first time. See [Compaction](compaction.md) for additional details and use cases.

This topic guides you through setting up automatic compaction for your Druid cluster. See the [examples](#examples) for common use cases for automatic compaction.

## Enable automatic compaction

@@ -174,10 +180,6 @@ The following auto-compaction configuration compacts existing `HOUR` segments in
}
```

:::info
Auto-compaction skips datasources containing ALL granularity segments when the target granularity is different.
:::

### Update partitioning scheme

For your `wikipedia` datasource, you want to optimize segment access when regularly ingesting data without compromising compute time when querying the data. Your ingestion spec for batch append uses [dynamic partitioning](../ingestion/native-batch.md#dynamic-partitioning) to optimize for write-time operations, while your stream ingestion partitioning is configured by the stream service. You want to implement auto-compaction to reorganize the data with a suitable read-time partitioning using [multi-dimension range partitioning](../ingestion/native-batch.md#multi-dimension-range-partitioning). Based on the dimensions frequently accessed in queries, you wish to partition on the following dimensions: `channel`, `countryName`, `namespace`.
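As a hedged sketch of what such an auto-compaction config could look like (the `skipOffsetFromLatest` and `targetRowsPerSegment` values below are illustrative assumptions, not taken from this change):

```json
{
  "dataSource": "wikipedia",
  "skipOffsetFromLatest": "PT1H",
  "tuningConfig": {
    "partitionsSpec": {
      "type": "range",
      "partitionDimensions": ["channel", "countryName", "namespace"],
      "targetRowsPerSegment": 5000000
    }
  }
}
```

Range partitioning on the dimensions most often used in query filters keeps related rows together, which helps Druid prune segments at query time.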
24 changes: 12 additions & 12 deletions docs/querying/filters.md
@@ -80,19 +80,19 @@ Druid's SQL planner uses the equality filter by default instead of selector filt
### Example: equivalent of `WHERE someColumn = 'hello'`

```json
{ "type": "equality", "column": "someColumn", "matchValueType": "STRING", "matchValue": "hello" }
{ "type": "equals", "column": "someColumn", "matchValueType": "STRING", "matchValue": "hello" }
```

### Example: equivalent of `WHERE someNumericColumn = 1.23`

```json
{ "type": "equality", "column": "someNumericColumn", "matchValueType": "DOUBLE", "matchValue": 1.23 }
{ "type": "equals", "column": "someNumericColumn", "matchValueType": "DOUBLE", "matchValue": 1.23 }
```

### Example: equivalent of `WHERE someArrayColumn = ARRAY[1, 2, 3]`

```json
{ "type": "equality", "column": "someArrayColumn", "matchValueType": "ARRAY<LONG>", "matchValue": [1, 2, 3] }
{ "type": "equals", "column": "someArrayColumn", "matchValueType": "ARRAY<LONG>", "matchValue": [1, 2, 3] }
```


@@ -160,8 +160,8 @@ Note that the column comparison filter converts all values to strings prior to c
{
"type": "and",
"fields": [
{ "type": "equality", "column": "someColumn", "matchValue": "a", "matchValueType": "STRING" },
{ "type": "equality", "column": "otherColumn", "matchValue": 1234, "matchValueType": "LONG" },
{ "type": "equals", "column": "someColumn", "matchValue": "a", "matchValueType": "STRING" },
{ "type": "equals", "column": "otherColumn", "matchValue": 1234, "matchValueType": "LONG" },
{ "type": "null", "column": "anotherColumn" }
]
}
@@ -180,8 +180,8 @@ Note that the column comparison filter converts all values to strings prior to c
{
"type": "or",
"fields": [
{ "type": "equality", "column": "someColumn", "matchValue": "a", "matchValueType": "STRING" },
{ "type": "equality", "column": "otherColumn", "matchValue": 1234, "matchValueType": "LONG" },
{ "type": "equals", "column": "someColumn", "matchValue": "a", "matchValueType": "STRING" },
{ "type": "equals", "column": "otherColumn", "matchValue": 1234, "matchValueType": "LONG" },
{ "type": "null", "column": "anotherColumn" }
]
}
@@ -709,7 +709,7 @@ All filters return true if any one of the dimension values satisfies the filt
Given a multi-value STRING row with values `['a', 'b', 'c']`, a filter such as

```json
{ "type": "equality", "column": "someMultiValueColumn", "matchValueType": "STRING", "matchValue": "b" }
{ "type": "equals", "column": "someMultiValueColumn", "matchValueType": "STRING", "matchValue": "b" }
```
will successfully match the entire row. This can sometimes produce unintuitive behavior when coupled with the implicit UNNEST functionality of Druid [GroupBy](./groupbyquery.md) and [TopN](./topnquery.md) queries.

@@ -724,13 +724,13 @@ Given a multi-value STRING row with values `['a', 'b', 'c']`, and a filter such as
"type": "and",
"fields": [
{
"type": "equality",
"type": "equals",
"column": "someMultiValueColumn",
"matchValueType": "STRING",
"matchValue": "a"
},
{
"type": "equality",
"type": "equals",
"column": "someMultiValueColumn",
"matchValueType": "STRING",
"matchValue": "b"
@@ -754,7 +754,7 @@ the "regex" filter) the numeric column values will be converted to strings durin

```json
{
"type": "equality",
"type": "equals",
"dimension": "myFloatColumn",
"matchValueType": "FLOAT",
"value": 10.1
@@ -811,7 +811,7 @@ If you want to interpret the timestamp with a specific format, timezone, or loca

```json
{
"type": "equality",
"type": "equals",
"dimension": "__time",
"matchValueType": "LONG",
"value": 124457387532
@@ -137,7 +137,6 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
}
catch (Exception e) {
log.info("Failed to run task: %s", taskId.getOriginalTaskId());
shutdown();
throw e;
}
finally {
@@ -168,10 +167,9 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
finally {
try {
saveLogs();
shutdown();
}
catch (Exception e) {
log.warn(e, "Task [%s] cleanup failed", taskId);
log.warn(e, "Log processing failed for task [%s]", taskId);
}

stopTask();
@@ -188,7 +186,7 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
*/
protected void shutdown()
{
if (State.PENDING.equals(state.get()) || State.RUNNING.equals(state.get())) {
if (State.PENDING.equals(state.get()) || State.RUNNING.equals(state.get()) || State.STOPPED.equals(state.get())) {
kubernetesClient.deletePeonJob(taskId);
}
}
@@ -223,7 +221,7 @@ protected State getState()
*/
protected TaskLocation getTaskLocation()
{
if (!State.RUNNING.equals(state.get())) {
if (State.PENDING.equals(state.get()) || State.NOT_STARTED.equals(state.get())) {
log.debug("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId());
return TaskLocation.unknown();
}
@@ -251,7 +249,6 @@ protected TaskLocation getTaskLocation()
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")),
pod.getMetadata() != null ? pod.getMetadata().getName() : ""
);
log.info("K8s task %s is running at location %s", taskId.getOriginalTaskId(), taskLocation);
}

return taskLocation;
@@ -182,10 +182,6 @@ protected TaskStatus doTask(Task task, boolean run)
KubernetesWorkItem workItem = tasks.get(task.getId());

if (workItem == null) {
throw new ISE("Task [%s] disappeared", task.getId());
}

if (workItem.isShutdownRequested()) {
throw new ISE("Task [%s] has been shut down", task.getId());
}

@@ -213,11 +209,6 @@ protected TaskStatus doTask(Task task, boolean run)
log.error(e, "Task [%s] execution caught an exception", task.getId());
throw new RuntimeException(e);
}
finally {
synchronized (tasks) {
tasks.remove(task.getId());
}
}
}

@VisibleForTesting
@@ -271,6 +262,10 @@ public void shutdown(String taskid, String reason)
return;
}

synchronized (tasks) {
tasks.remove(taskid);
}

workItem.shutdown();
}

@@ -396,12 +391,6 @@ public Map<String, Long> getBlacklistedTaskSlotCount()
return Collections.emptyMap();
}

@Override
public boolean isK8sTaskRunner()
{
return true;
}

@Override
public void unregisterListener(String listenerId)
{
@@ -446,6 +435,17 @@ public Collection<TaskRunnerWorkItem> getPendingTasks()
.collect(Collectors.toList());
}

@Override
public TaskLocation getTaskLocation(String taskId)
{
final KubernetesWorkItem workItem = tasks.get(taskId);
if (workItem == null) {
return TaskLocation.unknown();
} else {
return workItem.getLocation();
}
}

@Nullable
@Override
public RunnerTaskState getRunnerTaskState(String taskId)
@@ -457,4 +457,16 @@ public RunnerTaskState getRunnerTaskState(String taskId)

return workItem.getRunnerTaskState();
}

@Override
public int getTotalCapacity()
{
return config.getCapacity();
}

@Override
public int getUsedCapacity()
{
return tasks.size();
}
}
@@ -30,13 +30,10 @@
import org.apache.druid.java.util.common.ISE;

import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

public class KubernetesWorkItem extends TaskRunnerWorkItem
{
private final Task task;

private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;

public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture)
@@ -53,19 +50,13 @@ protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle k

protected synchronized void shutdown()
{
this.shutdownRequested.set(true);

if (this.kubernetesPeonLifecycle != null) {
this.kubernetesPeonLifecycle.startWatchingLogs();
this.kubernetesPeonLifecycle.shutdown();
}
}

protected boolean isShutdownRequested()
{
return shutdownRequested.get();
}

protected boolean isPending()
{
return RunnerTaskState.PENDING.equals(getRunnerTaskState());
@@ -198,9 +198,6 @@ protected synchronized TaskStatus join(long timeout)
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);
EasyMock.expect(kubernetesClient.deletePeonJob(
new K8sTaskId(ID)
)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
EasyMock.expectLastCall().once();
@@ -245,7 +242,6 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);

replayAll();

@@ -298,7 +294,6 @@ public void test_join() throws IOException
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

@@ -353,7 +348,6 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

@@ -408,7 +402,6 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

@@ -459,7 +452,6 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

@@ -512,7 +504,6 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

@@ -554,8 +545,6 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr
logWatch.close();
EasyMock.expectLastCall();

EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
@@ -908,8 +897,11 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown()
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once();

replayAll();
Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
verifyAll();
}

private void setPeonLifecycleState(KubernetesPeonLifecycle peonLifecycle, KubernetesPeonLifecycle.State state)