Callback thread safety issue #486

Closed
egolearner opened this issue May 31, 2021 · 11 comments

@egolearner
Contributor

In our project, a callback implemented in Java is passed to native code. The function signature is std::function<bool(uint64_t)>.

        infoMap.put(new Info("std::function<bool(uint64_t)>").pointerTypes("FilterFunction"));

    public static class FilterFunction extends FunctionPointer {
        static { Loader.load(); }
        /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */
        public    FilterFunction(Pointer p) { super(p); }
        protected FilterFunction() { allocate(); }
        private native void allocate();
        public native boolean call(@Cast("uint64_t") long key);
    }

In our test program, a thread pool is created. Each thread runs the same logic: it creates one callback and passes it to native code. The pseudocode looks like this:

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadCount);
        List<Future<?>> futures = new ArrayList<>();
        NativeClass2 processor = new ...;
        for (int i = 0; i < threadCount; i++) {
            futures.add(executor.submit(() -> {
                // Each task creates its own context and its own callback.
                NativeClass1 ctx = new ...;
                ctx.set_filter(new FilterFunction() {
                    @Override
                    public boolean call(@Cast("uint64_t") long key) {
                        return false;
                    }
                });
                for (Query q : queries) {
                    processor.process(ctx, q);
                }
            }));
        }

With 29 or fewer threads, the test program runs fine. However, creating 30 or more threads leads to a crash (core dump).

#0  0x00007efef0bb88af in raise () from /lib64/libc.so.6
#1  0x00007efef0bba4aa in abort () from /lib64/libc.so.6
#2  0x00007efef0417ee9 in os::abort(bool) () from /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.1.centos7.x86_64/jre/lib/amd64/server/libjvm.so
#3  0x00007efef063f03a in VMError::report_and_die() () from /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.1.centos7.x86_64/jre/lib/amd64/server/libjvm.so
#4  0x00007efef0422095 in JVM_handle_linux_signal () from /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.1.centos7.x86_64/jre/lib/amd64/server/libjvm.so
#5  0x00007efef04150a8 in signalHandler(int, siginfo_t*, void*) () from /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.1.centos7.x86_64/jre/lib/amd64/server/libjvm.so
#6  <signal handler called>
#7  0x00007efef01dc4bf in jni_invoke_nonstatic(JNIEnv_*, JavaValue*, _jobject*, JNICallType, _jmethodID*, JNI_ArgumentPusher*, Thread*) [clone .constprop.235] ()
   from /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.1.centos7.x86_64/jre/lib/amd64/server/libjvm.so
#8  0x00007efef01e66aa in jni_CallBooleanMethodA () from /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.1.centos7.x86_64/jre/lib/amd64/server/libjvm.so
#9  0x00007efde0380d48 in ?? ()
   from /home/user/.javacpp/cache/java-sdk-samples-1.0-SNAPSHOT-jar-with-dependencies.jar/com/mycompany/myproject2/linux-x86_64/libjnimyproject.so
#10 0x00007efde03c09bc in ?? ()
Stack: [0x00007efdd9ed8000,0x00007efdd9fd9000],  sp=0x00007efdd9fd6e00,  free space=1019k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
V  [libjvm.so+0x7034bf]  jni_invoke_nonstatic(JNIEnv_*, JavaValue*, _jobject*, JNICallType, _jmethodID*, JNI_ArgumentPusher*, Thread*) [clone .constprop.235]+0x27f
V  [libjvm.so+0x70d6aa]  jni_CallBooleanMethodA+0xfa
C  [libjnimyproject.so+0xafd48]

The generated JNI code contains a static callback array that does not appear to be protected by a lock or synchronized with proper memory ordering. I do not know whether that is OK. (Also, I do not see any use of rptr->ptr in operator()(uint64_t arg0).)

static JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction_instances[10];

JNIEXPORT void JNICALL Java_com_mycompany_myproject2_presets_myproject_00024FilterFunction_allocate(JNIEnv* env, jobject obj) {
    obj = env->NewWeakGlobalRef(obj);
    if (obj == NULL) {
        JavaCPP_log("Error creating global reference of com.mycompany.myproject2.presets.myproject.FilterFunction instance for callback.");
        return;
    }
    JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction* rptr = new (std::nothrow) JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction;
    if (rptr != NULL) {
        rptr->obj = obj;
        JavaCPP_initPointer(env, obj, rptr, 1, rptr, &JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction_deallocate);
        for (int i = 0; i < 10; i++) {
            if (JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction_instances[i].obj == NULL) {
                rptr->ptr = JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction_allocate_callbacks[i];
                JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction_instances[i] = *rptr;
                break;
            }
        }
    }
}

struct JavaCPP_hidden JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction {
    JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction() : ptr(NULL), obj(NULL) { }
    unsigned char operator()(uint64_t arg0);
    unsigned char (*ptr)(uint64_t arg0);
    jobject obj; static jmethodID mid;
};


unsigned char JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction::operator()(uint64_t arg0) {
    jboolean rarg = 0;
    JNIEnv* env;
    bool attached = JavaCPP_getEnv(&env);
    if (env == NULL) {
        goto end;
    }
{
    jvalue args[1];
    args[0].j = (jlong)arg0;
    if (obj == NULL) {
        obj = JavaCPP_createPointer(env, 120);
        obj = obj == NULL ? NULL : env->NewGlobalRef(obj);
        if (obj == NULL) {
            JavaCPP_log("Error creating global reference of com.mycompany.myproject2.presets.myproject.FilterFunction instance for callback.");
        } else {
            env->SetLongField(obj, JavaCPP_addressFID, ptr_to_jlong(this));
        }
        for (int i = 0; i < 10; i++) {
            if (this == &JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction_instances[i]) {
                ptr = JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction_allocate_callbacks[i];
                break;
            }
        }
    }
    if (mid == NULL) {
        mid = JavaCPP_getMethodID(env, 120, "call", "(J)Z");
    }
    if (obj == NULL) {
        JavaCPP_log("Function pointer object is NULL in callback for com.mycompany.myproject2.presets.myproject.FilterFunction.");
    } else if (mid == NULL) {
        JavaCPP_log("Error getting method ID of function caller \"public native boolean com.mycompany.myproject2.presets.myproject$FilterFunction.call(long)\" for callback.");
    } else {
        rarg = env->CallBooleanMethodA(obj, mid, args);
    }
}
end:
    JavaCPP_detach(attached);
    return rarg;
}
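
The concern about the unsynchronized static instance array can be made concrete with a small stand-alone sketch. The generated allocate() shown above scans for a free entry and then writes to it as two separate steps; the sketch below instead claims a slot with a single atomic compare-and-swap. This is not JavaCPP's code: `slots`, `claim_slot`, and `count_successful_claims` are hypothetical names, and whether the generated code actually misbehaves under concurrent allocation is not established in this thread.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

constexpr std::size_t kSlots = 10;  // mirrors the size of the generated array

// Hypothetical slot table: each entry holds an owner token, or nullptr when free.
// Static storage duration means every entry starts out as nullptr.
static std::atomic<void*> slots[kSlots];

// Tries to claim a free slot for `owner`.
// Returns the slot index, or -1 if every slot is already taken.
static int claim_slot(void* owner) {
    for (std::size_t i = 0; i < kSlots; i++) {
        void* expected = nullptr;
        // The check and the write happen as one atomic step, so exactly
        // one thread can win each free slot.
        if (slots[i].compare_exchange_strong(expected, owner)) {
            return static_cast<int>(i);
        }
    }
    return -1;
}

// Lets n threads claim one slot each concurrently and
// returns how many of them obtained a slot.
int count_successful_claims(int n) {
    std::atomic<int> claimed{0};
    std::vector<int> tokens(n);  // distinct addresses used as owner tokens
    std::vector<std::thread> threads;
    for (int i = 0; i < n; i++) {
        threads.emplace_back([&claimed, &tokens, i] {
            if (claim_slot(&tokens[i]) >= 0) {
                claimed.fetch_add(1);
            }
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    return claimed.load();
}
```

With more claimants than slots, the surplus callers get -1 rather than silently sharing or overwriting an entry, which is the property the plain scan-then-write loop does not guarantee.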
@saudet
Member

saudet commented May 31, 2021

The instances of FilterFunction may be getting GCed. Make sure to keep a reference to them, for example, in a field somewhere.

@egolearner
Contributor Author

egolearner commented Jun 1, 2021

@saudet
It seems to work after changing the pseudocode to the following, which creates the FilterFunction instances before the threads start.

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadCount);
        List<Future<?>> futures = new ArrayList<>();
        NativeClass2 processor = new ...;
        // Contexts and their FilterFunction callbacks are created before the tasks start.
        NativeClass1[] ctxes = new NativeClass1[threadCount];
        for (int i = 0; i < threadCount; i++) {
            ctxes[i] = ...;
        }
        for (int i = 0; i < threadCount; i++) {
            final NativeClass1 ctx = ctxes[i]; // copy for the lambda, which cannot capture the loop variable
            futures.add(executor.submit(() -> {
                for (Query q : queries) {
                    processor.process(ctx, q);
                }
            }));
        }
        System.out.println(ctxes.length);

I do not quite understand why the FilterFunction was GCed previously. ctx is still in scope, and processor.process(ctx, q) is still using ctx. Could you provide more information or a relevant link?

@saudet
Member

saudet commented Jun 1, 2021

NativeClass1 is a C++ class without knowledge of JNI, so it can't hold references to Java objects.

@egolearner
Contributor Author

I see the problem.
Thanks a lot @saudet

@egolearner
Contributor Author

I see heavy lock contention in the Java callback when the thread count is high.
perf top result: [image]
gperftools result: [image]
Is this the expected behaviour? @saudet

@egolearner
Contributor Author

It seems the lock contention comes from here:

static JavaCPP_noinline bool JavaCPP_getEnv(JNIEnv** env) {
    bool attached = false;
    JavaVM *vm = JavaCPP_vm;
    if (vm == NULL) {
            JavaCPP_log("Could not get any created JavaVM.");
            *env = NULL;
            return false;
    }
#if defined(__linux__) || defined(__APPLE__)
    pthread_mutex_lock(&JavaCPP_lock);
    pthread_once(&JavaCPP_once, JavaCPP_create_pthread_key);
    if ((*env = (JNIEnv *)pthread_getspecific(JavaCPP_current_env)) != NULL) {
        attached = true;
        goto done;
    }
#endif

JavaCPP_getEnv is called by FilterFunction::operator()(uint64_t arg0)

unsigned char JavaCPP_com_mycompany_myproject2_presets_myproject_00024FilterFunction::operator()(uint64_t arg0) {
    jboolean rarg = 0;
    JNIEnv* env;
    bool attached = JavaCPP_getEnv(&env);
    if (env == NULL) {
        goto end;
    }

egolearner reopened this Jun 1, 2021
@saudet
Member

saudet commented Jun 1, 2021

That code was added by @tmm1 to have threads automatically detach themselves when they exit.
There's probably a way to do this that involves less contention, but someone will have to look into this...
In the meantime, if your threads never exit, we could add a flag to let you skip over all that.

@egolearner
Contributor Author

Can we change the implementation to use __thread or C++11 thread_local, which is simpler to use and does not involve lock contention?

In my scenario, threads exit with the program or shortly before it.
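
For illustration, here is a minimal sketch of what a thread_local variant might look like. It is not JavaCPP's code: `ThreadAttachment`, `current_attachment`, `run_threads`, and the counters are hypothetical stand-ins, with construction standing in for attaching a thread and destruction for detaching it. It relies on the C++11 rule that objects with thread storage duration are destroyed when their thread exits; it does not cover JNI itself or threads that are torn down without running C++ thread-exit handlers.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Counters used only to observe the behaviour of the sketch.
static std::atomic<int> tl_attach_count{0};
static std::atomic<int> tl_detach_count{0};

// Hypothetical per-thread holder: the constructor stands in for attaching
// the thread, the destructor for detaching it at thread exit.
struct ThreadAttachment {
    int calls = 0;
    ThreadAttachment() { tl_attach_count.fetch_add(1); }
    ~ThreadAttachment() { tl_detach_count.fetch_add(1); }
};

// Returns the calling thread's holder. It is constructed on first use in
// each thread, and no lock is taken on later calls.
static ThreadAttachment& current_attachment() {
    thread_local ThreadAttachment attachment;
    return attachment;
}

// Starts n threads that each simulate many callbacks, then joins them.
void run_threads(int n) {
    std::vector<std::thread> threads;
    for (int i = 0; i < n; i++) {
        threads.emplace_back([] {
            for (int j = 0; j < 1000; j++) {
                current_attachment().calls++;
            }
        });
    }
    for (auto& t : threads) {
        t.join();
    }
}
```

Each thread constructs its holder once, however many callbacks it serves, and the destructor runs once per thread as that thread exits.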

@egolearner
Contributor Author

From #243 I see why pthread-specific data is used.
However, the pthread_mutex around the pthread-specific calls seems unnecessary.
Could you clarify the purpose of the pthread_mutex, @tmm1?
See also the previous discussion of the pthread mutex: #243 (comment)

static JavaCPP_noinline bool JavaCPP_getEnv(JNIEnv** env) {
    bool attached = false;
    JavaVM *vm = JavaCPP_vm;
    if (vm == NULL) {
            JavaCPP_log("Could not get any created JavaVM.");
            *env = NULL;
            return false;
    }
#if defined(__linux__) || defined(__APPLE__)
    pthread_mutex_lock(&JavaCPP_lock);
    pthread_once(&JavaCPP_once, JavaCPP_create_pthread_key);
    if ((*env = (JNIEnv *)pthread_getspecific(JavaCPP_current_env)) != NULL) {
        attached = true;
        goto done;
    }
#endif
    if (vm->GetEnv((void**)env, JNI_VERSION_1_6) != JNI_OK) {
        struct {
            JNIEnv **env;
            operator JNIEnv**() { return env; } // Android JNI
            operator void**() { return (void**)env; } // standard JNI
        } env2 = { env };
        JavaVMAttachArgs args;
        args.version = JNI_VERSION_1_6;
        args.group = NULL;
        char name[64] = {0};
#ifdef _WIN32
        sprintf(name, "JavaCPP Thread ID %lu", GetCurrentThreadId());
#elif defined(__APPLE__)
        sprintf(name, "JavaCPP Thread ID %u", pthread_mach_thread_np(pthread_self()));
#else
        sprintf(name, "JavaCPP Thread ID %lu", pthread_self());
#endif
        args.name = name;
        if (vm->AttachCurrentThread(env2, &args) != JNI_OK) {
            JavaCPP_log("Could not attach the JavaVM to the current thread.");
            *env = NULL;
            goto done;
        }
#if defined(__linux__) || defined(__APPLE__)
        pthread_setspecific(JavaCPP_current_env, *env);
#endif
        attached = true;
    }
    if (JavaCPP_vm == NULL) {
        if (JNI_OnLoad(vm, NULL) < 0) {
            JavaCPP_detach(attached);
            *env = NULL;
            goto done;
        }
    }
done:
#if defined(__linux__) || defined(__APPLE__)
    pthread_mutex_unlock(&JavaCPP_lock);
#endif
    return attached;
}
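
The question of whether the mutex is needed around the pthread-specific calls can be explored with a small stand-alone sketch. It is not JavaCPP's code: `current_slot`, `slot_once`, `make_key`, `on_thread_exit`, `get_or_attach`, `run_workers`, and the counters are hypothetical stand-ins, and a heap-allocated `int` plays the role of the attached `JNIEnv`. It relies only on POSIX behaviour: `pthread_once` runs the key creation exactly once, `pthread_getspecific` and `pthread_setspecific` act on the calling thread's own slot, and the key's destructor runs at thread exit when the slot is non-NULL.

```cpp
#include <pthread.h>
#include <atomic>
#include <cassert>
#include <vector>

// Hypothetical stand-ins for JavaCPP_current_env and JavaCPP_once.
static pthread_key_t current_slot;
static pthread_once_t slot_once = PTHREAD_ONCE_INIT;

// Counters used only to observe the behaviour of the sketch.
static std::atomic<int> attach_count{0};
static std::atomic<int> detach_count{0};

// Runs at thread exit for every thread whose slot is non-NULL;
// a real implementation would detach the thread here.
static void on_thread_exit(void* value) {
    delete static_cast<int*>(value);
    detach_count.fetch_add(1);
}

static void make_key() {
    pthread_key_create(&current_slot, on_thread_exit);
}

// Returns the calling thread's value, "attaching" on first use.
// No mutex: pthread_once serializes key creation, and each thread
// only ever reads and writes its own slot.
static int* get_or_attach() {
    pthread_once(&slot_once, make_key);
    int* value = static_cast<int*>(pthread_getspecific(current_slot));
    if (value == nullptr) {
        value = new int(0);
        pthread_setspecific(current_slot, value);
        attach_count.fetch_add(1);
    }
    return value;
}

// Thread body: simulates many callbacks arriving on one native thread.
static void* worker(void*) {
    for (int i = 0; i < 1000; i++) {
        ++*get_or_attach();
    }
    return nullptr;
}

// Starts n threads, joins them, and reports whether all could be created.
bool run_workers(int n) {
    std::vector<pthread_t> threads(n);
    int started = 0;
    while (started < n &&
           pthread_create(&threads[started], nullptr, worker, nullptr) == 0) {
        started++;
    }
    for (int i = 0; i < started; i++) {
        pthread_join(threads[i], nullptr);
    }
    return started == n;
}
```

The sketch only covers the per-thread slot. Whether the remaining parts of JavaCPP_getEnv, such as the AttachCurrentThread path, are safe without the lock is a separate question that it does not answer.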

saudet added a commit that referenced this issue Jun 1, 2021
…ads on callbacks (issue #486)

 * Pick up `@Allocator`, `@CriticalRegion`, `@NoException` in `Generator` from `@Properties(inherit=classes)` as well (issue #484)
@saudet
Member

saudet commented Jun 1, 2021

Can we change the implementation to use __thread or C++11 thread_local, which is simpler to use and does not involve lock contention?

We need pthreads anyway to detach threads on exit. I don't think C++ offers something like that, yet.

In my scenario, threads exit with the program or shortly before it.

Ok, in that case then we can skip all that. I've made it possible to do that in commit e5495a2 by defining NO_JNI_DETACH_THREAD, for example, by annotating classes with @Platform(define="NO_JNI_DETACH_THREAD").

From #243 I see why pthread-specific data is used.
However, the pthread_mutex around the pthread-specific calls seems unnecessary.
Could you clarify the purpose of the pthread_mutex, @tmm1?
See also the previous discussion of the pthread mutex: #243 (comment)

Could you remove the lock and see how it fares in your application? If it doesn't seem to cause problems, I'm fine with removing it. In any case, please open a pull request with what works for you. Thanks!

@egolearner
Contributor Author

Removing the lock makes our benchmark 5 times faster.

I have created #489.
