Concurrent execution of FlowJob may cause FlowExecutionException #4092

Closed
acktsap opened this issue Apr 11, 2022 · 2 comments
Labels
for: backport-to-4.3.x Issues that will be back-ported to the 4.3.x line for: backport-to-5.0.x Issues that will be back-ported to the 5.0.x line has: minimal-example Bug reports that provide a minimal complete reproducible example in: core type: bug
Milestone

Comments

@acktsap
Contributor

acktsap commented Apr 11, 2022

Bug description

When a flow is built with FlowBuilder, its transitions are not initialized up front. This can cause a FlowExecutionException when multiple threads run the same job object with different JobExecutions.

[pool-1-thread-11] ERROR org.springframework.batch.core.job.AbstractJob - Encountered fatal error executing job
org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
	at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:143)
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:332)
	at org.springframework.batch.core.job.builder.FlowJobBuilderTests.lambda$testConcurrentDecision$1(FlowJobBuilderTests.java:260)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.batch.core.job.flow.FlowExecutionException: Ended flow=flow at state=flow.flow0 with exception
	at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:177)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:143)
	at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137)
	... 6 more
Caused by: java.lang.IllegalArgumentException: Missing state for [StateTransition: [state=subflow1.step0, pattern=COMPLETED, next=subflow1.step1]]
	at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitions(SimpleFlow.java:303)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitionsIfNotInitialized(SimpleFlow.java:268)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:139)
	at org.springframework.batch.core.job.flow.support.state.FlowState.handle(FlowState.java:56)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:168)
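
The trace shows the transitions being initialized lazily, inside SimpleFlow.start. A rough sketch of how that kind of lazy initialization could produce a "Missing state" error under concurrency is below. It replays one possible interleaving of two initializers on a single thread so the outcome is deterministic. InterleavingSketch and all of its methods are hypothetical and only model the idea; this is not Spring Batch's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a shared, unsynchronized state lookup table.
class InterleavingSketch {
    static final Map<String, String> stateMap = new HashMap<>();

    // Step 1 of an initializer: drop whatever was there before.
    static void clear() {
        stateMap.clear();
    }

    // Step 2 of an initializer: register every known state.
    static void populate() {
        stateMap.put("step0", "step0");
        stateMap.put("step1", "step1");
    }

    // Step 3 of an initializer: check that the transition target exists.
    static void validate() {
        if (!stateMap.containsKey("step1")) {
            throw new IllegalArgumentException("Missing state for [step0 -> step1]");
        }
    }

    // Replays, on one thread, an order in which two racing initializers could run.
    static String replay() {
        clear();        // "thread A" clears
        populate();     // "thread A" populates
        clear();        // "thread B" begins its own initialization and clears
        try {
            validate(); // "thread A" validates against the now-empty map
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

In this model the failure only needs the second initializer to clear the shared map between the first initializer's populate and validate steps, which would also explain why the error is rare.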

Environment

v4.3.5, jdk17

Steps to reproduce

I've written a test that reproduces it. You can run the following code within FlowJobBuilderTests.

	@Test
	@RepeatedTest(100) // since it occurs rarely
	public void testConcurrentDecision() throws Exception {
		EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
				.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
				.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
				.build();
		JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
		factory.setDataSource(embeddedDatabase);
		factory.setTransactionManager(new DataSourceTransactionManager(embeddedDatabase));
		factory.afterPropertiesSet();
		jobRepository = factory.getObject();

		SimpleFlow flow1 = new FlowBuilder<SimpleFlow>("subflow1").start(step1).next(step2).end();
		FlowBuilder<FlowJobBuilder> builder = new JobBuilder("flow").repository(jobRepository).start(flow1)
				.next(step3);
		Job job = builder.build().build();

		int nThreads = Runtime.getRuntime().availableProcessors();
		CyclicBarrier barrier = new CyclicBarrier(nThreads);
		ExecutorService pool = Executors.newFixedThreadPool(nThreads);
		List<Future<JobExecution>> futures = LongStream.range(0, nThreads)
				.boxed()
				.map(i -> {
							try {
								JobParameters jobParameters = new JobParametersBuilder()
										.addLong("count", i)
										.toJobParameters();
								return jobRepository.createJobExecution("flow", jobParameters);
							} catch (Exception e) {
								throw new IllegalStateException(e);
							}
						}
				).map(jobExecution -> pool.submit(() -> {
					barrier.await();
					job.execute(jobExecution);
					return jobExecution;
				}))
				.toList();

		List<JobExecution> jobExecutions = futures.stream()
				.map(future -> {
					try {
						return future.get();
					} catch (Exception e) {
						throw new IllegalStateException(e);
					}
				})
				.collect(Collectors.toList());
		for (JobExecution jobExecution : jobExecutions) {
			assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
		}
	}

Expected behavior

No exception.

Minimal Complete Reproducible example

See Steps to reproduce.

@acktsap acktsap added status: waiting-for-triage Issues that we did not analyse yet type: bug labels Apr 11, 2022
@fmbenhassine fmbenhassine added in: core has: minimal-example Bug reports that provide a minimal complete reproducible example and removed status: waiting-for-triage Issues that we did not analyse yet labels Apr 14, 2022
@svenmeier

As a workaround, make sure #afterPropertiesSet() is called during bean creation, e.g. by using a separate bean definition:

@Bean
public Flow partitionFlow() {
    return new FlowBuilder<Flow>("flow")
            .start(...)
            .build();
}

Manually calling #afterPropertiesSet() on the built flow is another possibility.
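
A minimal, self-contained sketch of why initializing before sharing helps is below. LazyFlowSketch and EagerInitDemo are hypothetical stand-ins, not Spring Batch classes. initialize() plays the role of #afterPropertiesSet(), and start() models a lazy, unsynchronized check-then-initialize. Because initialize() runs once on the submitting thread before any task is handed to the pool, the worker threads only read the already-built map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a flow with lazily built transitions.
class LazyFlowSketch {
    private final List<String[]> transitions = new ArrayList<>(); // {state, next}
    private final Map<String, String> stateMap = new HashMap<>();
    private boolean initialized;

    LazyFlowSketch transition(String state, String next) {
        transitions.add(new String[] {state, next});
        return this;
    }

    // Plays the role of afterPropertiesSet(): rebuild the map, then validate it.
    void initialize() {
        stateMap.clear();
        for (String[] t : transitions) {
            stateMap.put(t[0], t[0]);
        }
        for (String[] t : transitions) {
            if (t[1] != null && !stateMap.containsKey(t[1])) {
                throw new IllegalArgumentException("Missing state for " + t[0] + " -> " + t[1]);
            }
        }
        initialized = true;
    }

    // Unsynchronized check-then-act: unsafe if reached concurrently before initialization.
    String start() {
        if (!initialized) {
            initialize();
        }
        return "COMPLETED";
    }
}

class EagerInitDemo {
    static List<String> runConcurrently(int nThreads) throws Exception {
        LazyFlowSketch flow = new LazyFlowSketch()
                .transition("step0", "step1")
                .transition("step1", null);
        flow.initialize(); // eager initialization on one thread, before sharing

        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            Callable<String> task = flow::start;
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < nThreads; i++) {
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runConcurrently(4));
    }
}
```

With a real flow, the equivalent step would be calling #afterPropertiesSet() on the built flow before the job is executed from multiple threads.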

@acktsap
Contributor Author

acktsap commented Jul 25, 2022

@svenmeier Yes, I'm calling afterPropertiesSet for all flows.

acktsap added a commit to naver/spring-batch-plus that referenced this issue Oct 29, 2022
@fmbenhassine fmbenhassine added for: backport-to-4.3.x Issues that will be back-ported to the 4.3.x line for: backport-to-5.0.x Issues that will be back-ported to the 5.0.x line labels Aug 22, 2023
@fmbenhassine fmbenhassine added this to the 5.1.0-M2 milestone Aug 22, 2023
fmbenhassine pushed a commit that referenced this issue Aug 23, 2023