
Add Spark connector reader support. #11823

Conversation


@JulianJaffePinterest JulianJaffePinterest commented Oct 21, 2021

Add code, utilities, tests, and documentation for reading data from Druid using Spark.

(See #10920 and #11474 for context)

This PR splits out the reader logic from #10920 into a standalone PR as discussed. This PR also reworks how the connector handles segment loading to allow for more extensibility.

Description

At a high level, this connector reads data from Druid into Spark by querying the Druid metadata SQL database for the segment load specs for the specified data source and time period and then fetching and loading those segments from deep storage. On the Spark side, querying the Druid cluster and determining which segments to load happens on the driver, while the actual fetching and loading of segment files happens on the executors.

A more granular walk-through follows:

Determining which segments to load

When a user calls .load() to read a Druid data source into a Spark DataFrame, the first step for the connector is to determine which segments need to be read. This is done by querying a Druid cluster's backing metadata database for the specified data source and time range via the DruidMetadataClient (we need to query the backing SQL database directly because segment load specs are pruned when served from the broker). Additionally, if the user does not provide a schema for the data source, we need to construct the correct schema ourselves. We do this via SegmentMetadata queries sent to a Druid broker.
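As a rough illustration (not the connector's actual code), the driver-side lookup boils down to a query against the metadata store's druid_segments table. The table and column names below follow Druid's standard metadata schema; the exact SQL that DruidMetadataClient issues may differ:

// A minimal sketch of the kind of SQL the metadata client runs on the driver.
// Bind the data source and time bounds via PreparedStatement parameters in real code.
def segmentPayloadSql(tableName: String = "druid_segments"): String =
  s"""SELECT payload
     |FROM $tableName
     |WHERE dataSource = ?
     |  AND used = true
     |  AND start >= ?
     |  AND "end" <= ?""".stripMargin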

Key Classes

DruidDataSourceReader
DruidMetadataClient
DruidClient (already reviewed in #11474)

Distributing the segments

Once we've determined which Druid segments we need to load, we need to assign the segments to Spark partitions that will actually do the reading. For now, we simply assign each Druid segment file to a Spark partition, although in the future this could be extended to do smarter things (for instance, we could map multiple Druid segments to a single Spark partition and thus allow the user to specify how many Spark partitions they wanted their DataFrame to have, regardless of the number of underlying Druid segment files).
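A sketch of what this one-to-one assignment amounts to, assuming the Spark 2.4 DataSource V2 API (illustrative only; the real logic lives in DruidDataSourceReader.planInputPartitions and DruidInputPartition, and makePartition below stands in for the DruidInputPartition constructor):

import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartition

// One Druid segment -> one Spark InputPartition. Each serialized DataSegment is
// wrapped in a partition that carries it (plus reader configuration) to an executor.
def planPartitions(
    serializedSegments: Seq[String],
    makePartition: String => InputPartition[InternalRow]): JList[InputPartition[InternalRow]] =
  serializedSegments.map(makePartition).asJava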

Key Classes

DruidDataSourceReader (the PlanInputPartition methods)
DruidInputPartition

Reading the data

Once we've assigned a Druid segment file to a Spark partition, we need to actually fetch the segment file from deep storage and read its data. This is handled by an InputPartitionReader (DruidInputPartitionReader for row-based reads; DruidColumnarInputPartitionReader for vectorized reads). Using either default or custom, user-provided logic, the input partition reader pulls the segment file from deep storage and opens it as a Druid QueryableIndex. The reader then applies any pushed-down filters, projects the specified columns (if the user provided a schema explicitly), and fills Spark rows or vectors with the segment's data.

A key piece here (and an enhancement over #10920) is the logic used to pull the segment file from deep storage. There are two supported approaches: by default, the partition reader will attempt to deserialize the load spec for its assigned segment and delegate to a Druid DataSegmentPuller to handle fetching. This requires a user to configure the reader with all necessary properties to connect and authenticate to their deep storage, but all Druid "core" deep storages (local, HDFS, S3, GCS, and Azure) are supported. Alternatively, users can defer to their Spark application's configuration. In this scenario, the reader extracts a URI from its assigned segment's load spec and then constructs a FileSystem from the Spark application's configuration. The reader uses the file system to pull the extracted URI, meaning that users are not responsible for handling connection and authentication to deep storage. This second approach is useful for users running on clusters that rely on GCS ADCs or AWS IAM roles for machine authorization to GCS/S3, or for clusters that manage access keys for their users. Only the local, HDFS, S3, and GCS deep storage implementations are supported out of the box for this approach (Azure users will need to use the first approach or register a custom load function via the SegmentReaderRegistry).
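For the second approach, the fetch itself is conceptually just a Hadoop FileSystem copy driven by the Spark application's configuration. A minimal sketch (how the URI is extracted from the load spec varies by deep storage type and is omitted here):

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Pull a segment file to a local directory using whatever FileSystem the Spark
// application's Hadoop configuration resolves for the URI's scheme
// (file://, hdfs://, s3a://, gs://, ...).
def fetchSegment(segmentUri: URI, hadoopConf: Configuration, localDir: Path): Path = {
  val fs = FileSystem.get(segmentUri, hadoopConf)
  val source = new Path(segmentUri)
  val dest = new Path(localDir, source.getName)
  fs.copyToLocalFile(source, dest)
  dest
}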

Key Classes

DruidBaseInputPartitionReader
DruidColumnarInputPartitionReader
DruidInputPartitionReader
SegmentReaderRegistry

User interface

Users use the connector like any other Spark reader: they call .read on their Spark session, set the format (in this case, "druid"), specify the properties to use, and then call load(). For example:

sparkSession
  .read
  .format("druid")
  .options(Map[String, String](
    "metadata.dbType" -> "mysql",
    "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
    "metadata.user" -> "druid",
    "metadata.password" -> "diurd",
    "broker.host" -> "localhost",
    "broker.port" -> 8082,
    "table" -> "dataSource",
    "reader.deepStorageType" -> "local",
    "local.storageDirectory" -> "/mnt/druid/druid-segments/"
))
  .load()

For convenience, a more strongly typed way to configure the reader is also provided:

import org.apache.druid.spark.DruidDataFrameReader

val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")

sparkSession
  .read
  .brokerHost("localhost")
  .brokerPort(8082)
  .metadataDbType("mysql")
  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
  .metadataUser("druid")
  .metadataPassword("diurd")
  .dataSource("dataSource")
  .deepStorage(deepStorageConfig)
  .druid()

User documentation with examples and configuration option descriptions is provided in spark.md.

Additionally, because the connector does not run in a Druid cluster, we can't use Druid's dependency injection to transparently handle custom extensions and behavior. In order to support users who rely on unsupported or custom complex metrics, deep storage implementations, or metadata databases, the connector uses a plugin-based architecture. All Druid core extensions are supported out of the box. If users need to use their own custom logic, they can register the appropriate functions with the corresponding registry (ComplexMetricRegistry for complex metrics, SQLConnectorRegistry for metadata databases, and SegmentReaderRegistry for loading segments from deep storage).
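The registries all follow the same basic pattern: register a named factory up front on the driver, then look it up by key at read time. A generic sketch of that idea (the actual registries in this PR have richer, type-specific signatures):

import scala.collection.mutable

// A toy plugin registry: keys are deep storage types, complex metric type names, or
// metadata DB types; values are whatever factory objects the reader should use.
class PluginRegistry[T] {
  private val plugins = mutable.Map.empty[String, T]

  def register(name: String, plugin: T): Unit = plugins(name) = plugin

  def get(name: String): T =
    plugins.getOrElse(name, throw new IllegalArgumentException(s"No plugin registered for '$name'"))
}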

Key Classes

ComplexMetricRegistry
SQLConnectorRegistry
SegmentReaderRegistry
DruidConfigurationKeys
spark.md


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Add code, utilities, tests, and documentation for reading data from Druid using Spark.
@asdf2014 asdf2014 added the Apache Spark (https://spark.apache.org/) label Oct 22, 2021
@jihoonson

Closing and reopening to trigger Travis.

@jihoonson jihoonson closed this Oct 26, 2021
@jihoonson jihoonson reopened this Oct 26, 2021
@jihoonson

@JulianJaffePinterest I just noticed that Travis is not running for this branch since it's not included in the Travis configuration. Can you please add the spark_druid_connector branch there? I think we can remove it later when we merge this branch to master.

@JulianJaffePinterest

@jihoonson added the branch to .travis.yml and fixed spellcheck 🙂

@JulianJaffePinterest

JulianJaffePinterest commented Oct 27, 2021

The packaging check is failing with

  File "/home/travis/build/apache/druid/distribution/bin/generate-binary-license.py", line 181, in <module>
    generate_license(apache_license_v2, license_yaml)
  File "/home/travis/build/apache/druid/distribution/bin/generate-binary-license.py", line 140, in generate_license
    licenses_list = list(yaml.load_all(registry_file))

TypeError: load_all() missing 1 required positional argument: 'Loader'

[ERROR] Command execution failed.

org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
    at org.apache.commons.exec.DefaultExecutor.executeInternal (DefaultExecutor.java:404)
    at org.apache.commons.exec.DefaultExecutor.execute (DefaultExecutor.java:166)
    at org.codehaus.mojo.exec.ExecMojo.executeCommandLine (ExecMojo.java:804)
    at org.codehaus.mojo.exec.ExecMojo.executeCommandLine (ExecMojo.java:751)
    at org.codehaus.mojo.exec.ExecMojo.execute (ExecMojo.java:313)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:210)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.builder.multithreaded.MultiThreadedBuilder$1.call (MultiThreadedBuilder.java:190)
    at org.apache.maven.lifecycle.internal.builder.multithreaded.MultiThreadedBuilder$1.call (MultiThreadedBuilder.java:186)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)

I had thought this was a non-obvious error due to not adding licenses yet, but the check is still failing after adding the licenses. Is this something people regularly encounter?

@JulianJaffePinterest

JulianJaffePinterest commented Oct 27, 2021

Ah I see #11814. Let me see if the solution to that issue also solves this issue

Edit: #11799 is the fix for the root issue. I'll pull these changes in and then we can ignore my versions when we eventually rebase off master

I missed including this in the initial setup PR, so including the DynamicConfigProviderRegistry now. DynamicConfigProviders are used in the DruidMetadataClient and in the DeepstorageHelpers. Because we provide the org.apache.druid.spark.registerSubtype function, this registry is technically unnecessary, but it is included for ease of use.
Rename org.apache.druid.spark.injectableValues to baseInjectableValues to reduce confusion.
* Update Dockerfile

* Update docker_build_containers.sh

* Update Dockerfile
@jihoonson jihoonson left a comment

@JulianJaffePinterest thanks for enabling Travis. I haven't finished my review yet, but reviewed only up to the "Determining which segments to load" part. Feel free to address my comments all at once when my review is done. I will try to finish my review soon. BTW, I remember you said that you added some tests and made them run in Travis. Are those tests not in this PR but planned as a follow-up? I'm wondering where they are and how they run in Travis.

docs/operations/spark.md
|`table`|The Druid data source to read from|Yes||
|`reader.deepStorageType`|The type of deep storage used to back the target Druid cluster|No|`local`|
|`reader.segments`|A hard-coded list of Druid segments to read. If set, the table and druid client configurations are ignored and the specified segments are read directly. Must be deserializable into Druid DataSegment instances|No|
|`reader.useCompactSketches`|Controls whether or not compact representations of complex metrics are used (only for metrics that support compact forms)|No|False|
Contributor

Should it be reader.useCompactMetrics if it applies to all metrics that support compact forms? Or does it apply to only sketches?

Contributor Author

Are there complex metrics that have compact forms and aren't sketches? Either way, at the moment this property is only applied to sketches (see ComplexMetricRegistry)

Contributor

Then, perhaps rephrase this to something like:

Suggested change
|`reader.useCompactSketches`|Controls whether or not compact representations of complex metrics are used (only for metrics that support compact forms)|No|False|
|`reader.useCompactSketches`|Controls whether or not compact representations of sketch metrics are used|No|False|

// Otherwise, we'd need to full scan the segments table
val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)

metadataClient.getSegmentPayloads(conf.getString(DruidConfigurationKeys.tableKey),
Contributor

Hmm, it seems that metadataClient.getSegmentPayloads simply returns used segments in a given interval. In that case, the returned segments can include some segments overshadowed by others if the coordinator hasn't marked those overshadowed segments unused yet. To handle this problem, Druid uses VersionedIntervalTimeline to find the most recent set of segments. Is there some code somewhere to find the most recent set of segments from the returned segments?

Contributor Author

Ah, I dropped the overshadow checking logic while chunking up PRs. Thanks for catching this. I've moved the logic to the metadata client because I think it makes more sense there now, addressed a TODO, and added support for querying incomplete partitions if desired.

Addressing review comments from Jihoon. Fixes a few typos, adds more logging, and adds back the overshadowed check that was lost in the PR chunking. Additionally resolves a long-standing TODO and allows configuring the reader to include incomplete partitions if desired.
@JulianJaffePinterest

JulianJaffePinterest commented Oct 28, 2021

@jihoonson thanks for starting your review. The end-to-end tests I was referring to in our earlier discussion haven't been added yet (since this PR doesn't include the writing logic), but when they are added no changes will be necessary - they're lightweight enough to run as part of mvn test. This PR does include the unit tests for the reader logic, which also run via mvn test, as expected. If you'd like, I can create an explicit Travis job for the spark tests instead of letting them be bundled into the other modules' test jobs.

@JulianJaffePinterest

@jihoonson gentle reminder 🙂

@samarthjain samarthjain left a comment

First round of reviews. Will look closer at deep storage interaction and test coverage next.

* non-overshadowed segments in the interval will be returned, regardless of completeness.
*
* @param datasource The Druid data source to get segment payloads for.
* @param intervalStart The start of the interval to fetch segment payloads for. If None, MIN_INSTANT is used.
Contributor

You may want to mention that start and end intervals are inclusive.

Contributor Author

I'm not 100% on how to word this: the SQL query being executed uses >= and <= for its bounds, but it's effectively querying against JodaTime Interval endpoints, where the start time is inclusive and the end time is exclusive. The net result is fetching segments within [startTime, endTime). If you were encountering this method while working in an unfamiliar code base, how would you want this communicated?
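One possible wording, sketched as a doc comment (the method signature and parameter names here are assumed for illustration, not copied from the PR):

/**
 * Returns payloads for all used segments of DATASOURCE whose intervals fall within
 * [intervalStart, intervalEnd). The generated SQL compares with >= and <=, but because
 * segment intervals are Joda Intervals (inclusive start, exclusive end), the effective
 * bounds are inclusive of intervalStart and exclusive of intervalEnd.
 */
def getSegmentPayloads(datasource: String, intervalStart: Long, intervalEnd: Long): Seq[String] = ???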

* @param filter The Spark filter to map to a Druid filter.
* @return The Druid filter corresponding to the filter condition described by FILTER.
*/
def mapFilter(filter: Filter, schema: StructType): DimFilter = { // scalastyle:ignore method.length
Contributor

Druid supports a few other filter types like Search, Extraction, Expression, JavaScript, etc. Are there any Spark counterparts for those?

Contributor Author

There aren't. The main reason to push down some Spark filters to the readers is to be able to parse out bounds on __time that we could use to further reduce which segments we open. As a happy benefit, in some cases we can push down filters to the readers and filter out rows before returning them to Spark, but we're not aiming to run the Druid query execution engine in a Spark executor. Any operation that would be evaluated in a Druid Expression or Javascript filter should instead be handled by Spark. (As an aside, I wouldn't expect Spark to even attempt to push down any predicates that needed UDFs or custom code to be executed. You can see the set of Spark Filters that can be pushed down in the isSupportedFilter function.)
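A minimal illustration of the kind of bound extraction described above (not the PR's FilterUtils; only two filter shapes are handled, and __time values are assumed to arrive as epoch milliseconds):

import org.apache.spark.sql.sources.{Filter, GreaterThanOrEqual, LessThan}

// Pull lower/upper bounds on __time out of pushed-down Spark filters so that only
// overlapping segments need to be fetched from deep storage.
def timeBounds(filters: Array[Filter]): (Option[Long], Option[Long]) = {
  val lower = filters.collectFirst { case GreaterThanOrEqual("__time", value: Long) => value }
  val upper = filters.collectFirst { case LessThan("__time", value: Long) => value }
  (lower, upper)
}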

Contributor Author

Looking at this code again though, there's probably room in the future to better tie it to the two null handling schemes and increase the number of filters we're able to push down. This is just an efficiency improvement though - if we can't handle a given set of filters, Spark will execute them itself.

StructField, StructType, TimestampType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

class DruidColumnarInputPartitionReader(
Contributor

Would a better name instead be DruidVectorizedInputPartitionReader? I am not sure "Columnar" represents the fact that this reader is reading column values in batches.

Contributor Author

Columnar is the Spark convention for this (e.g. the DataSourceReader mixes in SupportsScanColumnarBatch and this InputPartitionReader returns ColumnarBatches of ColumnVectors). Since this will primarily be read by Druid developers I'm happy to ditch the Spark naming convention and name this DruidVectorizedInputPartitionReader.

Contributor

Ah, yes. Makes sense. I am ok with DruidColumnarInputPartitionReader

val filterOpt = FilterUtils.mapFilters(filters, schema.get)
filterOpt.fold(true) { filter =>
val rowSignature = SchemaUtils.generateRowSignatureFromSparkSchema(schema.get)
val canVectorize = filter.toOptimizedFilter.canVectorizeMatcher(rowSignature)
Contributor

What about the case when Druid doesn't support vectorized reads for, say, a complex column type? Does that mean we should also be looking at the schema to decide the value of canVectorize? We should probably disable vectorization then.

Contributor Author

We are looking at the schema to decide the value of canVectorize (we generate the Druid RowSignature and pass it to the canVectorizeMatcher call). Are you saying we should be more conservative and disable vectorization in additional cases?

import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory
import org.apache.druid.segment.{IndexIO, IndexMergerV9}

package object v2 { // scalastyle:ignore package.object.name
Contributor

What is the purpose of this class?

Contributor Author

This is a Scala package object. Although the INDEX_IO object is only used in one place in the reader, that object and the INDEX_MERGER_V9 object will also be used in multiple places in the writer, which is why I've put them in the package object.
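For illustration, a package object along these lines (a sketch assuming Druid 0.22-era constructor signatures; the actual definitions live in the PR's v2 package object and may differ):

package org.apache.druid.spark

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.druid.jackson.DefaultObjectMapper
import org.apache.druid.segment.column.ColumnConfig
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory
import org.apache.druid.segment.{IndexIO, IndexMergerV9}

package object v2 { // scalastyle:ignore package.object.name
  private val mapper: ObjectMapper = new DefaultObjectMapper()

  // Shared across the reader today and the (future) writer as well.
  lazy val INDEX_IO: IndexIO = new IndexIO(mapper, new ColumnConfig {
    override def columnCacheSizeBytes(): Int = 0
  })

  lazy val INDEX_MERGER_V9: IndexMergerV9 =
    new IndexMergerV9(mapper, INDEX_IO, OnHeapMemorySegmentWriteOutMediumFactory.instance())
}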

One package class was accidentally renamed while moving to piecemeal commits. This doesn't actually affect anything, but moving back to Scala conventions.
Add a try/catch block around the InputPartition.close() methods to make sure we delete the temporary files we created if we're able. If deleting the temp dir throws an Exception we still might leave an orphaned file, but Spark should clean up the temp directory as well so best effort is ok here.
If we're using SQL-compatible null handling, we can support pushing down IsNull and IsNotNull filters to the readers. We can also support pushing down EqualNullSafe filters whose value is null (null <=> null == true), In filters where null appears in the list of values (including short-circuiting for IN (null)), and short-circuiting EqualTo(field, null) filters.
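A sketch of how those null-related Spark filters could map onto Druid filters under SQL-compatible null handling (illustrative only; the PR's FilterUtils.mapFilter covers many more cases):

import org.apache.druid.query.filter.{DimFilter, NotDimFilter, SelectorDimFilter}
import org.apache.spark.sql.sources.{EqualNullSafe, Filter, IsNotNull, IsNull}

// Under SQL-compatible null handling, a selector filter with a null value matches
// null rows, so IsNull/IsNotNull/EqualNullSafe(col, null) all reduce to selectors.
def mapNullFilter(filter: Filter): Option[DimFilter] = filter match {
  case IsNull(column)              => Some(new SelectorDimFilter(column, null, null))
  case IsNotNull(column)           => Some(new NotDimFilter(new SelectorDimFilter(column, null, null)))
  case EqualNullSafe(column, null) => Some(new SelectorDimFilter(column, null, null))
  case _                           => None
}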
@jihoonson jihoonson left a comment

@JulianJaffePinterest sorry, I've been swamped with work recently. I focused on the documents and user-facing parts in my review. They LGTM. As long as those are good enough, we can improve other details later.

@jihoonson thanks for starting your review. The end-to-end tests I was referring to in our earlier discussion haven't been added yet (since this PR doesn't include the writing logic), but when they are added no changes will be necessary - they're lightweight enough to run as part of mvn test. This PR does include the unit tests for the reader logic, which also run via mvn test, as expected. If you'd like, I can create an explicit Travis job for the spark tests instead of letting them be bundled into the other modules' test jobs.

I prefer having a separate Travis job since it makes it easy to tell where the spark tests are running. I'm also expecting to add many more tests in the future. In that case, splitting the spark tests from the others can reduce the job runtime. Finally, we have a script that tells Travis which parts have changed in each PR, so that we don't have to run tests that are unrelated to the change. Having a separate Travis job for the spark connector will make it easy for that script to run only the relevant tests when we make changes in the connector.

* A registry for plugging in support for Druid complex metric types. Provides definitions for supporting complex types
* in extensions-core out of the box.
*/
object ComplexMetricRegistry extends Logging {
Contributor

nit: ComplexTypeRegistry could be a better name as we are trying to support complexTypes for dimensions.

Contributor Author

Done 👍

convertConfToInstance(conf, classOf[AzureAccountConfig])
}

def createAzureStorage(conf: Configuration): AzureStorage = {
Contributor

If we ever want to support a new storage type, then should we modify this file? Could there be a more extensible way to support new types without recompiling the spark connector?

Contributor Author

Users can add new deep storage handlers (or override the default ones) using the various plugin registries with no need to recompile the spark connector. If you're asking what happens when Druid adds support for new deep storage types, then someone needs to tell Spark how to access the new deep storage type. This can be done either in the spark module or theoretically in the new Druid extension, although users would then need to register the new functions themselves. I'm not sure I follow the concern about needing to recompile the spark connector - presumably Druid itself will only add support for new deep storage types when new versions are released, which will mean compiling from source anyway.

Contributor

I was mostly wondering what people should do if they want to register their custom deep storage type with the spark connector. But now, looking at the code again, I think I probably misunderstood the code. I see that users can register their own custom type in the registry.

val dimensions = columns.filter(availableDimensions.contains).asJava
val metrics = columns.filter(availableMetrics.contains).asJava

new IngestSegmentFirehose(List(adapter).asJava, TransformSpec.NONE, dimensions, metrics, filter)
Contributor

Firehose is deprecated and will be removed sooner or later. I suggest using InputFormat instead.

Contributor Author

Refactored 👍

@JulianJaffePinterest

@jihoonson no worries I know how it goes 🙂. I've split out the spark module tests into separate Travis jobs as you requested.

@jihoonson

@JulianJaffePinterest thanks for addressing comments. Can you check the Travis failure? One is the doc spelling check and another is the "other module test". The other module test failure seems strange, as the forked VM exploded while testing druid-kafka-extraction-namespace, but it seems related to this PR because the same job failed at the same module after a restart. Perhaps it's something about splitting the "spark module test" out of the "other module test"? The PR looks good to go once the CI is fixed.

Missed updating .spelling when renaming ComplexMetricRegistry to ComplexTypeRegistry.
@JulianJaffePinterest

@jihoonson do you know if it's possible to get the logs mentioned in the error message (Error occurred in starting fork, check output in log)? I'm having trouble replicating this issue locally so any more information would be helpful.

@jihoonson

@JulianJaffePinterest I'm not sure. Maybe we can see what is in the surefire reports after they fail in Travis?

@JulianJaffePinterest

Previous build was canceled by Travis - closing and reopening to retrigger

@JulianJaffePinterest

Cancel dance again

@suneet-s

@JulianJaffePinterest Sorry about that. I canceled your Travis run because I was trying to make space for the tests to run on 0.22.1 first. If I cancel again I'll restart your Travis job - no need to close and re-open.

@JulianJaffePinterest

@jihoonson git diff d41a3f82 2b97037 --name-only has empty output (d41a3f82 is the last commit to successfully build kafka-extraction-namespace, 2b97037 is the revert commit back after my recent changes that errors while building kafka-extraction-namespace). Based on this, I'm going to reintroduce my changes since they aren't the cause of the failure. Do you know if anything changed in the Travis environment between November 16th and December 1st, or if any other PRs have encountered this issue?

Resets this branch to the latest changes after confirming the failing test was unrelated.
New approach to fixing the KafkaLookupExtractorFactoryTest failure and added some more tests/did some housekeeping of the reader code.
Last change didn't help, now trying to find the .dump files mentioned in the mvn error message.
@JulianJaffePinterest

@jihoonson picking this back up now that I'm back from vacation. Because this PR isn't against master, would it be possible to land it despite the seemingly unrelated failing test? This would allow me to put the last PR for this feature up (the writing component) and begin getting reviews while working on the kafka lookup extraction test in parallel.

@jihoonson

@JulianJaffePinterest thank you for picking this up again. It makes sense to me. I will merge this PR shortly. Happy new year!

@jihoonson jihoonson merged commit f0888d3 into apache:spark_druid_connector Jan 14, 2022
@cdmikechen

Happy to hear that this PR has been merged
