Skip to content

Commit

Permalink
Followup changes to 15817 (Segment schema publishing and polling) (#1…
Browse files Browse the repository at this point in the history
…6368)

* Fix build

* Nit changes in KillUnreferencedSegmentSchema

* Replace reference to the abbreviation SMQ with Metadata Query, rename inTransit maps in schema cache

* nitpicks

* Remove reference to smq abbreviation from integration-tests

* Remove reference to smq abbreviation from integration-tests

* minor change

* Update index.md

* Add delimiter while computing schema fingerprint hash
  • Loading branch information
findingrish authored May 3, 2024
1 parent 5fae20d commit c61c378
Show file tree
Hide file tree
Showing 47 changed files with 213 additions and 186 deletions.
25 changes: 22 additions & 3 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ The `file` request logger stores daily request logs on disk.
|--------|-----------|-------|
|`druid.request.logging.dir`|Historical, Realtime, and Broker services maintain request logs of all of the requests they get (interaction is via POST, so normal request logs don’t generally capture information about the actual query), this specifies the directory to store the request logs in|none|
|`druid.request.logging.filePattern`|[Joda datetime format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) for each file|"yyyy-MM-dd'.log'"|
| `druid.request.logging.durationToRetain`| Period to retain the request logs on disk. The period should be at least longer than `P1D`.| none
| `druid.request.logging.durationToRetain`| Period to retain the request logs on disk. The period should be at least longer than `P1D`.| none|

The format of request logs is TSV, one line per requests, with five fields: timestamp, remote\_addr, native\_query, query\_context, sql\_query.

