Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MAINTENANCE] [MAINTENANCE] Add force_reuse_spark_context to DatasourceConfigSchema #3126

Conversation

pasmavie
Copy link
Contributor

@pasmavie pasmavie commented Jul 28, 2021

With PR 2733 a parameter was added to SparkDFDatasource, to make GE's context reuse an existing Spark Context.

This was extremely useful.

However, when using a dynamic Data Context configuration (e.g. in EMR) like

data_context_config = DataContextConfig(
    config_version=2,
    plugins_directory=None,
    config_variables_file_path=None,
    datasources={
        "my_data_source": {
            "class_name": "SparkDFDatasource",
            "spark_config": dict(spark.sparkContext.getConf().getAll()),
            "force_reuse_spark_context": True,
            "module_name": "great_expectations.datasource",
            "batch_kwargs_generators": {},
        }
    },
    ...

I've found that the spark_config and force_reuse_spark_context parameters weren't actually passed to geat_expectations.core.util.get_or_create_spark_application().

In fact, the parameters are lost when the DataContextConfig object is dumped into a dictionary, because the DatasourceConfigSchema (elem of the DataContextConfigSchema) doesn't list these parameters.

Changes proposed in this pull request:

  • Add spark_config and force_reuse_spark_context to the DatasourceConfigSchema

Definition of Done

Please delete options that are not relevant.

  • My code follows the Great Expectations style guide
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have added unit tests where applicable and made sure that new and existing tests are passing.
  • I have run any local integration tests and made sure that nothing is broken.

@netlify
Copy link

netlify bot commented Jul 28, 2021

👷 Deploy request for niobium-lead-7998 pending review.
Visit the deploys page to approve it

🔨 Explore the source changes: a210a83

@pasmavie
Copy link
Contributor Author

@talagluck here's the new attempt for the closed #2968

@talagluck
Copy link
Contributor

Thanks for re-opening this, @gipaetusb ! We will review over the next week and be in touch.

@mbakunze
Copy link
Contributor

mbakunze commented Aug 5, 2021

This would be very useful to have!

@talagluck talagluck added the devrel This item is being addressed by the Developer Relations Team label Aug 5, 2021
@pasmavie
Copy link
Contributor Author

pasmavie commented Aug 6, 2021

Thanks @mbakunze.

@talagluck:

  • I've added a description of the changes to docs_rtd/changelog.rst
  • I took another look at the tests this morning but I can't figure why they're failing. I'm afraid I'll really need some help 🙏

@mbakunze
Copy link
Contributor

I mentioned this issue in the Slack channel - this seems to currently block us to use GE with spark on k8s (at least in the way we wanted to use it :) ).

@talagluck
Copy link
Contributor

Thanks so much for the prompt, @mbakunze ! By any chance would you have time to take a look and see why the tests are failing here? Otherwise I can prioritize this for next week.

@mbakunze
Copy link
Contributor

I didn't understand why some tests failed when I glanced at it. But will try to take a look again.

…spark-context-emr

# Conflicts:
#	docs_rtd/changelog.rst
@mbakunze
Copy link
Contributor

mbakunze commented Aug 14, 2021

@gipaetusb I started working on fixing the tests in #3245

@pasmavie
Copy link
Contributor Author

@mbakunze thank you very very much!

@pasmavie
Copy link
Contributor Author

Hi @talagluck, thanks to @mbakunze this is finally ready to be merged :) !

Copy link
Contributor

@talagluck talagluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much this contribution and for your patience, @gipaetusb and @mbakunze ! Great work - LGTM!

@talagluck talagluck merged commit 5d818ee into great-expectations:develop Aug 20, 2021
@fep2
Copy link
Contributor

fep2 commented Aug 24, 2021

I've looked into this fix in my current setup, unfortunately if I changed my config from Datasource to SparkDFDatasource I get the following issue return datasource.get_batch_list_from_batch_request( AttributeError: 'SparkDFDatasource' object has no attribute 'get_batch_list_from_batch_request'

Next if I change it back to Datasource with the fix I get the following error datasource: Datasource = cast(Datasource, self.datasources[datasource_name]) KeyError: 'my_spark_datasource' this is do to the fact that the Datasource class when instantiated doesn't know what to do with the force_reuse_spark_context flag and the error get's hidden (this needs to be fix) and the my_spark_datasource is never instantiated causing it to throw this KeyError exception

Here a reference of what my data_source config looks like
{ "my_spark_datasource": { "class_name": "Datasource", "force_reuse_spark_context": True, "execution_engine": { "class_name": "SparkDFExecutionEngine" }, "data_connectors": { "my_runtime_data_connector": { "module_name": "great_expectations.datasource.data_connector", "class_name": "RuntimeDataConnector", "batch_identifiers": [ "some_key" ] } } } }

In this case I want a runtime batch following the directions laid out here -> https://discuss.greatexpectations.io/t/how-to-validate-spark-dataframes-in-0-13/582

I think the solution is to not only pass force_reuse_spark_context to the SparkDFDatasource but also pass it to SparkDFExecutionEngine I was able to get a working solution by adding the following to ExecutionEngineSchema

class ExecutionEngineConfigSchema(Schema):
    class Meta:
        unknown = INCLUDE

    class_name = fields.String(required=True)
    module_name = fields.String(missing="great_expectations.execution_engine")
    connection_string = fields.String(required=False, allow_none=True)
    credentials = fields.Raw(required=False, allow_none=True)
    spark_config = fields.Raw(required=False, allow_none=True)
    boto3_options = fields.Dict(
        keys=fields.Str(), values=fields.Str(), required=False, allow_none=True
    )
    caching = fields.Boolean(required=False, allow_none=True)
    batch_spec_defaults = fields.Dict(required=False, allow_none=True)
    force_reuse_spark_context = fields.Bool(required=False, missing=False)

This is what my data_source config looks like

{
        "my_spark_datasource": {
            "class_name": "Datasource",
            "execution_engine": {
                "class_name": "SparkDFExecutionEngine",
                "force_reuse_spark_context": True,
            },
            "data_connectors": {
                "my_runtime_data_connector": {
                    "module_name": "great_expectations.datasource.data_connector",
                    "class_name": "RuntimeDataConnector",
                    "batch_identifiers": [
                        "some_key"
                    ]
                }
            }
        }
    }

@mbakunze
Copy link
Contributor

Nice - I guess @gipaetusb and I were still using the v2 approach. Great if we can get this working in v3 as well - I did not test it.

@talagluck
Copy link
Contributor

Thanks for the feedback, @fep2! @mbakunze is exactly right here. This fix was just for V2. SparkDFDatasource is a V2 abstraction, and so mixing and matching with V3 abstractions like ExecutionEngines will cause errors. If you have any interest in making the fix for V3, we would welcome the contribution!

@fep2
Copy link
Contributor

fep2 commented Aug 25, 2021

As much as I would love to contribute, I have other commitments I must prioritize, that said I think I'll open a bug ticket or feature ticket and reference the following. I think that way the need is address and hopefully this is something user would want sooner rather than later.

@talagluck
Copy link
Contributor

That sounds great, thank you @fep2!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community devrel This item is being addressed by the Developer Relations Team
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants