[Bug]: Beam Jobs built using version 2.50 and above are not running with FlinkRunners v 1.16.x #32944

Open
3 of 17 tasks
siddhanta-rath opened this issue Oct 25, 2024 · 1 comment

siddhanta-rath commented Oct 25, 2024

What happened?

We have been running our Beam jobs on Google Cloud Dataflow for a while now, and we are evaluating a migration to Flink.
All of our jobs are built with Beam SDK version 2.56.0.

During this exercise, we found that our jobs do not start on the FlinkRunner with Flink (v1.16.x) when the pipelines are built with Beam 2.56.0. When we downgrade the Beam SDK to 2.49, the same jobs start running. However, downgrading costs us some of the features offered in the newer Beam SDKs (RequestResponseIO among others, which is a critical component in one of our jobs).

We tried to run the WordCount example pipeline with Beam versions from 2.50 to 2.56.0, with the respective runner versions from 2.50 to 2.58.0, on Flink 1.16.x, but to no avail. It runs fine on version 2.49.0.

These are the failure logs:

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available.
    at org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171) ~[?:?]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154) ~[?:?]
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
2024-07-25 11:45:12,956 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_372]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_372]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:287) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:224) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_372]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_372]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_372]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_372]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_372]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available.
    at org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171) ~[?:?]
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154) ~[?:?]
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
    at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    ... 12 more
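Since the log above is long and repeats the same chain twice, the innermost cause is easy to miss. The following is a minimal sketch of how one might pull the chain of `Caused by:` entries out of such a log and report the last one. `RootCauseFinder`, `causes`, and `rootCause` are hypothetical helper names for illustration and are not part of Beam or Flink. The pattern assumes an exception message never contains the word " at " surrounded by whitespace, which holds for the messages in this log but not in general.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RootCauseFinder {
    // Matches "Caused by: <exception class>[: <message>]" and stops before the
    // first stack frame (" at "), a "... N more" marker, or the end of input.
    private static final Pattern CAUSE = Pattern.compile(
        "Caused by: ([\\w.$]+)(?:: (.*?))?(?=\\s+at\\s|\\s+\\.\\.\\. \\d+ more|$)",
        Pattern.DOTALL);

    // Returns every "Caused by" entry in the order it appears in the log.
    public static List<String> causes(String log) {
        List<String> found = new ArrayList<>();
        Matcher m = CAUSE.matcher(log);
        while (m.find()) {
            String message = m.group(2);
            found.add(message == null ? m.group(1) : m.group(1) + ": " + message);
        }
        return found;
    }

    // The last "Caused by" entry is the innermost cause; null if there is none.
    public static String rootCause(String log) {
        List<String> all = causes(log);
        return all.isEmpty() ? null : all.get(all.size() - 1);
    }

    public static void main(String[] args) {
        // Shortened excerpt of the log above, kept on one line as it was pasted.
        String log = "Caused by: java.lang.RuntimeException: Pipeline execution failed"
            + " at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) ~[?:?]"
            + " ... 12 more"
            + " Caused by: org.apache.flink.api.common.InvalidProgramException:"
            + " Job was submitted in detached mode."
            + " at org.apache.flink.core.execution.DetachedJobExecutionResult"
            + ".getNetRuntime(DetachedJobExecutionResult.java:51)";
        System.out.println(rootCause(log));
    }
}
```

Applied to the full log, the last entry in the chain is the `InvalidProgramException` ("Job was submitted in detached mode"), thrown from `DetachedJobExecutionResult.getNetRuntime` when it is called by `FlinkPipelineExecutionEnvironment.createAttachedPipelineResult`.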

I have gone through the Beam community issues [Issue/29660] and the release notes. Beyond the compatibility mismatch (which in our case is already addressed), I didn't find anything else that could directly cause this issue...

Question: has anyone in the community experienced this issue and found a workaround for running Beam pipelines built with Beam SDKs > 2.49.0 on Flink? In particular, is anybody successfully running Beam pipelines built with SDK version 2.56 or newer?

Any help is appreciated!

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@siddhanta-rath (Author)

From the Beam SDK 2.50.0 change log, I learned that some changes were made to support both x86 and ARM CPU architectures in Beam images. Could this be the reason for the incompatibility?

@siddhanta-rath siddhanta-rath changed the title [Bug]: Beam Jobs built using ver 2.50 and above are not running with FlinkRunners v 1.16.x [Bug]: Beam Jobs built using version 2.50 and above are not running with FlinkRunners v 1.16.x Oct 25, 2024