Expand Down Expand Up @@ -581,6 +581,23 @@ This deep storage is used to interface with Cassandra. You must load the `druid-
|`druid.storage.keyspace`|Cassandra key space.|none|


#### Centralized datasource schema

Centralized datasource schema is an [experimental feature](../development/experimental.md) to centralized datasource schema building within the Coordinator.
Traditionally, the datasource schema is built in the Brokers by combining schema for all the available segments of a datasource.
Brokers issue segment metadata query to data nodes and tasks to fetch segment schema.
In the new arrangement, tasks publish segment schema along with segment metadata to the database and schema for realtime segments is periodically pushed to the Coordinator in the segment announcement flow.
This enables Coordinator to cache segment schemas and build datasource schema by combining segment schema.
Brokers query the datasource schema from the Coordinator, while retaining the ability to build table schema if the
need arises.

|Property|Description|Default|Required|
|-----|-----------|-------|--------|
|`druid.centralizedDatasourceSchema.enabled`|Boolean flag for enabling datasource schema building in the Coordinator, this should be specified in the common runtime properties.|false|No.|
|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when CentralizedDatasourceSchema feature is enabled. This should be specified in the MiddleManager runtime properties.|false|No.|

For, stale schema cleanup configs, refer to properties with the prefix `druid.coordinator.kill.segmentSchema` in [Metadata Management](#metadata-management).

### Ingestion security configuration

#### HDFS input source
Expand Down Expand Up @@ -878,7 +895,6 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than `druid.segmentCache.numLoadingThreads` config on Historical service.|1|
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator service should act like an Overlord as well. This configuration allows users to simplify a Druid cluster by not having to deploy any standalone Overlord services. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord services and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
|`druid.centralizedDatasourceSchema.enabled`|Boolean flag for enabling datasource schema building on the Coordinator. Note, when using MiddleManager to launch task, set `druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled` in MiddleManager runtime config. |false|

##### Metadata management

Expand All @@ -899,6 +915,9 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.kill.datasource.on`| Boolean value for whether to enable automatic deletion of datasource metadata (Note: datasource metadata only exists for datasource created from supervisor). If set to true, Coordinator will periodically remove datasource metadata of terminated supervisor from the datasource table in metadata storage. | No | True|
|`druid.coordinator.kill.datasource.period`| How often to do automatic deletion of datasource metadata in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.datasource.on` is set to true.| No| `P1D`|
|`druid.coordinator.kill.datasource.durationToRetain`| Duration of datasource metadata to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.datasource.on` is set to true.| Yes if `druid.coordinator.kill.datasource.on` is set to true.| `P90D`|
|`druid.coordinator.kill.segmentSchema.on`| Boolean value for whether to enable automatic deletion of unused segment schemas. If set to true, Coordinator will periodically identify segment schemas which are not referenced by any used segment and mark them as unused. At a later point, these unused schemas are deleted. Only applies if [Centralized Datasource schema](#centralized-datasource-schema) feature is enabled. | No | True|
|`druid.coordinator.kill.segmentSchema.period`| How often to do automatic deletion of segment schemas in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| No| `P1D`|
|`druid.coordinator.kill.segmentSchema.durationToRetain`| Duration of segment schemas to be retained from the time it was marked as unused in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| Yes, if `druid.coordinator.kill.segmentSchema.on` is set to true.| `P90D`|

##### Segment management

Expand Down Expand Up @@ -1428,7 +1447,7 @@ MiddleManagers pass their configurations down to their child peons. The MiddleMa
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. For example, `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by tasks on any single task dir. This value is treated symmetrically across all directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow for 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks. The actual amount of memory assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tasks.md#configuring-task-storage-sizes)|`Long.MAX_VALUE`|
|`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`_default_worker_category`|
|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when CentralizedDatasourceSchema feature is enabled. |false|
|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when [Centralized Datasource Schema](#centralized-datasource-schema) feature is enabled. |false|

#### Peon processing

Expand Down
26 changes: 13 additions & 13 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,11 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`sqlQuery/bytes`|Number of bytes returned in the SQL query response.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`| |
|`serverview/init/time`|Time taken to initialize the broker server view. Useful to detect if brokers are taking too long to start.||Depends on the number of segments.|
|`metadatacache/init/time`|Time taken to initialize the broker segment metadata cache. Useful to detect if brokers are taking too long to start||Depends on the number of segments.|
|`metadatacache/refresh/count`|Number of segments to refresh in broker segment metadata cache.|`dataSource`|
|`metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`|
|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||
|`metadatacache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`|
|`schemacache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments.|
|`schemacache/finalizedSegmentMetadata/count`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.|
|`schemacache/finalizedSchemaPayload/count`|Number of finalized segment schema cached.||Depends on the number of distinct schema in the cluster.|
|`schemacache/inTransitSMQResults/count`|Number of segments for which schema was fetched by executing segment metadata query.||Eventually it should be 0.|
|`schemacache/inTransitSMQPublishedResults/count`|Number of segments for which schema is cached after back filling in the database.||Eventually it should be 0.|
|`metadatacache/refresh/count`|Number of segments to refresh in broker segment metadata cache.|`dataSource`||
|`metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`||
|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.|||
|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.|||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.|||
|`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
|`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
Expand Down Expand Up @@ -375,8 +369,14 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`serverview/sync/healthy`|Sync status of the Coordinator with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. You can use this metric in conjunction with `serverview/sync/unstableTime` to debug slow startup of the Coordinator.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`serverview/sync/unstableTime`|Time in milliseconds for which the Coordinator has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
|`metadatacache/init/time`|Time taken to initialize the coordinator segment metadata cache.||Depends on the number of segments.|
|`metadatacache/refresh/count`|Number of segments to refresh in coordinator segment metadata cache.|`dataSource`|
|`metadatacache/refresh/time`|Time taken to refresh segments in coordinator segment metadata cache.|`dataSource`|
|`metadatacache/refresh/count`|Number of segments to refresh in coordinator segment metadata cache.|`dataSource`||
|`metadatacache/refresh/time`|Time taken to refresh segments in coordinator segment metadata cache.|`dataSource`||
|`metadatacache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`||
|`metadatacache/realtimeSegmentSchema/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments in the cluster.|
|`metadatacache/finalizedSegmentMetadata/count`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.|
|`metadatacache/finalizedSchemaPayload/count`|Number of finalized segment schema cached.||Depends on the number of distinct schema in the cluster.|
|`metadatacache/temporaryMetadataQueryResults/count`|Number of segments for which schema was fetched by executing segment metadata query.||Eventually it should be 0.|
|`metadatacache/temporaryPublishedMetadataQueryResults/count`|Number of segments for which schema is cached after back filling in the database.||This value gets reset after each database poll. Eventually it should be 0.|

## General Health

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,13 @@ public static SegmentTransactionalInsertAction overwriteAction(
@Nullable SegmentSchemaMapping segmentSchemaMapping
)
{
return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null,
segmentSchemaMapping
return new SegmentTransactionalInsertAction(
segmentsToBeOverwritten,
segmentsToPublish,
null,
null,
null,
segmentSchemaMapping
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,10 @@ protected TaskAction<SegmentPublishResult> buildPublishAction(
case APPEND:
return SegmentTransactionalAppendAction.forSegments(segmentsToPublish, segmentSchemaMapping);
default:
return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish,
segmentSchemaMapping
return SegmentTransactionalInsertAction.overwriteAction(
segmentsToBeOverwritten,
segmentsToPublish,
segmentSchemaMapping
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public TaskStatus runTask(TaskToolbox toolbox)
return status;
}

public TaskAction<SegmentPublishResult> testBuildPublishAction(
public TaskAction<SegmentPublishResult> buildPublishActionForTest(
Set<DataSegment> segmentsToBeOverwritten,
Set<DataSegment> segmentsToPublish,
SegmentSchemaMapping segmentSchemaMapping,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,9 @@ public Map<String, Long> getBlacklistedTaskSlotCount()
}
}

/**
* Verify that schema is present for each segment.
*/
public void verifySchema(DataSegmentsWithSchemas dataSegmentsWithSchemas)
{
int nonTombstoneSegments = 0;
Expand All @@ -556,11 +559,16 @@ public void verifySchema(DataSegmentsWithSchemas dataSegmentsWithSchemas)
continue;
}
nonTombstoneSegments++;
Assert.assertTrue(dataSegmentsWithSchemas.getSegmentSchemaMapping()
.getSegmentIdToMetadataMap()
.containsKey(segment.getId().toString()));
Assert.assertTrue(
dataSegmentsWithSchemas.getSegmentSchemaMapping()
.getSegmentIdToMetadataMap()
.containsKey(segment.getId().toString())
);
}
Assert.assertEquals(nonTombstoneSegments, dataSegmentsWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size());
Assert.assertEquals(
nonTombstoneSegments,
dataSegmentsWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size()
);
}

public TaskReport.ReportMap getReports() throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ public TaskStatus getStatus(String taskId)
}
}

public DataSegmentsWithSchemas getPublishedSegments(String taskId)
public DataSegmentsWithSchemas getPublishedSegmentsWithSchemas(String taskId)
{
final TaskContainer taskContainer = tasks.get(taskId);
if (taskContainer == null || taskContainer.actionClient == null) {
Expand Down Expand Up @@ -667,7 +667,7 @@ public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)

public DataSegmentsWithSchemas getSegmentAndSchemas(Task task)
{
return taskRunner.getPublishedSegments(task.getId());
return taskRunner.getPublishedSegmentsWithSchemas(task.getId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ public void testBuildPublishAction()

Assert.assertEquals(
SegmentTransactionalAppendAction.class,
task.testBuildPublishAction(
task.buildPublishActionForTest(
Collections.emptySet(),
Collections.emptySet(),
null,
Expand All @@ -1174,7 +1174,7 @@ public void testBuildPublishAction()

Assert.assertEquals(
SegmentTransactionalReplaceAction.class,
task.testBuildPublishAction(
task.buildPublishActionForTest(
Collections.emptySet(),
Collections.emptySet(),
null,
Expand All @@ -1184,7 +1184,7 @@ public void testBuildPublishAction()

Assert.assertEquals(
SegmentTransactionalInsertAction.class,
task.testBuildPublishAction(
task.buildPublishActionForTest(
Collections.emptySet(),
Collections.emptySet(),
null,
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/docker/druid.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ setupData()
# The "query" and "security" test groups require data to be setup before running the tests.
# In particular, they requires segments to be download from a pre-existing s3 bucket.
# This is done by using the loadSpec put into metadatastore and s3 credientials set below.
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "centralized-datasource-schema" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-task-schema-publish-disabled" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-smq-disabled" ]; then
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "centralized-datasource-schema" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-task-schema-publish-disabled" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-metadata-query-disabled" ]; then
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
&& cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid \
Expand Down
Loading

0 comments on commit c61c378

Please sign in to comment.