Removed Promise in favor of Future.join(). Made ForkJoinPool.commonPool() default ExecutorService. #2093

Merged
merged 7 commits into from
Sep 25, 2017

Conversation

danieldietrich
Contributor

Fixes #2085

@nfekete
Member

nfekete commented Sep 16, 2017

Does the deadlock only occur on the CI server? Can you reproduce it locally?

@danieldietrich
Contributor Author

danieldietrich commented Sep 16, 2017

Only on the CI server, locally all is fine.

I started a new build without junit test forking.
It just started: https://travis-ci.org/vavr-io/vavr/builds/276364880

@danieldietrich
Contributor Author

danieldietrich commented Sep 16, 2017

It did not work. Now I print the enter/exit information during FutureTest:

    @Rule
    public TestRule watcher = new TestWatcher() {
        @Override
        protected void starting(Description description) {
            System.out.println("Starting test: " + description.getMethodName());
        }
        @Override
        protected void finished(Description description) {
            System.out.println("Finished test: " + description.getMethodName());
        }
    };

Let's see, which method causes the deadlock...
https://travis-ci.org/vavr-io/vavr/builds/276368239

@danieldietrich
Contributor Author

The blocking guy seems to be FutureTest.shouldFilterFuture(). Will dig into it. Maybe we revealed a bug?

@nfekete
Member

nfekete commented Sep 16, 2017

I don't know. It would be good to see what the differences are between the CI instance and the local one. Maybe CI is virtualized so that it only sees 1 CPU, which might change the default JVM setup of common concurrency primitives? It would be much easier to debug if we could reproduce the issue locally. Can you find out the vCPU count of the Travis instance?

@danieldietrich
Contributor Author

I think there is one CPU with two cores:

screen shot 2017-09-17 at 01 26 04

screen shot 2017-09-17 at 01 24 45

I don't know how to reproduce it. Will for now go step by step (with System.out.println)...

@danieldietrich
Contributor Author

danieldietrich commented Sep 16, 2017

After I commented out

    @Test
    public void shouldFilterFuture() {
        final Future<Integer> future = Future.successful(42);
        assertThat(future.filter(i -> i == 42).get()).isEqualTo(42);
        assertThat(future.filter(i -> i == 43).isEmpty()).isTrue();
    }

the tests proceeded much further but blocked again at

    @Test
    public void shouldAwaitOnGet() {
        final Future<Integer> future = Future.of(() -> {
            Try.run(() -> Thread.sleep(250L));
            return 1;
        });
        assertThat(future.get()).isEqualTo(1);
    }

I think it is the Future.get() method that causes the trouble:

    @Override
    default T get() {
        return await().getValue().get().get();
    }

More specifically the await() method:

    /**
     * Blocks the current Thread until this Future completed or returns immediately if this Future is already completed.
     *
     * @return this {@code Future} instance
     */
    Future<T> await();

Which is implemented in FutureImpl.java:

    @Override
    public Future<T> await() {  // 1)
        if (!isCompleted()) {
            final Object monitor = new Object();
            onComplete(ignored -> {  // 2)
                synchronized (monitor) {
                    monitor.notify();
                }
            });
            synchronized (monitor) {  // 3)
                if (!isCompleted()) {
                    Try.run(monitor::wait);
                }
            }
        }
        return this;
    }

One possibility is that monitor.notify() is executed before monitor.wait(). In that case the Future would never return...

I will wrap my head around it after getting some coffee :)

@danieldietrich
Contributor Author

An execution like the following would explain the failure. But such an execution is not possible because we synchronize wait/notify on monitor. But it has to be a similar execution like this...

  1. await() is called
  2. onComplete(...)
     • this future is not complete, yet
     • therefore the complete-handler is queued
     • onComplete exits
  3. synchronized(monitor) is entered and this Future is not completed yet
     • if (!isCompleted()) is entered
     • a context switch happens and completes the Future
     • on that completing thread a sub-thread is spawned that executes the complete handler
     • monitor.notify() is called (context switches still may have taken place)
     • finally the monitor::wait is executed... and hangs forever
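
The ordering concern above (a notify() that arrives before the matching wait()) can be ruled out by construction with a latch, because a CountDownLatch remembers that it has been released. The following is only a minimal sketch, not the FutureImpl code of this PR: LatchAwaitSketch and its methods are hypothetical names, and the value type is fixed to Integer for brevity.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a Future; not Vavr's FutureImpl.
final class LatchAwaitSketch {

    private final CountDownLatch completed = new CountDownLatch(1);
    private volatile Integer value;

    // Publishes the value first, then releases every current and future waiter.
    void complete(int result) {
        value = result;
        completed.countDown();
    }

    // Returns true if completion was observed within the timeout.
    // It does not matter whether complete() ran before or after this call.
    boolean await(long timeoutMillis) {
        try {
            return completed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    Integer value() {
        return value;
    }
}
```

This only removes the lost-wakeup hypothesis. It does not help if the task that would call complete() never gets a thread to run on.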

@nfekete
Member

nfekete commented Sep 17, 2017

I pulled your changes from your branch and changed the following:

Index: pom.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- pom.xml	(date 1505604858000)
+++ pom.xml	(revision )
@@ -302,6 +302,7 @@
                     <artifactId>maven-surefire-plugin</artifactId>
                     <version>${maven.surefire.version}</version>
                     <configuration>
+                        <argLine>-Djava.util.concurrent.ForkJoinPool.common.parallelism=1</argLine>
                         <parallel>none</parallel>
                         <!--
                         <parallel>all</parallel>

With this change I was able to reproduce the Travis deadlock. It should be easier to debug it locally with this.

@danieldietrich
Contributor Author

@nfekete that's great! Maybe it is related to this:

screen shot 2017-09-17 at 02 35 12

(Source: https://stackoverflow.com/a/23480863/1110815)

@nfekete
Member

nfekete commented Sep 17, 2017

I think I met a similar problem with parallel streams a while ago, that's why I volunteered to help with ForkJoinPool issues. I can't recall precisely what happened there, but there was a similar pattern: tasks in an FJP waiting on other tasks in the same pool.
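
The pattern described here, a pool task waiting for another task in the same pool, can be reproduced in isolation. This is a minimal sketch under assumptions: it uses a dedicated ForkJoinPool instead of the common pool and a plain CountDownLatch in place of Vavr's Future, and PoolStarvationSketch and innerTaskRan are made-up names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

final class PoolStarvationSketch {

    // Runs an outer task that submits an inner task to the same pool and then
    // blocks (unmanaged) until the inner task has run or the timeout expires.
    // Returns true if the inner task ran while the outer task was waiting.
    static boolean innerTaskRan(int parallelism, long timeoutMillis) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        try {
            pool.execute(() -> {
                CountDownLatch innerDone = new CountDownLatch(1);
                pool.execute(innerDone::countDown); // inner task, same pool
                try {
                    result.complete(innerDone.await(timeoutMillis, TimeUnit.MILLISECONDS));
                } catch (InterruptedException e) {
                    result.complete(false);
                }
            });
            return result.join(); // the caller is not a pool worker
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("parallelism=1: " + innerTaskRan(1, 500));
        System.out.println("parallelism=2: " + innerTaskRan(2, 5000));
    }
}
```

With parallelism 1 the only worker is parked in await(), so the inner task should stay queued until the timeout expires. With parallelism 2 a second worker can pick it up.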

@danieldietrich
Contributor Author

I now have a better understanding of the 'error'. I'm not sure whether we are able to control the fork/join mechanism within one thread pool. There is ForkJoinPool.ManagedBlocker but we can't hard-code it into FutureImpl because we do not know which ExecutorService is used at runtime.

I will sleep over the topic and look into it tomorrow.

@danieldietrich
Contributor Author

Maybe I can find a way to implement await() based on ExecutorService.invokeAny(...)...

@danieldietrich
Contributor Author

danieldietrich commented Sep 17, 2017

Status: I am now reimplementing await() (without the low-level wait/notify, which does not seem to work on a ForkJoinPool):

@Override
public Future<T> await() {
    // lock is the guard for all future result related fields (i.e. the completion state)
    synchronized (lock) {
        if (!isCompleted()) {
            try {
                // job is of type j.u.c.Future, get() blocks - and could also receive a timeout parameter
                complete(job.get());
            } catch(Throwable x) {
                complete(Try.failure(x));
            }
        }
    }
    return this;
}

However, it is still not working - but should in my opinion :)

@danieldietrich
Contributor Author

danieldietrich commented Sep 17, 2017

[STARTING] 2017-09-17T12:08:10.944 shouldFilterFuture(io.vavr.concurrent.FutureTest)

Future.success(42)
---
[Promise] Thread[main,5,main] tryComplete(Success(42))
[Future] Thread[main,5,main] tryComplete: value = Success(42)
[Future] Thread[main,5,main] tryComplete: isCompleted() = false
[Future] Thread[main,5,main] tryComplete: complete(Success(42))
[Future] Thread[main,5,main] complete: value = Success(42)
[Future] Thread[main,5,main] complete: setting value
[Future] Thread[main,5,main] complete: executing actions
[Future] Thread[main,5,main] tryComplete: completed

asserting...
---
[Future] Thread[main,5,main] filter: io.vavr.concurrent.FutureTest$$Lambda$6/1058025095@27abe2cd
[Future] Thread[main,5,main] filterTry: io.vavr.concurrent.Future$$Lambda$7/1359044626@29444d75
[Future] Thread[main,5,main] filterTry: creating promise
[Future] Thread[main,5,main] filterTry: onComplete(...)
[Future] Thread[main,5,main] perform: spawning new onComplete thread
[Future] Thread[main,5,main] filterTry: return promise.future()
[Future] Thread[main,5,main] await()

get()
---
[Future] Thread[main,5,main] await: synchronized (lock) {
[Future] Thread[main,5,main] await: job = null
[Future] Thread[main,5,main] await: value = None
[Future] Thread[main,5,main] await: !isCompleted()
#
# Update: this is the root cause: job can't be null if the future is not completed, yet (see below invariant)
#
[Future] Thread[main,5,main] await: job.get()

filtering i == 42
[Promise] Thread[ForkJoinPool.commonPool-worker-1,5,main] tryComplete(Success(42))
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] tryComplete: value = Success(42)

#
# Here is the error. The job (j.u.c.Future(() -> Success(42).filter(i -> i == 42))) is finished before the Promise is completed!?
#
[Future] Thread[main,5,main] await: completed

[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] tryComplete: isCompleted() = false
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] tryComplete: complete(Success(42))
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] complete: value = Success(42)
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] complete: setting value
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] complete: executing actions
[Future] Thread[ForkJoinPool.commonPool-worker-1,5,main] tryComplete: completed

[FINISHED] 2017-09-17T12:08:10.988 shouldFilterFuture(io.vavr.concurrent.FutureTest)

Update: The problem with this new implementation is that Promise.make() creates a Future that has no underlying job (= null). I will find a solution. The invariant has to be

future.isCompleted() <=> future.job == null && future.value != null

In Promise.make().future() the invariant is not satisfied.

@danieldietrich
Contributor Author

danieldietrich commented Sep 17, 2017

Currently I see the following solution:

  1. We switch to ForkJoinPool
  2. We deprecate Promise (to be removed in 1.0.0)
  3. We enhance Future with a safe/scoped completion feature (see Add factory method that allows to resolve/reject a Future #1529). This is comparable to ECMAScript 6 Promise. Internally we then replace usages of Promise with the new feature.
static <T> Future<T> complete(CheckedConsumer<CheckedConsumer<Try<? extends T>>> resultConsumer) {
    return complete(DEFAULT_EXECUTOR_SERVICE, resultConsumer);
}

static <T> Future<T> complete(ExecutorService executorService, CheckedConsumer<CheckedConsumer<Try<? extends T>>> resultConsumer) {
    return null; // TODO
}

static <T> Future<T> complete(CheckedBiConsumer<CheckedConsumer<? extends T>, CheckedConsumer<? extends Throwable>> resultConsumer) {
    return complete(DEFAULT_EXECUTOR_SERVICE, resultConsumer);
}

static <T> Future<T> complete(ExecutorService executorService, CheckedBiConsumer<CheckedConsumer<? extends T>, CheckedConsumer<? extends Throwable>> resultConsumer) {
    return null; // TODO
}

static void examples() {

    // calls Future.complete(CheckedConsumer<CheckedConsumer<Try<T>>>)
    Future.complete(complete -> complete.accept(Try.of(() -> null)));

    // calls Future.complete(CheckedBiConsumer<CheckedConsumer<T>, CheckedConsumer<Throwable>>)
    Future.complete((success, failure) -> success.accept(null));
}


@FunctionalInterface
interface CheckedBiConsumer<T, U> {

    void accept(T t, U u) throws Throwable;

    default CheckedBiConsumer<T, U> andThen(CheckedBiConsumer<? super T, ? super U> after) {
        Objects.requireNonNull(after);
        return (l, r) -> {
            accept(l, r);
            after.accept(l, r);
        };
    }
}

(Please note: It is correct here to use Checked(Bi)Consumers in a covariant way rather than a contravariant way, like usual.)
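
To illustrate the ES6-style scoped completion from step 3, here is a minimal sketch built on java.util.concurrent.CompletableFuture rather than on Vavr's Future. ScopedCompletionSketch.create is a hypothetical stand-in for the proposed Future.complete(...) factory. It uses the plain java.util.function interfaces instead of the Checked variants, and it runs the body synchronously on the calling thread, which the proposal above does not specify.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class ScopedCompletionSketch {

    // The body receives two callbacks (resolve, reject), as in an ES6 Promise
    // executor. Only the body can complete the returned future through them.
    static <T> CompletableFuture<T> create(BiConsumer<Consumer<T>, Consumer<Throwable>> body) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            body.accept(future::complete, future::completeExceptionally);
        } catch (RuntimeException e) {
            // A body that throws is treated like a call to reject.
            future.completeExceptionally(e);
        }
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = create((resolve, reject) -> resolve.accept(42));
        CompletableFuture<Integer> failed =
                create((resolve, reject) -> reject.accept(new IllegalStateException("boom")));
        System.out.println(ok.join());
        System.out.println(failed.isCompletedExceptionally());
    }
}
```

The design point is that no separate Promise object escapes: the ability to complete the future is scoped to the body.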

Given the suggested await() / await(timeout, timeunit), we will solve here the following issues:

Note: We already planned to remove Promise in 1.0.0 and possibly rename Future to Promise because of a name clash with java.util.concurrent.Future. See this blog post.

@danieldietrich
Contributor Author

This change has too great an impact for a patch release. I will target 1.0 instead of 0.9.1.

@danieldietrich danieldietrich modified the milestones: vavr-0.9.1, vavr-1.0.0 Sep 17, 2017
@danieldietrich
Contributor Author

danieldietrich commented Sep 18, 2017

Pushed a test-version with some /*TODO*/ sections.
Basically we need to add an internal feature that enables us to complete a Future on the same thread with onComplete(). By default onComplete() spawns a new task that executes the follow-up action.
Will do that tomorrow.
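
The difference between running a follow-up action on the completing thread and spawning a new task for it can be seen with the JDK's CompletableFuture, which offers both variants. This is only an analogy, not the Vavr implementation: thenApply stands in for the planned same-thread completion and thenApplyAsync for the current onComplete() behavior, and CallbackThreadSketch and the thread name "callback-worker" are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackThreadSketch {

    // On an already completed future, thenApply runs the function
    // immediately on the thread that registers it.
    static String syncCallbackThread() {
        return CompletableFuture.completedFuture(42)
                .thenApply(v -> Thread.currentThread().getName())
                .join();
    }

    // thenApplyAsync hands the function to the executor as a new task,
    // so it needs a free thread there before it can run.
    static String asyncCallbackThread() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        try {
            return CompletableFuture.completedFuture(42)
                    .thenApplyAsync(v -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }
}
```

The async variant is the one that can starve when every pool thread is blocked waiting for such a follow-up action.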

@danieldietrich
Contributor Author

danieldietrich commented Sep 21, 2017

I've collected some test data and ran the FutureTest class for different levels of parallelism: 1, 2, 4, 8

Tests run: 221, Failures: 0, Errors: 0, Skipped: 2, Time elapsed: 13.722 sec - in io.vavr.concurrent.FutureTest

I observe that the 'queued submission' of the common ForkJoinPool goes up at some point if the parallelism is low (1, 2). Then the deadlock occurs.

If we increase the parallelism (4, 8), the tests work as expected.

screen shot 2017-09-21 at 22 30 18

screen shot 2017-09-21 at 22 32 05

screen shot 2017-09-21 at 22 32 56

screen shot 2017-09-21 at 22 33 40

@chb0github
Contributor

@danieldietrich empirical evidence FTW

@danieldietrich
Contributor Author

I could imagine that we need to fine-tune the ForkJoinPool behavior and the thread creation. (See also scala/bug#8955).

I could also imagine that the probability of deadlocks increases because we use the blocking await() call (also implicitly used in get() and isEmpty()).

@danieldietrich
Contributor Author

danieldietrich commented Sep 21, 2017

We need a different blocking strategy. Currently we have

// FutureImpl
    @Override
    public Future<T> await() throws InterruptedException {
        if (!isCompleted()) {
            final BlockingQueue<Try<T>> queue = new ArrayBlockingQueue<>(1);
            onComplete(queue::add);
            ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
                @Override
                public boolean block() throws InterruptedException {
                    if (value.isEmpty()) {
                        // Possibly blocks until one element is available.
                        // If the current Thread receives the element,
                        // this Future is already completed because
                        // actions are performed after completion.
                        queue.take();
                    }
                    return true;
                }
                @Override
                public boolean isReleasable() {
                    return value.isDefined();
                }
            });
        }
        return this;
    }

It may be that the use of LockSupport makes more sense (parking/unparking threads). This would release resources. However, the CompletableFuture implementation is more than ugly. Usages of UNSAFE everywhere.
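
For comparison, the effect of ForkJoinPool.managedBlock can be observed in isolation. The sketch below assumes a dedicated pool with parallelism 1 and a CountDownLatch instead of Vavr's Future; ManagedBlockSketch, LatchBlocker and innerTaskRan are hypothetical names. It also assumes that the awaited task is runnable at all, which is not the case for a task that waits forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

final class ManagedBlockSketch {

    // Waits (with a timeout) until the latch is released.
    static final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
        private final CountDownLatch latch;
        private final long timeoutMillis;

        LatchBlocker(CountDownLatch latch, long timeoutMillis) {
            this.latch = latch;
            this.timeoutMillis = timeoutMillis;
        }

        @Override
        public boolean block() throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return true; // stop blocking, whether released or timed out
        }

        @Override
        public boolean isReleasable() {
            return latch.getCount() == 0;
        }
    }

    // Same scenario as an unmanaged wait inside a parallelism-1 pool, but the
    // wait is announced to the pool, which may add a spare worker meanwhile.
    static boolean innerTaskRan(long timeoutMillis) {
        ForkJoinPool pool = new ForkJoinPool(1);
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        try {
            pool.execute(() -> {
                CountDownLatch innerDone = new CountDownLatch(1);
                pool.execute(innerDone::countDown);
                try {
                    ForkJoinPool.managedBlock(new LatchBlocker(innerDone, timeoutMillis));
                    result.complete(innerDone.getCount() == 0);
                } catch (InterruptedException | RuntimeException e) {
                    result.complete(false);
                }
            });
            return result.join();
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Because the pool is told about the blocking call, it can compensate with an extra thread, so the inner task is expected to run even with parallelism 1.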

However, blocking is discouraged anyway. If an async program makes extensive use of blocking, there's something wrong. Blocking in the presence of low parallelism might also be dangerous. We need to understand the domain better. The only thing we can do is measure.

I tested the ignored FutureTest unit tests with CompletableFuture (see #1530). This means that our await() method can be improved.

I think the whole deadlock problem is all about blocking Futures - more specifically, the wrong blocking strategy. A thread should wake up periodically in order to check the completion state. However, we can't loop without parking threads because it would eat up 100% of one core.

Parking/unparking threads and periodically checking the finished state is the strategy of CompletableFuture. Parking is not the problem. The question is: who unparks a thread. I think it is hidden somewhere in the ForkJoin framework, might be specific for ForkJoinWorkerThreads.


We should play around with ThreadFactories. This leads to an own ForkJoinPool as DEFAULT_EXECUTOR_SERVICE. We can't use the ForkJoinPool.commonPool() anymore.

new ForkJoinPool(
    parallelism, // int
    factory, // ForkJoinPool.ForkJoinWorkerThreadFactory
    handler, // Thread.UncaughtExceptionHandler
    asyncMode // boolean
)

In particular, asyncMode controls the order in which queued tasks are run. The default (false) has a LIFO run order. But we might want to run first the tasks that were enqueued first (FIFO).
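
A minimal sketch of such a pool, using the four-argument ForkJoinPool constructor from the JDK with the default worker thread factory and no uncaught exception handler. AsyncModePoolSketch and fifoPool are hypothetical names, and the parallelism is left to the caller.

```java
import java.util.concurrent.ForkJoinPool;

final class AsyncModePoolSketch {

    // asyncMode = true selects FIFO scheduling for forked tasks that are never joined.
    static ForkJoinPool fifoPool(int parallelism) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,  // no Thread.UncaughtExceptionHandler
                true); // asyncMode
    }

    public static void main(String[] args) {
        ForkJoinPool pool = fifoPool(2);
        System.out.println("asyncMode = " + pool.getAsyncMode());
        pool.shutdown();
    }
}
```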

@danieldietrich
Contributor Author

danieldietrich commented Sep 22, 2017

There are some minor changes in here. I will split this PR into smaller ones. However, for now I will go on to do some improvements here (in this PR).

This PR goes into 1.0.0. But I think that I'm able to provide a fix that will go into 0.9.2, namely switching to ForkJoinPool.commonPool() and re-implementing a more effective await() + await(timeout, timeunit).


But now to something completely different. Here is an idea on how to optimize the 'blocking' await() method.

In Java there are several Thread states:

java-thread-states

Image: © by http://uml-diagrams.org, created by Kirill Fakhroutdinov

I want to investigate if it is possible to put an existing (the current) thread in an 'inactive' state in the way that other threads are allowed to run instead (if they otherwise had to wait until more 'slots' are available in the given context).

I see the opportunity to use LockSupport#park() for that purpose. Here is my implementation idea:

  • our FutureImpl gets a new field, a queue of threads that wait for this Future
  • the FutureImpl complete method will execute waitingThreads.forEach(LockSupport::unpark)
  • the FutureImpl.await() method puts Thread.currentThread() on the waiting queue and parks it (if the Future isn't completed yet)

Technical details:

  • waitingThreads needs to be 'guarded' by the same lock as the other instance variables that are relevant regarding the completion state
  • waitingThreads need only to be unparked if their actual state is 'parked' (aka suspended?)
  • maybe we need to extra-check interrupted conditions (where threads come into play) and act accordingly
  • ...
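
The idea outlined above can be sketched as follows. This is not the FutureImpl of this PR: ParkingFutureSketch is a hypothetical, stripped-down class that holds only a value, a completion flag and the list of waiting threads, all guarded by one lock. LockSupport.unpark is safe to call on a thread that is not parked, so the sketch does not check the thread state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;

final class ParkingFutureSketch<T> {

    private final Object lock = new Object();
    private final List<Thread> waitingThreads = new ArrayList<>(); // guarded by lock
    private boolean completed;                                     // guarded by lock
    private T value;                                               // guarded by lock

    void complete(T result) {
        List<Thread> toWake;
        synchronized (lock) {
            if (completed) {
                return; // first completion wins
            }
            value = result;
            completed = true;
            toWake = new ArrayList<>(waitingThreads);
            waitingThreads.clear();
        }
        toWake.forEach(LockSupport::unpark); // wake waiters outside the lock
    }

    boolean isCompleted() {
        synchronized (lock) {
            return completed;
        }
    }

    T await() {
        synchronized (lock) {
            if (completed) {
                return value;
            }
            waitingThreads.add(Thread.currentThread());
        }
        boolean interrupted = false;
        while (!isCompleted()) {      // re-check: park may return spuriously
            LockSupport.park(this);
            if (Thread.interrupted()) {
                interrupted = true;   // remember, keep waiting
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        }
        synchronized (lock) {
            return value;
        }
    }
}
```

Parking only frees the CPU. It does not free the pool slot of the waiting worker, so on its own it cannot make a saturated pool progress.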

@danieldietrich
Contributor Author

danieldietrich commented Sep 23, 2017

The park/unpark does not seem to help regarding our problem of deadlocks when executing the FutureTest test suite using ForkJoinPool parallelism=1.

I've implemented several await() strategies, namely

  • using wait/notify on a monitor
  • using park/unpark
  • using a ManagedBlocker that blocks on a concurrent queue that takes the result

None of them solves the deadlock problem described above. I tend to use the ManagedBlocker for the final version because this is the way to go in a ForkJoin context. Update: The ManagedBlocker itself does not block the thread. We will use it and internally park/unpark because it is considered fast. CompletableFuture acts the same way.

Observation: It is interesting that each test runs fine if started as a single test. Only the whole test suite seems to deadlock, even if we do not allow parallel tests (in the surefire configuration).

The next step I take is to use a custom ForkJoinPool that differs in two aspects from ForkJoinPool.commonPool():

  • We will set parallelism to default parallelism + 1.
  • We create a custom thread factory that gives us more control over the threads that are created. Of course we need daemons in order to not block the system shutdown. Most notably we might need to raise the upper bound of threads that exist. Also we need to investigate if we need the original purpose of the ForkJoinPool to have units of work that are forked, wait for results and then joined. Maybe we can relax the behavior here a bit.
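
A minimal sketch of such a pool, under assumptions: "default parallelism" is taken to be ForkJoinPool.getCommonPoolParallelism(), the factory delegates thread creation to the JDK's default factory and only renames the worker and marks it as a daemon, and CustomPoolSketch, create and sampleWorkerName are hypothetical names. Raising the upper bound of threads is not covered here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

final class CustomPoolSketch {

    static ForkJoinPool create(String namePrefix) {
        AtomicInteger counter = new AtomicInteger();
        ForkJoinWorkerThreadFactory factory = pool -> {
            ForkJoinWorkerThread worker =
                    ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName(namePrefix + counter.incrementAndGet());
            worker.setDaemon(true); // must not block JVM shutdown
            return worker;
        };
        int parallelism = ForkJoinPool.getCommonPoolParallelism() + 1;
        return new ForkJoinPool(parallelism, factory, null, false);
    }

    // Runs one task on a fresh pool and reports the name of the worker that ran it.
    static String sampleWorkerName(String namePrefix) {
        ForkJoinPool pool = create(namePrefix);
        CompletableFuture<String> name = new CompletableFuture<>();
        try {
            pool.execute(() -> name.complete(Thread.currentThread().getName()));
            return name.join();
        } finally {
            pool.shutdown();
        }
    }
}
```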

@danieldietrich
Contributor Author

Oh oh! 😱

Our unit tests contain Futures that block forever. This might be the reason for deadlocks when having parallelism=1 (or=2):

    @Test
    public void shouldGetValueOfUncompletedFuture() {
        final Future<?> future = Future.of(Concurrent::waitForever);
        assertThat(future.getValue()).isEqualTo(Option.none());
    }

@danieldietrich danieldietrich changed the title Changed Future.DEFAULT_EXECUTOR_SERVICE to ForkJoinPool.commonPool() Removed Promise in favor of Future.promise(). Made ForkJoinPool.commonPool() default ExecutorService. Sep 24, 2017
@danieldietrich danieldietrich changed the title Removed Promise in favor of Future.promise(). Made ForkJoinPool.commonPool() default ExecutorService. Removed Promise in favor of Future.join(). Made ForkJoinPool.commonPool() default ExecutorService. Sep 25, 2017
@codecov-io

codecov-io commented Sep 25, 2017

Codecov Report

❗ No coverage uploaded for pull request base (master@c05946e). Click here to learn what that means.
The diff coverage is 89.4%.

@@           Coverage Diff            @@
##             master   #2093   +/-   ##
========================================
  Coverage          ?   97.4%           
  Complexity        ?    5196           
========================================
  Files             ?      92           
  Lines             ?   11909           
  Branches          ?    1573           
========================================
  Hits              ?   11600           
  Misses            ?     154           
  Partials          ?     155
Impacted Files                                          Coverage Δ            Complexity Δ
vavr/src/main/java/io/vavr/Value.java                   93.51% <ø> (ø)        143 <0> (?)
...r/src/main/java/io/vavr/concurrent/FutureImpl.java   78.44% <79.03%> (ø)   26 <14> (?)
vavr/src/main/java/io/vavr/concurrent/Future.java       97.95% <96.62%> (ø)   120 <49> (?)

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c05946e...5a1ca80. Read the comment docs.

4 participants