Spark Direct Readers and Writers for Druid. #10920
Conversation
This commit consolidates the previous work for apache#9780 into one single merged commit. This commit adds Spark DataFrame readers and writers that interact directly with Druid deep storage, avoiding placing load on active clusters.
Reworked the configuration system, cleaned up the documentation, moved it to the docs/ directory, and a few other code-quality odds and ends.
Added support for callers to provide a list of Druid dimension objects, in addition to the existing support for a comma-separated list of dimension names or no value. This allows callers to specify how string dimension columns handle multi-values and whether or not to create bitmap indices, instead of forcing the defaults. Also added more tests, improved the scaladocs throughout, and resolved a few scalastyle warnings.
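A hedged sketch of the two dimension-spec forms described above. The option key and exact JSON shape are illustrative and not confirmed against this PR's configuration system; the `multiValueHandling` and `createBitmapIndex` fields mirror Druid's `StringDimensionSchema`:

```scala
// Illustrative only: the "dimensions" option key is hypothetical here.
// Existing behavior: a comma-separated list of dimension names.
val byName = Map("dimensions" -> "dim1,dim2,dim3")

// New behavior: a JSON list of Druid dimension objects, allowing control of
// multi-value handling and bitmap indexing for string dimensions.
val byObject = Map("dimensions" ->
  """[
    |  {"type": "string", "name": "dim1", "multiValueHandling": "SORTED_ARRAY", "createBitmapIndex": true},
    |  {"type": "string", "name": "dim2", "multiValueHandling": "ARRAY", "createBitmapIndex": false}
    |]""".stripMargin)
```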
I've added some more tests, improved some comments, and added support for configuring non-default multi-value handling and indexing for string dimensions. I also see that instructions for how to update the licenses and notices have been committed, but I'll wait on doing that until this has a little more traction 😉
Is anyone actively reviewing this? If not, I have a bit of a refactor I'd like to push that improves the organization of the code and hopefully makes it easier to follow.
Please post this update.
Refactor the spark extensions to improve the layout of packages and classes. Also add experimental support for columnar reads, initial support for Druid's default value null handling, improved tests and code documentation, and bump the Spark version to 2.4.7 & scala version to 2.12.12 to pull in security fixes.
Life intervened as always, so I haven't thought about this for a while. I've just pushed that refactor I mentioned above that improves the package and class layout. The commit also bumps the target Spark and Scala patch versions (to 2.4.7 and 2.12.12, respectively) to pull in security fixes, adds more unit tests, and improves the code documentation. Finally, the commit adds two new features, including the ability to specify SQL-compliant or Druid default value null handling.
Awesome. I had to resort to adding an InitializeForTesting call from the NullHandling module to the writer class to ensure that NullHandling was initialized on the executors; otherwise exceptions were thrown. Glad that is properly fixed up now.
Make null handling configurable for writing as well as reading.
Added the …
```scala
 * @return A map from column name to data type and whether or not the column is multi-value
 *         for the schema of DATASOURCE.
 */
def getSchema(dataSource: String, intervals: List[Interval]): Map[String, (String, Boolean)] = {
```
Should intervals be optional? If no interval is provided, the default interval specified by the broker config druid.query.segmentMetadata.defaultHistory will be used. https://druid.apache.org/docs/latest/querying/segmentmetadataquery.html#intervals
Intervals are required because the default behavior is unlikely to be what Spark users would expect (e.g. if no time bounds are provided, I would expect to read the whole table in a batch context, not just the last week). For now, this is only called in one place, which uses JodaTime.MIN_INSTANT and JodaTime.MAX_INSTANT as the interval bounds if no time filters are provided. This will be an expensive query for large datasources, but until #9707 is implemented I don't know of a better way. I suppose it would make sense to move that default handling into the getSchema method here instead, though, so I'll make the change.
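The fallback described above might be sketched like this. The constant values and names are illustrative, not taken from the PR (Druid's actual MIN_INSTANT/MAX_INSTANT constants may differ):

```scala
import org.joda.time.Interval

// Illustrative min/max instants standing in for JodaTime.MIN_INSTANT /
// MAX_INSTANT; the real constants in Druid may differ.
val MinInstant: Long = 0L
val MaxInstant: Long = 253402300799999L // 9999-12-31T23:59:59.999Z

// If the caller supplies no time filters, fall back to an effectively
// unbounded interval so the whole table is read.
def intervalsOrDefault(intervals: List[Interval]): List[Interval] =
  if (intervals.nonEmpty) intervals else List(new Interval(MinInstant, MaxInstant))
```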
```scala
if (segments.size() > 1) {
  logWarn("Segment metadata response had more than one row, problem?")
}
segments.asScala.head.getColumns.asScala.map{ case (name: String, col: ColumnAnalysis) =>
```
I couldn't find it in the docs, but what happens if segments have different schemas? For example, a dimension may be a string in one segment and a double in another. Should we set merge to false and give precedence to the latest segment when determining a column's type and other information?
The analysis will error if a dimension has different types across two merged segments. The returned column will have type String, and so we'll silently widen the type to String. It's been a couple of years since I wrote this piece, and yeah, that is really non-obvious (I had to go code spelunking to figure it out). I'm inclined to add a comment to the code explaining what happens and then leave the current behavior, but I could also see just throwing an error back to the caller and refusing to continue.
To the second part of your question, the reason we don't just take the last segment in an interval and use its schema is because the latest segment may happen to not have multiple values for a particular column where other segments do, or a column may be all null for the latest segment and so not show up in the Segment Metadata. We query the full range of the requested interval and then merge to handle these cases, which does open up the merging problem if types change. At least in my Druid experience, the first two cases are much more frequent than the third case.
Does it make sense to fail the operation if there is an error reported in the response? At a minimum, we should log a warning to let the user know that we are falling back to using "String" as datatype for a column since, possibly, data types are different for a column across segments in the requested interval.
Per review: Made the intervals arg for getSchema() optional, added a comment explaining how we handle a changing dimension type across the queried interval for getSchema and debug-level logging of the SegmentAnalysis we extract our Spark schema from, and changed the behavior when multiple SegmentAnalyses are returned from a merged SegmentMetadataQuery to throw an error instead of just logging a warning. (A merged SegmentMetadataQuery result should only have one row, so we should not expect to ever hit this error).
"metadataPassword" -> "diurd" | ||
) | ||
|
||
sparkSession |
It would be good to also include an example that illustrates how to provide broker url for determining schema.
Added 👍
@JulianJaffePinterest, appreciate your effort to build something like this. Thank you!! I am trying to use this Spark connector. Just wanted to check: is the spark.md file merged to the spark-druid-connector branch?
```scala
  .load()
```
Filters should be applied to the read-in data frame before any [Spark actions](http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.Dataset) |
This might be a gap in my understanding - since the readers are directly loading from deep storage, would they not need to essentially download and scan the entire file to apply predicates?
If a filter is on the time dimension, then we don't need to scan files that don't cover the requested time interval (down the road we could also add support for not fetching unnecessary files if the underlying data source is partitioned by range as well). If the segment file contains data for the requested time period, we'll need to fetch it from deep storage, but we can still filter out values while we read data into a DataFrame, so there's some benefit there as well, at least for what Spark needs to keep in memory.
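The time-based pruning described above can be sketched as a simple interval-overlap check. This is a sketch of the idea, not the PR's actual code:

```scala
import org.joda.time.Interval

// Only fetch a segment from deep storage if its interval overlaps at least
// one of the requested read intervals.
def shouldFetchSegment(segmentInterval: Interval, requested: Seq[Interval]): Boolean =
  requested.exists(_.overlaps(segmentInterval))
```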
I pushed up a commit to clean up the documentation as you requested @samarthjain. I also added support for building a library jar suitable for adding to a Spark cluster.
@JulianJaffePinterest I have tried to create a jar file from your PR branch and copied it to my Spark code. I am using it to read a Druid data source using the following code (I have retrieved the segments directly via the DruidMetaClient):
but I keep on hitting the error:
Not sure what I am missing here exactly, but I did not see a test case for this in your PR as well.
```scala
) extends Logging {
  private lazy val druidMetadataTableConfig = MetadataStorageTablesConfig.fromBase(base)
  private lazy val dbcpProperties = new Properties()
  dbcpProperties.putAll(dbcpMap)
```
These 2 lines can be combined into one:

```scala
private lazy val dbcpProperties = new Properties(dbcpMap)
```

Without this I am getting a build failure when trying to build this PR locally:

```
[ERROR] /Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/src/main/scala/org/apache/druid/spark/utils/DruidMetadataClient.scala:52: error: ambiguous reference to overloaded definition,
[ERROR] both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit
[ERROR] and method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit
[ERROR] match argument types (java.util.Properties)
[ERROR] dbcpProperties.putAll(dbcpMap)
[ERROR]                ^
[ERROR] one error found
```
Can you share a stack trace or how you triggered this error?
I am simply doing `mvn clean package -pl extensions-core/spark-extensions`. This actually seems like a Scala version mismatch issue for me. I am using 2.13.5, but I think the expectation is to use 2.12.11 or 2.12.10 instead? Adding some stack trace here:
```
[INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ druid-spark-extensions ---
[WARNING] Expected all dependencies to require Scala version: 2.12.11
[WARNING] com.fasterxml.jackson.module:jackson-module-scala_2.12:2.10.2 requires scala version: 2.12.10
[WARNING] Multiple versions of scala libraries detected!
[INFO] /Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/src/main/scala:-1: info: compiling
[INFO] Compiling 29 source files to /Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/target/classes at 1621433638727
[ERROR] /Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/src/main/scala/org/apache/druid/spark/utils/DruidMetadataClient.scala:52: error: ambiguous reference to overloaded definition,
[ERROR] both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit
[ERROR] and method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit
[ERROR] match argument types (java.util.Properties)
[ERROR] dbcpProperties.putAll(dbcpMap)
[ERROR]                ^
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 29.832 s
[INFO] Finished at: 2021-05-19T07:14:04-07:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on project druid-spark-extensions: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
```
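One unambiguous way to sidestep the overload reported above is to copy entries explicitly rather than passing a `Properties` to `putAll`. This is a sketch, assuming `dbcpMap` is a `java.util.Properties`:

```scala
import java.util.Properties
import scala.collection.JavaConverters._

def toProperties(dbcpMap: Properties): Properties = {
  val props = new Properties()
  // Copy each entry explicitly; this avoids the ambiguous putAll overload
  // shared between java.util.Properties and java.util.Hashtable.
  dbcpMap.asScala.foreach { case (k, v) => props.setProperty(k, v) }
  props
}
```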
```scala
  val httpClient: HttpClient,
  val hostAndPort: HostAndPort
) extends Logging {
  private val RetryCount = 5
```
Does it make sense to have these settings configurable?
Done
```scala
      Seq.empty
    }
  }.toSeq.flatten
  closer.close()
```
Should this be in a try-with-resources like block you have added in this PR?
I think if this throws an exception we should blow up, to avoid polluting the Spark cluster's workspace with orphaned files, but I'm open to alternative approaches.
```scala
private[spark] val brokerPortDefaultKey: (String, Int) = (brokerPortKey, 8082)

// Common configs
val useCompactSketchesKey: String = "useCompactSketches" // Default: false
```
What is the purpose of this config?
To allow users to work with compact versions of ThetaSketches if desired (for example, if they don't need to update the sketches after reading them).
```scala
val df = sparkSession
  .read
  .format("druid")
```
I tried to run these test cases locally using IntelliJ's Scala plugin, but they are failing with the same error that I mentioned in my earlier comment about "druid" not being a correct DataSource:
2021-05-18T14:23:04,851 INFO [ScalaTest-run-running-DruidDataSourceV2Suite] org.apache.spark.sql.internal.SharedState - Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/spark-warehouse').
2021-05-18T14:23:04,852 INFO [ScalaTest-run-running-DruidDataSourceV2Suite] org.apache.spark.sql.internal.SharedState - Warehouse path is 'file:/Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/spark-warehouse'.
2021-05-18T14:23:04,855 INFO [ScalaTest-run-running-DruidDataSourceV2Suite] org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef - Registered StateStoreCoordinator endpoint
Failed to find data source: druid. Please find packages at http://spark.apache.org/third-party-projects.html
java.lang.ClassNotFoundException: Failed to find data source: druid. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:660)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
at org.apache.druid.spark.v2.DruidDataSourceV2Suite.$anonfun$new$2(DruidDataSourceV2Suite.scala:77)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
at org.apache.druid.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:34)
at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
at org.apache.druid.spark.SparkFunSuite.runTest(SparkFunSuite.scala:34)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
at org.scalatest.Suite.run(Suite.scala:1112)
at org.scalatest.Suite.run$(Suite.scala:1094)
at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
at org.scalatest.funsuite.AnyFunSuite.run(AnyFunSuite.scala:1562)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
Caused by: java.lang.ClassNotFoundException: druid.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:634)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
... 51 more
I think we are missing a step of adding druid as a data source to Spark.
@birTiwana the problem is that you don't have the … If you're using …
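For context, Spark resolves short format names like "druid" through the `DataSourceRegister` service-loader mechanism: the jar on the classpath must contain a provider-configuration file naming the implementing class, otherwise `format("druid")` fails with exactly the `ClassNotFoundException: druid.DefaultSource` seen above. A hedged sketch of what that file would look like — the implementing class name here is inferred from the test package in the stack trace, not confirmed against this PR:

```
# File: META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
org.apache.druid.spark.v2.DruidDataSourceV2
```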
Hi @JulianJaffePinterest, just want to check how things are going. My previous suggestion for splitting this PR into smaller chunks was based on the assumption that it would not slow down the process, but it seems it is not working as well as I expected. Our ultimate goal here is merging this cool feature quickly. If you are busy, I totally understand and will support you in merging this PR as quickly as possible without further splitting.
Hey @jihoonson, I had some unexpected and unfortunate personal/familial crises to deal with these past few months. While they're not entirely in the rear-view mirror, I should have more time again to push this to the finish line. I've opened #11823 with the next chunk of code (the reading half of the connector). Please let me know if you think the PR is still too big; I couldn't find a good place to split it that wouldn't require a reviewer to know the rest of the code anyway.
@JulianJaffePinterest sorry to hear that. I hope things have gotten better. Also, thank you for making #11823. I will take a look.
Hi @JulianJaffePinterest, I hope things get better soon; take good care of yourself! Last month I tested your code and ran into a problem, described below:
@wangxiaobaidu11 thank you for your kind wishes. The issue you've encountered is an artifact of pulling out some custom code and updating logic to match mainline Druid changes from newer versions. In your case, you can call
@JulianJaffePinterest Thanks for your reply. I added my own extension, Long Unique Extension, and I ran the entire Spark writing process to Druid, using HDFS for deep storage.
@wangxiaobaidu11 you don't need to make changes to the druid spark code for your use case - you can call
@JulianJaffePinterest Thanks! I will update it. I have another question.
@wangxiaobaidu11 you can control the number of partitions by partitioning your dataframe prior to calling
Calling:

```scala
import org.apache.druid.spark.DruidDataFrame

df.partitionAndWrite("__time", "millis", "DAY", 200000).format("druid").mode(SaveMode.Overwrite).options(map).save()
```

or in Java:

```java
import org.apache.druid.spark.package$.MODULE$.DruidDataFrame;

DruidDataFrame(dataset).partitionAndWrite("__time", "millis", "DAY", 200000).format("druid").mode(SaveMode.Overwrite).options(map).save();
```

If you don't want to use implicits/wrapper classes, you can also use the partitioner directly:

```java
SingleDimensionPartitioner partitioner = new SingleDimensionPartitioner(dataset);
Dataset<Row> partitionedDataSet = partitioner.partition("__time", "millis", "DAY", 200000, "dim1", true);
partitionedDataset.write().format("druid").mode(SaveMode.Overwrite).options(map).save();
```

Also, are you setting …
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.
No, please don’t close this PR.
This issue is no longer marked as stale.
@JulianJaffePinterest thank you for the efforts. Is there any news about this PR? And is there a way that we can contribute to it?
Hi @JulianJaffePinterest, is this PR still active?
This pull request has been marked as stale due to 60 days of inactivity.
This pull request has been marked as stale due to 60 days of inactivity.
This pull request/issue has been closed due to lack of activity. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment.
Implements #9780
Description
See #9780 and linked discussions for more context. This PR adds a new module, druid-spark, containing Spark direct readers and writers for Druid. Usage is documented in the module documentation. As discussed on the dev mailing list, a summary in human language of the UX and testing regimen follows:
UX
The entry point for users is deceptively simple: all interaction is handled through the existing Spark interfaces plus configuration. To be specific, users read Druid data into a Spark dataframe and write a Spark dataframe to a Druid data source through the standard DataFrame reader and writer APIs.
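A minimal sketch of those entry points, assembled from snippets elsewhere in this thread. The option keys and values go in `propertiesMap`; this assumes a live `sparkSession` and a reachable Druid deployment, so it is illustrative rather than runnable standalone:

```scala
import org.apache.spark.sql.SaveMode

// Read a Druid datasource into a DataFrame.
val df = sparkSession
  .read
  .format("druid")
  .options(propertiesMap)
  .load()

// Write a DataFrame out as Druid segments.
df.write
  .format("druid")
  .mode(SaveMode.Overwrite)
  .options(propertiesMap)
  .save()
```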
The meat of the interaction is through the propertiesMap passed to the reader or writer. These properties, cataloged in the documentation, mostly follow the corresponding Druid properties. If desired, there are typed helpers for setting these options in org.apache.druid.spark.DruidDataFrameReader and org.apache.druid.spark.DruidDataFrameWriter as well; sample usages of these helpers are included in the documentation.
There are a few key areas to be aware of:
First, due to Spark's design, DataSourceWriters cannot repartition the dataframe they are responsible for writing, and they have very little information about the overall partitioning. To compensate for this, the writer includes a number of partitioners out of the box. These partitioners by necessity have no context for the data they are partitioning, and so will be slower than usage-specific partitioners, but they are suitable for prototyping. If the provided partitioners are not used, there are a few behaviors to be aware of. For "simple" ShardSpecs (LinearShardSpec and NumberedShardSpec), the writer will default to rationalizing the output segments into contiguous and complete blocks, ensuring that loading and overshadowing of the output will be handled atomically by Druid. For more complex shard spec types such as HashBasedNumberedShardSpec and SingleDimensionShardSpec, users will need to provide a partition map to the writer, linking Spark partition ids to the information required to construct the corresponding Druid segments. The included Spark partitioners all provide partition maps and can be used as references for how to implement similar behavior in custom partitioners.

Second, because this code executes in a Spark cluster rather than a Druid cluster, it cannot take advantage of Druid's extension support directly. Instead, these connectors utilize a plugin registry system that parallels Druid's extensions. The complex metric types, metadata stores, and deep storage types supported by core Druid extensions are also supported out of the box with this extension, with the exception of Azure deep storage. If users wish to implement their own plugins to handle specific complex metric types, metadata servers, shard specs, or deep storage implementations, they can register plugins with the respective plugin registries before loading or saving data, and the Spark connectors will use the provided logic for the corresponding tasks.
Testing
Testing is handled via a combination of unit tests and light integration tests. Testing is focused on the core Spark code that handles reading and writing data, although most functionality is covered by at least one unit test. The key classes are DruidDataSourceReader, DruidInputPartitionReader, and DruidInputPartition on the read side, and DruidDataSourceWriter, DruidDataWriterFactory, and DruidDataWriter on the write side. The tests are in the corresponding *Suite classes. Additionally, there is a lightweight round-trip test in the DruidDataSourceV2 suite which writes out test data to local segments, updates metadata in an embedded Derby instance, reads the segments back into a dataframe, and confirms that the rows read in match the rows written out. This test also confirms that the metadata entries created by the writer are correct.

The main gap in testing is the cross-compatibility matrix. I'm not sure how to repeatably test these connectors' interaction with deep storage types other than local and with metadata servers other than Derby. This code has been run at scale against Druid deployments using HDFS for deep storage and PostgreSQL for metadata, and I am aware of production users with S3 and GCP deep storage, but the Azure support is lightly modified from how the corresponding Druid extension handles configuration and is otherwise untested beyond parsing.
This PR has:
Key changed/added classes in this PR
spark/*