Add Spark connector reader support. #11823
Conversation
Add code, utilities, tests, and documentation for reading data from Druid using Spark.
Closing and reopening to trigger Travis.
@JulianJaffePinterest I just noticed that Travis is not running for this branch since it's not included in the travis configuration. Can you please add the branch to the travis configuration?
@jihoonson added the branch to the travis configuration
The packaging check is failing. I had thought this was a non-obvious error due to not adding licenses yet, but the check is still failing after I added the licenses. Is this something people regularly encounter?
I missed including this in the initial setup PR, so I'm including the DynamicConfigProviderRegistry now. DynamicConfigProviders are used in the DruidMetadataClient and in the DeepstorageHelpers. Because we provide the org.apache.druid.spark.registerSubtype function, this registry is technically unnecessary, but it is included for ease of use.
Rename org.apache.druid.spark.injectableValues to baseInjectableValues to reduce confusion.
* Update Dockerfile * Update docker_build_containers.sh * Update Dockerfile
@JulianJaffePinterest thanks for enabling Travis. I haven't finished my review yet, but have reviewed only up to the "Determining which segments to load" part. Feel free to address my comments all at once when my review is done. I will try to finish my review soon. BTW, I remember you said that you added some tests and made them run in Travis. Are those tests not in this PR but planned as a follow-up? I'm wondering where they are and how they run in Travis.
docs/operations/spark.md
|`table`|The Druid data source to read from|Yes||
|`reader.deepStorageType`|The type of deep storage used to back the target Druid cluster|No|`local`|
|`reader.segments`|A hard-coded list of Druid segments to read. If set, the table and druid client configurations are ignored and the specified segments are read directly. Must be deserializable into Druid DataSegment instances|No||
|`reader.useCompactSketches`|Controls whether or not compact representations of complex metrics are used (only for metrics that support compact forms)|No|False|
Should it be `reader.useCompactMetrics` if it applies to all metrics that support compact forms? Or does it apply to only sketches?
Are there complex metrics that have compact forms and aren't sketches? Either way, at the moment this property is only applied to sketches (see `ComplexMetricRegistry`).
Then, perhaps better rephrasing this to something like:
|`reader.useCompactSketches`|Controls whether or not compact representations of sketch metrics are used|No|False|
// Otherwise, we'd need to full scan the segments table
val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)

metadataClient.getSegmentPayloads(conf.getString(DruidConfigurationKeys.tableKey),
Hmm, it seems that `metadataClient.getSegmentPayloads` simply returns used segments in a given interval. In that case, the returned segments can include some segments overshadowed by others if the coordinator hasn't marked those overshadowed segments unused yet. To handle this problem, Druid uses `VersionedIntervalTimeline` to find the most recent set of segments. Is there some code somewhere to find the most recent set of segments from the returned segments?
Ah, I dropped the overshadow checking logic while chunking up PRs. Thanks for catching this. I've moved the logic to the metadata client because I think it makes more sense there, and I've also addressed a TODO and added support for querying incomplete partitions if desired.
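A simplified sketch of what the overshadow filtering amounts to (hypothetical types; Druid's real `VersionedIntervalTimeline` additionally handles partially-overlapping intervals and partition chunks, which this stand-in ignores):

```scala
// Simplified sketch: among segments covering the same interval, only those
// with the greatest version string survive; everything else is overshadowed.
case class Segment(interval: (Long, Long), version: String, partitionNum: Int)

def latestNonOvershadowed(segments: Seq[Segment]): Seq[Segment] =
  segments
    .groupBy(_.interval)
    .values
    .flatMap { group =>
      val maxVersion = group.map(_.version).max
      group.filter(_.version == maxVersion) // keep all chunks of that version
    }
    .toSeq
```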
Addressing review comments from Jihoon. Fixes a few typos, adds more logging, and adds back the overshadowed check that was lost in the PR chunking. Additionally resolves a long-standing TODO and allows configuring the reader to include incomplete partitions if desired.
@jihoonson thanks for starting your review. The end-to-end tests I was referring to in our earlier discussion haven't been added yet (since this PR doesn't include the writing logic), but when they are added no changes will be necessary - they're lightweight enough to run as part of `mvn test`. This PR does include the unit tests for the reader logic, which also run via `mvn test`, as expected. If you'd like, I can create an explicit Travis job for the spark tests instead of letting them be bundled into the other modules' test jobs.
@jihoonson gentle reminder 🙂
First round of reviews. Will look closer at deep storage interaction and test coverage next.
* non-overshadowed segments in the interval will be returned, regardless of completeness.
*
* @param datasource The Druid data source to get segment payloads for.
* @param intervalStart The start of the interval to fetch segment payloads for. If None, MIN_INSTANT is used.
You may want to mention that start and end intervals are inclusive.
I'm not 100% sure how to word this: the SQL query being executed uses `>=` and `<=` for its bounds, but it's effectively querying against JodaTime Interval endpoints, where the start time is inclusive and the end time is exclusive. The net result is fetching segments within `[startTime, endTime)`. If you were encountering this method while working in an unfamiliar code base, how would you want this communicated?
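To make the semantics concrete, a tiny hypothetical helper (not from the PR) showing the half-open overlap test that results:

```scala
// Hypothetical helper: although the SQL query uses >= and <= on the stored
// bounds, the effective semantics are half-open intervals [start, end).
// Times are epoch millis for simplicity.
def overlapsHalfOpen(segStart: Long, segEnd: Long,
                     queryStart: Long, queryEnd: Long): Boolean =
  segStart < queryEnd && segEnd > queryStart

// A segment ending exactly at the query start shares no rows with the query
// interval, while a segment starting exactly at the query start does.
```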
spark/src/main/scala/org/apache/druid/spark/utils/NullHandlingUtils.scala
* @param filter The Spark filter to map to a Druid filter.
* @return The Druid filter corresponding to the filter condition described by FILTER.
*/
def mapFilter(filter: Filter, schema: StructType): DimFilter = { // scalastyle:ignore method.length
Druid supports a few other filter types like `Search`, `Extraction`, `Expression`, `Javascript`, etc. Are there any Spark counterparts for those?
There aren't. The main reason to push down some Spark filters to the readers is to be able to parse out bounds on `__time` that we could use to further reduce which segments we open. As a happy benefit, in some cases we can push down filters to the readers and filter out rows before returning them to Spark, but we're not aiming to run the Druid query execution engine in a Spark executor. Any operation that would be evaluated in a Druid `Expression` or `Javascript` filter should instead be handled by Spark. (As an aside, I wouldn't expect Spark to even attempt to push down any predicates that needed UDFs or custom code to be executed. You can see the set of Spark Filters that can be pushed down in the `isSupportedFilter` function.)
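A simplified sketch of the kind of allow-list check `isSupportedFilter` performs. The real function matches on Spark's `org.apache.spark.sql.sources.Filter` subclasses; the ADT below is a stand-in, and the set of supported cases is illustrative:

```scala
// Stand-in ADT for a few of Spark's pushed-down Filter classes.
sealed trait SparkFilter
case class EqualTo(attribute: String, value: Any) extends SparkFilter
case class GreaterThan(attribute: String, value: Any) extends SparkFilter
case class In(attribute: String, values: Seq[Any]) extends SparkFilter
case class And(left: SparkFilter, right: SparkFilter) extends SparkFilter
case class StringContains(attribute: String, value: String) extends SparkFilter

// Allow-list check: only filters with a Druid counterpart are pushed down;
// everything else is left for Spark to evaluate after the scan.
def isSupportedFilter(f: SparkFilter): Boolean = f match {
  case _: EqualTo | _: GreaterThan | _: In => true
  case And(l, r) => isSupportedFilter(l) && isSupportedFilter(r)
  case _ => false // e.g. StringContains has no mapping in this sketch
}
```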
Looking at this code again though, there's probably room in the future to better tie it to the two null handling schemes and increase the number of filters we're able to push down. This is just an efficiency improvement though - if we can't handle a given set of filters, Spark will execute them itself.
StructField, StructType, TimestampType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

class DruidColumnarInputPartitionReader(
Would a better name instead be `DruidVectorizedInputPartitionReader`? I am not sure that `Columnar` represents the fact that this reader is reading column values in batches.
`Columnar` is the Spark convention for this (e.g. the DataSourceReader mixes in `SupportsScanColumnarBatch` and this InputPartitionReader returns `ColumnarBatch`es of `ColumnVector`s). Since this will primarily be read by Druid developers I'm happy to ditch the Spark naming convention and name this `DruidVectorizedInputPartitionReader`.
Ah, yes. Makes sense. I am ok with `DruidColumnarInputPartitionReader`.
spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala
val filterOpt = FilterUtils.mapFilters(filters, schema.get)
filterOpt.fold(true) { filter =>
val rowSignature = SchemaUtils.generateRowSignatureFromSparkSchema(schema.get)
val canVectorize = filter.toOptimizedFilter.canVectorizeMatcher(rowSignature)
What about the case when Druid doesn't support vectorized reads for, say, a complex column type? Does that mean we should also be looking at the schema to decide the value of `canVectorize`? We should probably disable vectorization then.
We are looking at the schema to decide the value of `canVectorize` (we generate the Druid `RowSignature` and pass it to the `canVectorizeMatcher` call). Are you saying we should be more conservative and disable vectorization in additional cases?
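An illustrative sketch of the more conservative schema gate being discussed. The real code builds a Druid `RowSignature` from the Spark schema and asks the filter's `canVectorizeMatcher`; the column types below are stand-ins:

```scala
// Stand-in column types; ComplexCol models Druid complex columns that
// may lack vectorized readers.
sealed trait ColType
case object LongCol extends ColType
case object StringCol extends ColType
case class ComplexCol(typeName: String) extends ColType

// Conservative rule of the kind being discussed: refuse vectorization
// whenever any selected column is a complex type.
def canVectorize(schema: Map[String, ColType]): Boolean =
  schema.values.forall {
    case _: ComplexCol => false
    case _ => true
  }
```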
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory
import org.apache.druid.segment.{IndexIO, IndexMergerV9}

package object v2 { // scalastyle:ignore package.object.name
What is the purpose of this class?
This is a Scala package object. Although the `INDEX_IO` object is only used in one place in the reader, that object and the `INDEX_MERGER_V9` object will also be used in multiple places in the writer, which is why I've put them in the package object.
One package class was accidentally renamed while moving to piecemeal commits. This doesn't actually affect anything, but moving back to Scala conventions.
Add a try/catch block around the InputPartition.close() methods to make sure we delete the temporary files we created if we're able. If deleting the temp dir throws an Exception we still might leave an orphaned file, but Spark should clean up the temp directory as well so best effort is ok here.
If we're using SQL-compatible null handling, we can support pushing down IsNull and IsNotNull filters to the readers. We can also support pushing down EqualNullSafe filters whose value is null (null <=> null == true), In filters where null appears in the list of values (including short-circuiting for IN (null)), and short-circuiting EqualTo(field, null) filters.
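The null-related cases described above can be sketched as follows (stand-in filter classes, not Spark's actual `org.apache.spark.sql.sources.Filter` hierarchy; filters without a null component are handled elsewhere and omitted here):

```scala
// Stand-ins for the Spark filter classes named above (sketch only).
sealed trait SparkFilter
case class IsNull(attribute: String) extends SparkFilter
case class IsNotNull(attribute: String) extends SparkFilter
case class EqualNullSafe(attribute: String, value: Any) extends SparkFilter
case class EqualTo(attribute: String, value: Any) extends SparkFilter
case class In(attribute: String, values: Seq[Any]) extends SparkFilter

// Which null-related filters can be pushed down, by null-handling mode.
def canPushDownNullFilter(f: SparkFilter, sqlCompatibleNulls: Boolean): Boolean =
  f match {
    case _: IsNull | _: IsNotNull => sqlCompatibleNulls
    // null <=> null is true, so a null-valued EqualNullSafe acts like IsNull
    case EqualNullSafe(_, null) => sqlCompatibleNulls
    // EqualTo(field, null) is never true under SQL semantics: short-circuit
    case EqualTo(_, null) => sqlCompatibleNulls
    // IN lists containing null (including IN (null)) can be short-circuited
    case In(_, vs) if vs.contains(null) => sqlCompatibleNulls
    case _ => false
  }
```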
@JulianJaffePinterest sorry, I've been swamped with work recently. I focused on the documents and user-facing parts in my review. They LGTM. As long as those are good enough, we can improve other details later.
I prefer having a separate Travis job since it is easy to tell where the spark tests are running. I'm also expecting to add many more tests in the future. In that case, splitting the spark tests from the others can reduce the job runtime. Finally, we have a script that tells Travis which part has changed in each PR, so that we don't have to run tests that are unrelated to the change. Having a separate Travis job for the spark connector will make it easy for the script to run only the relevant tests when we make changes in the connector.
* A registry for plugging in support for Druid complex metric types. Provides definitions for supporting complex types
* in extensions-core out of the box.
*/
object ComplexMetricRegistry extends Logging {
nit: `ComplexTypeRegistry` could be a better name as we are trying to support complex types for dimensions.
Done 👍
convertConfToInstance(conf, classOf[AzureAccountConfig])
}

def createAzureStorage(conf: Configuration): AzureStorage = {
If we ever want to support a new storage type, would we have to modify this file? Could there be a more extensible way to support new types without recompiling the spark connector?
Users can add new deep storage handlers (or override the default ones) using the various plugin registries with no need to recompile the spark connector. If you're asking what happens when Druid adds support for new deep storage types, then someone needs to tell spark how to access the new deep storage type. This can be done either in the spark module or theoretically in the new druid extension, although users would then need to register the new functions themselves. I'm not sure I follow the concern about needing to recompile the spark connector - presumably Druid itself will only add support for new deep storage types when new versions are released, which will mean compiling from source anyway.
I was mostly wondering what people should do if they want to register their custom deep storage type with the spark connector. But looking at the code again, I think I probably misunderstood it. I see that users can register their own custom type in the registry.
val dimensions = columns.filter(availableDimensions.contains).asJava
val metrics = columns.filter(availableMetrics.contains).asJava

new IngestSegmentFirehose(List(adapter).asJava, TransformSpec.NONE, dimensions, metrics, filter)
Firehose is deprecated and will be removed sooner or later. I suggest using InputFormat instead.
Refactored 👍
@jihoonson no worries, I know how it goes 🙂. I've split out the spark module tests into separate Travis jobs as you requested.
@JulianJaffePinterest thanks for addressing the comments. Can you check the Travis failure? One is the doc spelling check and another is the "other module test". The other module test failure seems strange, as the forked VM exploded while testing.
Missed updating .spelling when renaming ComplexMetricRegistry to ComplexTypeRegistry.
@jihoonson do you know if it's possible to get the logs mentioned in the error message?
@JulianJaffePinterest I'm not sure. Maybe we can see what is in the surefire reports after they fail in Travis?
Previous build was canceled by Travis - closing and reopening to retrigger
Cancel dance again
@JulianJaffePinterest Sorry about that. I canceled your travis run because I was trying to make space for the tests to run on 0.22.1 first. If I cancel again I'll restart your travis job - no need to close and re-open.
Confirm it was in fact recent changes that broke the build.
Silly mistake.
@jihoonson
Resets this branch to the latest changes after confirming the failing test was unrelated.
New approach to fixing the KafkaLookupExtractorFactoryTest failure and added some more tests/did some housekeeping of the reader code.
Last change didn't help, now trying to find the .dump files mentioned in the mvn error message.
@jihoonson picking this back up now that I'm back from vacation. Because this PR isn't against master, would it be possible to land it despite the seemingly unrelated failing test? This would allow me to put the last PR for this feature up (the writing component) and begin getting reviews while working on the kafka lookup extraction test in parallel.
@JulianJaffePinterest thank you for picking this up again. It makes sense to me. I will merge this PR shortly. Happy new year!
Happy to hear that this PR has been merged!
Add code, utilities, tests, and documentation for reading data from Druid using Spark.
(See #10920 and #11474 for context)
This PR splits out the reader logic from #10920 into a standalone PR as discussed. This PR also reworks how the connector handles segment loading to allow for more extensibility.
Description
At a high level, this connector reads data from Druid into Spark by querying the Druid metadata SQL server for the segment load specs for the specified data source and time period and then fetching and loading those segments from deep storage. On the Spark side, querying the Druid cluster and determining which segments to load is done on the driver side, while the actual fetching and loading of segment files is done on the executors.
A more granular walk-through follows:
Determining which segments to load
When a user calls `.load()` to read a Druid data source into a Spark DataFrame, the first step for the connector is to determine which segments need to be read. This is done by querying a Druid cluster's backing metadata database for the specified data source and time range via the `DruidMetadataClient` (we need to query the backing SQL database directly because segment load specs are pruned when served from the broker). Additionally, if the user does not provide a schema for the data source, we need to construct the correct schema ourselves. We do this via SegmentMetadata queries sent to a Druid broker.
Key Classes
DruidDataSourceReader
DruidMetadataClient
DruidClient (already reviewed in #11474)
Distributing the segments
Once we've determined which Druid segments we need to load, we need to assign the segments to Spark partitions that will actually do the reading. For now, we simply assign each Druid segment file to a Spark partition, although in the future this could be extended to do smarter things (for instance, we could map multiple Druid segments to a single Spark partition and thus allow the user to specify how many Spark partitions they wanted their DataFrame to have, regardless of the number of underlying Druid segment files).
Key Classes
DruidDataSourceReader (the `planInputPartition` methods)
DruidInputPartition
Reading the data
Once we've assigned a Druid segment file to a Spark partition, we need to actually fetch the segment file from deep storage and read its data. This is handled by an `InputPartitionReader` (`DruidInputPartitionReader` for row-based reads; `DruidColumnarInputPartitionReader` for vectorized reads). Using either default or custom, user-provided logic, the input partition reader pulls the segment file from deep storage and opens it as a Druid QueryableIndex. The reader then applies any pushed-down filters, projects the specified columns (if the user provided a schema explicitly), and fills Spark rows or vectors with the segment's data.
A key piece here (and an enhancement over #10920) is the logic used to pull the segment file from deep storage. There are two supported approaches. By default, the partition reader will attempt to deserialize the load spec for its assigned segment and delegate to a Druid DataSegmentPuller to handle fetching. This requires a user to configure the reader with all necessary properties to connect and authenticate to their deep storage, but all Druid "core" deep storages (local, HDFS, S3, GCS, and Azure) are supported. Alternatively, users can defer to their Spark application's configuration. In this scenario, the reader extracts a URI from its assigned segment's load spec and then constructs a FileSystem from the Spark application's configuration. The reader uses the file system to pull the extracted URI, meaning that users are not responsible for handling connection and authentication to deep storage. This second case is useful for users running on clusters that rely on GCS ADCs or AWS IAM roles for machine authorization to GCS/S3, or for clusters that manage access keys for their users. Only the local, HDFS, S3, and GCS deep storage implementations are supported out of the box for this approach (Azure users will need to use the first approach or register a custom load function via the `SegmentReaderRegistry`).
Key Classes
DruidBaseInputPartitionReader
DruidColumnarInputPartitionReader
DruidInputPartitionReader
SegmentReaderRegistry
User interface
Users use the connector like any other Spark reader: they call `.read` on their Spark session, set the format (in this case, `"druid"`), specify the properties to use, and then call `load()`. For convenience, a more strongly typed way to configure the reader is also provided.
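A minimal sketch of what such a read call might look like. Only `table` and the `reader.*` keys shown in the docs table above are from this PR; everything else here (app name, data source name) is a placeholder:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("druid-read-example").getOrCreate()

// Hypothetical usage sketch; "table" and "reader.deepStorageType" come from
// the docs table above, and "wikipedia" is a placeholder data source name.
val df = spark.read
  .format("druid")
  .option("table", "wikipedia")
  .option("reader.deepStorageType", "local")
  .load()
```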
User documentation with examples and configuration option descriptions is provided in spark.md.
Additionally, because the connector does not run in a Druid cluster, we can't use Druid's dependency injection to transparently handle custom extensions and behavior. In order to support users who use unsupported or custom complex metrics, deep storage implementations, or metadata databases, the connector uses a plugin-based architecture. All Druid core extensions are supported out of the box. If users need to use their own custom logic, they can register the appropriate functions with the corresponding registry (`ComplexMetricRegistry` for complex metrics, `SQLConnectorRegistry` for metadata databases, and `SegmentReaderRegistry` for loading segments from deep storage).
Key Classes
ComplexMetricRegistry
SQLConnectorRegistry
SegmentReaderRegistry
DruidConfigurationKeys
spark.md
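The plugin-based architecture can be sketched generically as follows (a stand-in, not the connector's actual registry APIs; the load-spec handler at the bottom is hypothetical):

```scala
import scala.collection.mutable

// Generic sketch of the registry pattern: callers register a named plugin
// before the connector needs it, and lookups fail fast otherwise.
class PluginRegistry[T] {
  private val plugins = mutable.HashMap.empty[String, T]

  def register(name: String, plugin: T): Unit = plugins.put(name, plugin)

  def get(name: String): T =
    plugins.getOrElse(
      name,
      throw new IllegalArgumentException(s"No plugin registered for '$name'"))
}

// E.g. mapping a deep storage type to a (hypothetical) load-spec handler.
val segmentReaders = new PluginRegistry[String => String]
segmentReaders.register("local", path => s"file://$path")
```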
This PR has: