diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index e9d4f31e5ca137..c686ebf495a22c 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -332,8 +332,8 @@ void DorisMetrics::initialize(bool init_system_metrics, const std::set& map) { + //TODO: ADD EXCEPTION CHECK. jclass hashmap_class = env->FindClass("java/util/HashMap"); jmethodID hashmap_constructor = env->GetMethodID(hashmap_class, "", "(I)V"); jobject hashmap_object = env->NewObject(hashmap_class, hashmap_constructor, map.size()); @@ -399,16 +400,26 @@ std::map JniUtil::convert_to_cpp_map(JNIEnv* env, jobj Status JniUtil::GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref) { *class_ref = NULL; - jclass local_cl = env->FindClass(class_str); - RETURN_ERROR_IF_EXC(env); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF(jclass, local_cl, env, FindClass(class_str)); RETURN_IF_ERROR(LocalToGlobalRef(env, local_cl, reinterpret_cast(class_ref))); - env->DeleteLocalRef(local_cl); - RETURN_ERROR_IF_EXC(env); return Status::OK(); } Status JniUtil::LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global_ref) { *global_ref = env->NewGlobalRef(local_ref); + // NewGlobalRef: + // Returns a global reference to the given obj. + // + //May return NULL if: + // obj refers to null + // the system has run out of memory + // obj was a weak global reference and has already been garbage collected + if (*global_ref == NULL) { + return Status::InternalError( + "LocalToGlobalRef fail,global ref is NULL,maybe the system has run out of memory."); + } + + //NewGlobalRef not throw exception,maybe we just need check NULL. RETURN_ERROR_IF_EXC(env); return Status::OK(); } @@ -592,7 +603,7 @@ Status JniUtil::Init() { } RETURN_IF_ERROR(init_jni_scanner_loader(env)); jvm_inited_ = true; - DorisMetrics::instance()->init_jvm_metrics(env); + DorisMetrics::instance()->init_jvm_metrics(); return Status::OK(); } diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h index 666a5e526dfbda..116c75fcb3378d 100644 --- a/be/src/util/jni-util.h +++ b/be/src/util/jni-util.h @@ -28,6 +28,7 @@ #include "common/status.h" #include "jni_md.h" +#include "util/defer_op.h" #include "util/thrift_util.h" #ifdef USE_HADOOP_HDFS @@ -38,12 +39,21 @@ extern "C" JNIEnv* getJNIEnv(void); namespace doris { class JniUtil; -#define RETURN_ERROR_IF_EXC(env) \ - do { \ - jthrowable exc = (env)->ExceptionOccurred(); \ - if (exc != nullptr) return JniUtil::GetJniExceptionMsg(env); \ +#define RETURN_ERROR_IF_EXC(env) \ + do { \ + if (env->ExceptionCheck()) [[unlikely]] \ + return JniUtil::GetJniExceptionMsg(env); \ } while (false) +#define JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF(type, result, env, func) \ + type result = env->func; \ + DEFER(env->DeleteLocalRef(result)); \ + RETURN_ERROR_IF_EXC(env) + +#define JNI_CALL_METHOD_CHECK_EXCEPTION(type, result, env, func) \ + type result = env->func; \ + RETURN_ERROR_IF_EXC(env) + class JniUtil { public: static Status Init() WARN_UNUSED_RESULT; @@ -65,6 +75,10 @@ class JniUtil { return Status::OK(); } + //jclass is generally a local reference. + //Method ID and field ID values are forever. + //If you want to use the jclass across multiple threads or multiple calls into the JNI code you need + // to create a global reference to it with GetGlobalClassRef(). static Status GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref) WARN_UNUSED_RESULT; diff --git a/be/src/util/jvm_metrics.cpp b/be/src/util/jvm_metrics.cpp index fc30d1073acdc6..6855aee1ebf5e6 100644 --- a/be/src/util/jvm_metrics.cpp +++ b/be/src/util/jvm_metrics.cpp @@ -22,7 +22,9 @@ #include #include "common/config.h" +#include "util/defer_op.h" #include "util/metrics.h" + namespace doris { #define DEFINE_JVM_SIZE_BYTES_METRIC(name, type) \ @@ -78,7 +80,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(jvm_gc_g1_old_generation_time_ms, MetricUni const char* JvmMetrics::_s_hook_name = "jvm_metrics"; -JvmMetrics::JvmMetrics(MetricRegistry* registry, JNIEnv* env) { +JvmMetrics::JvmMetrics(MetricRegistry* registry) { DCHECK(registry != nullptr); _registry = registry; @@ -90,9 +92,12 @@ JvmMetrics::JvmMetrics(MetricRegistry* registry, JNIEnv* env) { break; } try { - _jvm_stats.init(env); + Status st = _jvm_stats.init(); + if (!st) { + LOG(WARNING) << "jvm Stats Init Fail. " << st.to_string(); + } } catch (...) { - LOG(WARNING) << "JVM STATS INIT FAIL"; + LOG(WARNING) << "jvm Stats Throw Exception Init Fail."; break; } if (!_jvm_stats.init_complete()) { @@ -133,21 +138,22 @@ JvmMetrics::JvmMetrics(MetricRegistry* registry, JNIEnv* env) { void JvmMetrics::update() { static long fail_count = 0; - bool have_exception = false; try { - _jvm_stats.refresh(this); + Status st = _jvm_stats.refresh(this); + if (!st) { + fail_count++; + LOG(WARNING) << "Jvm Stats update Fail! " << st.to_string(); + } else { + fail_count = 0; + } } catch (...) { - have_exception = true; - LOG(WARNING) << "JVM MONITOR UPDATE FAIL!"; + LOG(WARNING) << "Jvm Stats update throw Exception!"; fail_count++; } //When 30 consecutive exceptions occur, turn off jvm information collection. - if (!have_exception) { - fail_count = 0; - } if (fail_count >= 30) { - LOG(WARNING) << "JVM MONITOR CLOSE!"; + LOG(WARNING) << "Jvm Stats CLOSE!"; _jvm_stats.set_complete(false); _server_entity->deregister_hook(_s_hook_name); @@ -182,193 +188,260 @@ void JvmMetrics::update() { } } -void JvmStats::init(JNIEnv* ENV) { - env = ENV; - _managementFactoryClass = env->FindClass("java/lang/management/ManagementFactory"); - if (_managementFactoryClass == nullptr) { - LOG(WARNING) - << "Class java/lang/management/ManagementFactory Not Find.JVM monitoring fails."; - return; - } +Status JvmStats::init() { + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - _getMemoryMXBeanMethod = env->GetStaticMethodID(_managementFactoryClass, "getMemoryMXBean", - "()Ljava/lang/management/MemoryMXBean;"); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/management/ManagementFactory", + &_managementFactoryClass)); - _memoryUsageClass = env->FindClass("java/lang/management/MemoryUsage"); - if (_memoryUsageClass == nullptr) { - LOG(WARNING) << "Class java/lang/management/MemoryUsage Not Find.JVM monitoring fails."; - return; - } - _getMemoryUsageUsedMethod = env->GetMethodID(_memoryUsageClass, "getUsed", "()J"); - _getMemoryUsageCommittedMethod = env->GetMethodID(_memoryUsageClass, "getCommitted", "()J"); - _getMemoryUsageMaxMethod = env->GetMethodID(_memoryUsageClass, "getMax", "()J"); + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getMemoryMXBeanMethod, env, + GetStaticMethodID(_managementFactoryClass, "getMemoryMXBean", + "()Ljava/lang/management/MemoryMXBean;")); - _memoryMXBeanClass = env->FindClass("java/lang/management/MemoryMXBean"); - if (_memoryMXBeanClass == nullptr) { - LOG(WARNING) << "Class java/lang/management/MemoryMXBean Not Find.JVM monitoring fails."; - return; - } - _getHeapMemoryUsageMethod = env->GetMethodID(_memoryMXBeanClass, "getHeapMemoryUsage", - "()Ljava/lang/management/MemoryUsage;"); - _getNonHeapMemoryUsageMethod = env->GetMethodID(_memoryMXBeanClass, "getNonHeapMemoryUsage", - "()Ljava/lang/management/MemoryUsage;"); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/management/MemoryUsage", + &_memoryUsageClass)); - _getMemoryPoolMXBeansMethod = env->GetStaticMethodID( - _managementFactoryClass, "getMemoryPoolMXBeans", "()Ljava/util/List;"); + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getMemoryUsageUsedMethod, env, + GetMethodID(_memoryUsageClass, "getUsed", "()J")); - _listClass = env->FindClass("java/util/List"); - if (_listClass == nullptr) { - LOG(WARNING) << "Class java/util/List Not Find.JVM monitoring fails."; - return; - } - _getListSizeMethod = env->GetMethodID(_listClass, "size", "()I"); - _getListUseIndexMethod = env->GetMethodID(_listClass, "get", "(I)Ljava/lang/Object;"); + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getMemoryUsageCommittedMethod, env, + GetMethodID(_memoryUsageClass, "getCommitted", "()J")); - _memoryPoolMXBeanClass = env->FindClass("java/lang/management/MemoryPoolMXBean"); - if (_memoryPoolMXBeanClass == nullptr) { - LOG(WARNING) - << "Class java/lang/management/MemoryPoolMXBean Not Find.JVM monitoring fails."; - return; - } - _getMemoryPoolMXBeanUsageMethod = env->GetMethodID(_memoryPoolMXBeanClass, "getUsage", - "()Ljava/lang/management/MemoryUsage;"); - _getMemoryPollMXBeanPeakMethod = env->GetMethodID(_memoryPoolMXBeanClass, "getPeakUsage", - "()Ljava/lang/management/MemoryUsage;"); - _getMemoryPollMXBeanNameMethod = - env->GetMethodID(_memoryPoolMXBeanClass, "getName", "()Ljava/lang/String;"); - - _getThreadMXBeanMethod = env->GetStaticMethodID(_managementFactoryClass, "getThreadMXBean", - "()Ljava/lang/management/ThreadMXBean;"); - - _getGarbageCollectorMXBeansMethod = env->GetStaticMethodID( - _managementFactoryClass, "getGarbageCollectorMXBeans", "()Ljava/util/List;"); - - _garbageCollectorMXBeanClass = env->FindClass("java/lang/management/GarbageCollectorMXBean"); - if (_garbageCollectorMXBeanClass == nullptr) { - LOG(WARNING) << "Class java/lang/management/GarbageCollectorMXBean Not Find.JVM monitoring " - "fails."; - return; - } - _getGCNameMethod = - env->GetMethodID(_garbageCollectorMXBeanClass, "getName", "()Ljava/lang/String;"); - _getGCCollectionCountMethod = - env->GetMethodID(_garbageCollectorMXBeanClass, "getCollectionCount", "()J"); - _getGCCollectionTimeMethod = - env->GetMethodID(_garbageCollectorMXBeanClass, "getCollectionTime", "()J"); - - _threadMXBeanClass = env->FindClass("java/lang/management/ThreadMXBean"); - if (_threadMXBeanClass == nullptr) { - LOG(WARNING) << "Class java/lang/management/ThreadMXBean Not Find.JVM monitoring fails."; - return; - } - _getAllThreadIdsMethod = env->GetMethodID(_threadMXBeanClass, "getAllThreadIds", "()[J"); - _getThreadInfoMethod = env->GetMethodID(_threadMXBeanClass, "getThreadInfo", - "([JI)[Ljava/lang/management/ThreadInfo;"); - _getPeakThreadCountMethod = env->GetMethodID(_threadMXBeanClass, "getPeakThreadCount", "()I"); - - _threadInfoClass = env->FindClass("java/lang/management/ThreadInfo"); - if (_threadInfoClass == nullptr) { - LOG(WARNING) << "Class java/lang/management/ThreadInfo Not Find.JVM monitoring fails."; - return; - } + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getMemoryUsageMaxMethod, env, + GetMethodID(_memoryUsageClass, "getMax", "()J")); - _getThreadStateMethod = - env->GetMethodID(_threadInfoClass, "getThreadState", "()Ljava/lang/Thread$State;"); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/management/MemoryMXBean", + &_memoryMXBeanClass)); - _threadStateClass = env->FindClass("java/lang/Thread$State"); - if (_threadStateClass == nullptr) { - LOG(WARNING) << "Class java/lang/Thread$State Not Find.JVM monitoring fails."; - return; - } + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getHeapMemoryUsageMethod, env, + GetMethodID(_memoryMXBeanClass, "getHeapMemoryUsage", + "()Ljava/lang/management/MemoryUsage;")); + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getNonHeapMemoryUsageMethod, env, + GetMethodID(_memoryMXBeanClass, "getNonHeapMemoryUsage", + "()Ljava/lang/management/MemoryUsage;")); - jfieldID newThreadFieldID = - env->GetStaticFieldID(_threadStateClass, "NEW", "Ljava/lang/Thread$State;"); - jfieldID runnableThreadFieldID = - env->GetStaticFieldID(_threadStateClass, "RUNNABLE", "Ljava/lang/Thread$State;"); - jfieldID blockedThreadFieldID = - env->GetStaticFieldID(_threadStateClass, "BLOCKED", "Ljava/lang/Thread$State;"); - jfieldID waitingThreadFieldID = - env->GetStaticFieldID(_threadStateClass, "WAITING", "Ljava/lang/Thread$State;"); - jfieldID timedWaitingThreadFieldID = - env->GetStaticFieldID(_threadStateClass, "TIMED_WAITING", "Ljava/lang/Thread$State;"); - jfieldID terminatedThreadFieldID = - env->GetStaticFieldID(_threadStateClass, "TERMINATED", "Ljava/lang/Thread$State;"); - - _newThreadStateObj = env->GetStaticObjectField(_threadStateClass, newThreadFieldID); - _runnableThreadStateObj = env->GetStaticObjectField(_threadStateClass, runnableThreadFieldID); - _blockedThreadStateObj = env->GetStaticObjectField(_threadStateClass, blockedThreadFieldID); - _waitingThreadStateObj = env->GetStaticObjectField(_threadStateClass, waitingThreadFieldID); - _timedWaitingThreadStateObj = - env->GetStaticObjectField(_threadStateClass, timedWaitingThreadFieldID); - _terminatedThreadStateObj = - env->GetStaticObjectField(_threadStateClass, terminatedThreadFieldID); + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _getMemoryPoolMXBeansMethod, env, + GetStaticMethodID(_managementFactoryClass, "getMemoryPoolMXBeans", + "()Ljava/util/List;")); - LOG(INFO) << "Start JVM monitoring."; + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List", &_listClass)); + + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getListSizeMethod, env, + GetMethodID(_listClass, "size", "()I")); + + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getListUseIndexMethod, env, + GetMethodID(_listClass, "get", "(I)Ljava/lang/Object;")); + + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/management/MemoryPoolMXBean", + &_memoryPoolMXBeanClass)); + + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getMemoryPoolMXBeanUsageMethod, env, + GetMethodID(_memoryPoolMXBeanClass, "getUsage", + "()Ljava/lang/management/MemoryUsage;")); + + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getMemoryPollMXBeanPeakMethod, env, + GetMethodID(_memoryPoolMXBeanClass, "getPeakUsage", + "()Ljava/lang/management/MemoryUsage;")); + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _getMemoryPollMXBeanNameMethod, env, + GetMethodID(_memoryPoolMXBeanClass, "getName", "()Ljava/lang/String;")); + + JNI_CALL_METHOD_CHECK_EXCEPTION(, _getThreadMXBeanMethod, env, + GetStaticMethodID(_managementFactoryClass, "getThreadMXBean", + "()Ljava/lang/management/ThreadMXBean;")); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _getGarbageCollectorMXBeansMethod, env, + GetStaticMethodID(_managementFactoryClass, "getGarbageCollectorMXBeans", + "()Ljava/util/List;")); + + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/management/GarbageCollectorMXBean", + &_garbageCollectorMXBeanClass)); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _getGCNameMethod, env, + GetMethodID(_garbageCollectorMXBeanClass, "getName", "()Ljava/lang/String;")); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _getGCCollectionCountMethod, env, + GetMethodID(_garbageCollectorMXBeanClass, "getCollectionCount", "()J")); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _getGCCollectionTimeMethod, env, + GetMethodID(_garbageCollectorMXBeanClass, "getCollectionTime", "()J")); + + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/management/ThreadMXBean", + &_threadMXBeanClass)); + + JNI_CALL_METHOD_CHECK_EXCEPTION(, + + _getAllThreadIdsMethod, env, + GetMethodID(_threadMXBeanClass, "getAllThreadIds", "()[J")); + + JNI_CALL_METHOD_CHECK_EXCEPTION(, + + _getThreadInfoMethod, env, + GetMethodID(_threadMXBeanClass, "getThreadInfo", + "([JI)[Ljava/lang/management/ThreadInfo;")); + + JNI_CALL_METHOD_CHECK_EXCEPTION(, + + _getPeakThreadCountMethod, env, + GetMethodID(_threadMXBeanClass, "getPeakThreadCount", "()I")); + + RETURN_IF_ERROR( + JniUtil::GetGlobalClassRef(env, "java/lang/management/ThreadInfo", &_threadInfoClass)); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + , + + _getThreadStateMethod, env, + GetMethodID(_threadInfoClass, "getThreadState", "()Ljava/lang/Thread$State;")); + + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Thread$State", &_threadStateClass)); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + jfieldID, newThreadFieldID, env, + GetStaticFieldID(_threadStateClass, "NEW", "Ljava/lang/Thread$State;")); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + jfieldID, runnableThreadFieldID, env, + GetStaticFieldID(_threadStateClass, "RUNNABLE", "Ljava/lang/Thread$State;")); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + jfieldID, blockedThreadFieldID, env, + GetStaticFieldID(_threadStateClass, "BLOCKED", "Ljava/lang/Thread$State;")); + JNI_CALL_METHOD_CHECK_EXCEPTION( + jfieldID, waitingThreadFieldID, env, + GetStaticFieldID(_threadStateClass, "WAITING", "Ljava/lang/Thread$State;")); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + jfieldID, timedWaitingThreadFieldID, env, + GetStaticFieldID(_threadStateClass, "TIMED_WAITING", "Ljava/lang/Thread$State;")); + JNI_CALL_METHOD_CHECK_EXCEPTION( + jfieldID, terminatedThreadFieldID, env, + GetStaticFieldID(_threadStateClass, "TERMINATED", "Ljava/lang/Thread$State;")); + + JNI_CALL_METHOD_CHECK_EXCEPTION(jobject, newThreadStateObj, env, + GetStaticObjectField(_threadStateClass, newThreadFieldID)); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, newThreadStateObj, &_newThreadStateObj)); + + JNI_CALL_METHOD_CHECK_EXCEPTION(jobject, runnableThreadStateObj, env, + GetStaticObjectField(_threadStateClass, runnableThreadFieldID)); + RETURN_IF_ERROR( + JniUtil::LocalToGlobalRef(env, runnableThreadStateObj, &_runnableThreadStateObj)); + + JNI_CALL_METHOD_CHECK_EXCEPTION(jobject, blockedThreadStateObj, env, + GetStaticObjectField(_threadStateClass, blockedThreadFieldID)); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, blockedThreadStateObj, &_blockedThreadStateObj)); + + JNI_CALL_METHOD_CHECK_EXCEPTION(jobject, waitingThreadStateObj, env, + GetStaticObjectField(_threadStateClass, waitingThreadFieldID)); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, waitingThreadStateObj, &_waitingThreadStateObj)); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + jobject, timedWaitingThreadStateObj, env, + GetStaticObjectField(_threadStateClass, timedWaitingThreadFieldID)); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, timedWaitingThreadStateObj, + &_timedWaitingThreadStateObj)); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + jobject, terminatedThreadStateObj, env, + GetStaticObjectField(_threadStateClass, terminatedThreadFieldID)); + RETURN_IF_ERROR( + JniUtil::LocalToGlobalRef(env, terminatedThreadStateObj, &_terminatedThreadStateObj)); _init_complete = true; - return; + + LOG(INFO) << "Start JVM monitoring."; + return Status::OK(); } -void JvmStats::refresh(JvmMetrics* jvm_metrics) { +Status JvmStats::refresh(JvmMetrics* jvm_metrics) { if (!_init_complete) { - return; + return Status::InternalError("Jvm Stats not init complete."); } - Status st = JniUtil::GetJNIEnv(&env); - if (!st.ok()) { - LOG(WARNING) << "JVM STATS GET JNI ENV FAIL"; - return; - } + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, memoryMXBeanObj, env, + CallStaticObjectMethod(_managementFactoryClass, _getMemoryMXBeanMethod)); + + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, heapMemoryUsageObj, env, + CallObjectMethod(memoryMXBeanObj, _getHeapMemoryUsageMethod)); - jobject memoryMXBeanObj = - env->CallStaticObjectMethod(_managementFactoryClass, _getMemoryMXBeanMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION(jlong, heapMemoryUsed, env, + CallLongMethod(heapMemoryUsageObj, _getMemoryUsageUsedMethod)); - jobject heapMemoryUsageObj = env->CallObjectMethod(memoryMXBeanObj, _getHeapMemoryUsageMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION( + jlong, heapMemoryCommitted, env, + CallLongMethod(heapMemoryUsageObj, _getMemoryUsageCommittedMethod)); - jlong heapMemoryUsed = env->CallLongMethod(heapMemoryUsageObj, _getMemoryUsageUsedMethod); - jlong heapMemoryCommitted = - env->CallLongMethod(heapMemoryUsageObj, _getMemoryUsageCommittedMethod); - jlong heapMemoryMax = env->CallLongMethod(heapMemoryUsageObj, _getMemoryUsageMaxMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION(jlong, heapMemoryMax, env, + CallLongMethod(heapMemoryUsageObj, _getMemoryUsageMaxMethod)); jvm_metrics->jvm_heap_size_bytes_used->set_value(heapMemoryUsed < 0 ? 0 : heapMemoryUsed); jvm_metrics->jvm_heap_size_bytes_committed->set_value( heapMemoryCommitted < 0 ? 0 : heapMemoryCommitted); jvm_metrics->jvm_heap_size_bytes_max->set_value(heapMemoryMax < 0 ? 0 : heapMemoryMax); - jobject nonHeapMemoryUsageObj = - env->CallObjectMethod(memoryMXBeanObj, _getNonHeapMemoryUsageMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, nonHeapMemoryUsageObj, env, + CallObjectMethod(memoryMXBeanObj, _getNonHeapMemoryUsageMethod)); - jlong nonHeapMemoryCommitted = - env->CallLongMethod(nonHeapMemoryUsageObj, _getMemoryUsageCommittedMethod); - jlong nonHeapMemoryUsed = env->CallLongMethod(nonHeapMemoryUsageObj, _getMemoryUsageUsedMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION( + jlong, nonHeapMemoryCommitted, env, + CallLongMethod(nonHeapMemoryUsageObj, _getMemoryUsageCommittedMethod)); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + jlong, nonHeapMemoryUsed, env, + CallLongMethod(nonHeapMemoryUsageObj, _getMemoryUsageUsedMethod)); jvm_metrics->jvm_non_heap_size_bytes_committed->set_value( nonHeapMemoryCommitted < 0 ? 0 : nonHeapMemoryCommitted); jvm_metrics->jvm_non_heap_size_bytes_used->set_value(nonHeapMemoryUsed < 0 ? 0 : nonHeapMemoryUsed); - jobject memoryPoolMXBeansList = - env->CallStaticObjectMethod(_managementFactoryClass, _getMemoryPoolMXBeansMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, memoryPoolMXBeansList, env, + CallStaticObjectMethod(_managementFactoryClass, _getMemoryPoolMXBeansMethod)); - jint size = env->CallIntMethod(memoryPoolMXBeansList, _getListSizeMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION(jint, size, env, + CallIntMethod(memoryPoolMXBeansList, _getListSizeMethod)); for (int i = 0; i < size; ++i) { - jobject memoryPoolMXBean = - env->CallObjectMethod(memoryPoolMXBeansList, _getListUseIndexMethod, i); - jobject usageObject = - env->CallObjectMethod(memoryPoolMXBean, _getMemoryPoolMXBeanUsageMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, memoryPoolMXBean, env, + CallObjectMethod(memoryPoolMXBeansList, _getListUseIndexMethod, i)); + + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, usageObject, env, + CallObjectMethod(memoryPoolMXBean, _getMemoryPoolMXBeanUsageMethod)); + + JNI_CALL_METHOD_CHECK_EXCEPTION(jlong, used, env, + CallLongMethod(usageObject, _getMemoryUsageUsedMethod)); + + JNI_CALL_METHOD_CHECK_EXCEPTION(jlong, max, env, + CallLongMethod(usageObject, _getMemoryUsageMaxMethod)); - jlong used = env->CallLongMethod(usageObject, _getMemoryUsageUsedMethod); - jlong max = env->CallLongMethod(usageObject, _getMemoryUsageMaxMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, peakUsageObject, env, + CallObjectMethod(memoryPoolMXBean, _getMemoryPollMXBeanPeakMethod)); - jobject peakUsageObject = - env->CallObjectMethod(memoryPoolMXBean, _getMemoryPollMXBeanPeakMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION(jlong, peakUsed, env, + CallLongMethod(peakUsageObject, _getMemoryUsageUsedMethod)); - jlong peakUsed = env->CallLongMethod(peakUsageObject, _getMemoryUsageUsedMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, name, env, + CallObjectMethod(memoryPoolMXBean, _getMemoryPollMXBeanNameMethod)); - jstring name = - (jstring)env->CallObjectMethod(memoryPoolMXBean, _getMemoryPollMXBeanNameMethod); - const char* nameStr = env->GetStringUTFChars(name, nullptr); + const char* nameStr = env->GetStringUTFChars( + (jstring)name, nullptr); // GetStringUTFChars not throw exception if (nameStr != nullptr) { auto it = _memoryPoolName.find(nameStr); if (it == _memoryPoolName.end()) { @@ -385,36 +458,46 @@ void JvmStats::refresh(JvmMetrics* jvm_metrics) { jvm_metrics->jvm_old_size_bytes_max->set_value(max < 0 ? 0 : max); } - env->ReleaseStringUTFChars(name, nameStr); + env->ReleaseStringUTFChars((jstring)name, + nameStr); // ReleaseStringUTFChars not throw exception } - env->DeleteLocalRef(memoryPoolMXBean); - env->DeleteLocalRef(usageObject); - env->DeleteLocalRef(peakUsageObject); } + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, threadMXBean, env, + CallStaticObjectMethod(_managementFactoryClass, _getThreadMXBeanMethod)); - jobject threadMXBean = - env->CallStaticObjectMethod(_managementFactoryClass, _getThreadMXBeanMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, threadIdsObject, env, CallObjectMethod(threadMXBean, _getAllThreadIdsMethod)); - jlongArray threadIds = (jlongArray)env->CallObjectMethod(threadMXBean, _getAllThreadIdsMethod); - jint threadCount = env->GetArrayLength(threadIds); + auto threadIds = (jlongArray)threadIdsObject; - jobjectArray threadInfos = - (jobjectArray)env->CallObjectMethod(threadMXBean, _getThreadInfoMethod, threadIds, 0); + JNI_CALL_METHOD_CHECK_EXCEPTION(jint, threadCount, env, GetArrayLength(threadIds)); + + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, threadInfos, env, + CallObjectMethod(threadMXBean, _getThreadInfoMethod, (jlongArray)threadIds, 0)); int threadsNew = 0, threadsRunnable = 0, threadsBlocked = 0, threadsWaiting = 0, threadsTimedWaiting = 0, threadsTerminated = 0; - jint peakThreadCount = env->CallIntMethod(threadMXBean, _getPeakThreadCountMethod); + + JNI_CALL_METHOD_CHECK_EXCEPTION(jint, peakThreadCount, env, + CallIntMethod(threadMXBean, _getPeakThreadCountMethod)); jvm_metrics->jvm_thread_peak_count->set_value(peakThreadCount < 0 ? 0 : peakThreadCount); jvm_metrics->jvm_thread_count->set_value(threadCount < 0 ? 0 : threadCount); for (int i = 0; i < threadCount; i++) { - jobject threadInfo = env->GetObjectArrayElement(threadInfos, i); + JNI_CALL_METHOD_CHECK_EXCEPTION(jobject, threadInfo, env, + GetObjectArrayElement((jobjectArray)threadInfos, i)); + if (threadInfo == nullptr) { continue; } - jobject threadState = env->CallObjectMethod(threadInfo, _getThreadStateMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, threadState, env, CallObjectMethod(threadInfo, _getThreadStateMethod)); + + //IsSameObject not throw exception if (env->IsSameObject(threadState, _newThreadStateObj)) { threadsNew++; } else if (env->IsSameObject(threadState, _runnableThreadStateObj)) { @@ -428,8 +511,6 @@ void JvmStats::refresh(JvmMetrics* jvm_metrics) { } else if (env->IsSameObject(threadState, _terminatedThreadStateObj)) { threadsTerminated++; } - env->DeleteLocalRef(threadInfo); - env->DeleteLocalRef(threadState); } jvm_metrics->jvm_thread_new_count->set_value(threadsNew < 0 ? 0 : threadsNew); @@ -441,18 +522,27 @@ void JvmStats::refresh(JvmMetrics* jvm_metrics) { jvm_metrics->jvm_thread_terminated_count->set_value(threadsTerminated < 0 ? 0 : threadsTerminated); - jobject gcMXBeansList = - env->CallStaticObjectMethod(_managementFactoryClass, _getGarbageCollectorMXBeansMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, gcMXBeansList, env, + CallStaticObjectMethod(_managementFactoryClass, _getGarbageCollectorMXBeansMethod)); - jint numCollectors = env->CallIntMethod(gcMXBeansList, _getListSizeMethod); + JNI_CALL_METHOD_CHECK_EXCEPTION(jint, numCollectors, env, + CallIntMethod(gcMXBeansList, _getListSizeMethod)); for (int i = 0; i < numCollectors; i++) { - jobject gcMXBean = env->CallObjectMethod(gcMXBeansList, _getListUseIndexMethod, i); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, gcMXBean, env, CallObjectMethod(gcMXBeansList, _getListUseIndexMethod, i)); + + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF(jobject, gcName, env, + CallObjectMethod(gcMXBean, _getGCNameMethod)); - jstring gcName = (jstring)env->CallObjectMethod(gcMXBean, _getGCNameMethod); - jlong gcCollectionCount = env->CallLongMethod(gcMXBean, _getGCCollectionCountMethod); - jlong gcCollectionTime = env->CallLongMethod(gcMXBean, _getGCCollectionTimeMethod); - const char* gcNameStr = env->GetStringUTFChars(gcName, NULL); + JNI_CALL_METHOD_CHECK_EXCEPTION(jlong, gcCollectionCount, env, + CallLongMethod(gcMXBean, _getGCCollectionCountMethod)); + + JNI_CALL_METHOD_CHECK_EXCEPTION(jlong, gcCollectionTime, env, + CallLongMethod(gcMXBean, _getGCCollectionTimeMethod)); + + const char* gcNameStr = env->GetStringUTFChars((jstring)gcName, NULL); if (gcNameStr != nullptr) { if (strcmp(gcNameStr, "G1 Young Generation") == 0) { jvm_metrics->jvm_gc_g1_young_generation_count->set_value(gcCollectionCount); @@ -463,31 +553,40 @@ void JvmStats::refresh(JvmMetrics* jvm_metrics) { jvm_metrics->jvm_gc_g1_old_generation_time_ms->set_value(gcCollectionTime); } - env->ReleaseStringUTFChars(gcName, gcNameStr); + env->ReleaseStringUTFChars((jstring)gcName, gcNameStr); } - env->DeleteLocalRef(gcMXBean); } - env->DeleteLocalRef(memoryMXBeanObj); - env->DeleteLocalRef(heapMemoryUsageObj); - env->DeleteLocalRef(nonHeapMemoryUsageObj); - env->DeleteLocalRef(memoryPoolMXBeansList); - env->DeleteLocalRef(threadMXBean); - env->DeleteLocalRef(gcMXBeansList); + + return Status::OK(); } JvmStats::~JvmStats() { if (!_init_complete) { return; } try { - env->DeleteLocalRef(_newThreadStateObj); - env->DeleteLocalRef(_runnableThreadStateObj); - env->DeleteLocalRef(_blockedThreadStateObj); - env->DeleteLocalRef(_waitingThreadStateObj); - env->DeleteLocalRef(_timedWaitingThreadStateObj); - env->DeleteLocalRef(_terminatedThreadStateObj); + JNIEnv* env = nullptr; + Status st = JniUtil::GetJNIEnv(&env); + if (!st.ok()) { + return; + } + env->DeleteGlobalRef(_managementFactoryClass); + env->DeleteGlobalRef(_memoryUsageClass); + env->DeleteGlobalRef(_memoryMXBeanClass); + env->DeleteGlobalRef(_listClass); + env->DeleteGlobalRef(_memoryPoolMXBeanClass); + env->DeleteGlobalRef(_threadMXBeanClass); + env->DeleteGlobalRef(_threadInfoClass); + env->DeleteGlobalRef(_threadStateClass); + env->DeleteGlobalRef(_garbageCollectorMXBeanClass); + + env->DeleteGlobalRef(_newThreadStateObj); + env->DeleteGlobalRef(_runnableThreadStateObj); + env->DeleteGlobalRef(_blockedThreadStateObj); + env->DeleteGlobalRef(_waitingThreadStateObj); + env->DeleteGlobalRef(_timedWaitingThreadStateObj); + env->DeleteGlobalRef(_terminatedThreadStateObj); } catch (...) { - // When be is killed, DeleteLocalRef may fail. // In order to exit more gracefully, we catch the exception here. } } diff --git a/be/src/util/jvm_metrics.h b/be/src/util/jvm_metrics.h index 459a3cbf938f79..e5d631c65f02e9 100644 --- a/be/src/util/jvm_metrics.h +++ b/be/src/util/jvm_metrics.h @@ -27,7 +27,6 @@ class JvmMetrics; class JvmStats { private: - JNIEnv* env = nullptr; jclass _managementFactoryClass = nullptr; jmethodID _getMemoryMXBeanMethod = nullptr; jclass _memoryUsageClass = nullptr; @@ -96,17 +95,16 @@ class JvmStats { bool _init_complete = false; public: - // JvmStats(JNIEnv* ENV); - void init(JNIEnv* ENV); + Status init(); bool init_complete() const { return _init_complete; } void set_complete(bool val) { _init_complete = val; } - void refresh(JvmMetrics* jvm_metrics); + Status refresh(JvmMetrics* jvm_metrics); ~JvmStats(); }; class JvmMetrics { public: - JvmMetrics(MetricRegistry* registry, JNIEnv* env); + JvmMetrics(MetricRegistry* registry); ~JvmMetrics() = default; void update(); diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index d314cba7a656a9..44f1889efd3525 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -148,6 +148,7 @@ struct AggregateJavaUdafData { jbyteArray arr = env->NewByteArray(len); env->SetByteArrayRegion(arr, 0, len, reinterpret_cast(serialize_data.data())); env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, place, arr); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); jbyte* pBytes = env->GetByteArrayElements(arr, nullptr); env->ReleaseByteArrayElements(arr, pBytes, JNI_ABORT); env->DeleteLocalRef(arr); diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index f06524944ffc51..a7b0d5144ee623 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -80,16 +80,13 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { batch_size = _state->batch_size(); } RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - if (env == nullptr) { - return Status::InternalError("Failed to get/create JVM"); - } SCOPED_TIMER(_open_scanner_time); _scanner_params.emplace("time_zone", _state->timezone()); RETURN_IF_ERROR(_init_jni_scanner(env, batch_size)); // Call org.apache.doris.common.jni.JniScanner#open env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open); - _scanner_opened = true; RETURN_ERROR_IF_EXC(env); + _scanner_opened = true; return Status::OK(); } diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 20660a4cbe6ba2..6c8491ef213085 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -95,26 +95,23 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, JDBC_EXECUTOR_FACTORY_CLASS, &_executor_factory_clazz)); - _executor_factory_ctor_id = - env->GetStaticMethodID(_executor_factory_clazz, "getExecutorClass", - "(Lorg/apache/doris/thrift/TOdbcTableType;)Ljava/lang/String;"); - if (_executor_factory_ctor_id == nullptr) { - return Status::InternalError("Failed to find method ID for getExecutorClass"); - } + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _executor_factory_ctor_id, env, + GetStaticMethodID(_executor_factory_clazz, "getExecutorClass", + "(Lorg/apache/doris/thrift/TOdbcTableType;)Ljava/lang/String;")); jobject jtable_type = _get_java_table_type(env, _conn_param.table_type); - jstring executor_name = (jstring)env->CallStaticObjectMethod( - _executor_factory_clazz, _executor_factory_ctor_id, jtable_type); - if (executor_name == nullptr) { - return Status::InternalError("getExecutorClass returned null"); - } - const char* executor_name_str = env->GetStringUTFChars(executor_name, nullptr); + JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF( + jobject, executor_name, env, + CallStaticObjectMethod(_executor_factory_clazz, _executor_factory_ctor_id, + jtable_type)); + + const char* executor_name_str = env->GetStringUTFChars((jstring)executor_name, nullptr); RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, executor_name_str, &_executor_clazz)); env->DeleteLocalRef(jtable_type); - env->ReleaseStringUTFChars(executor_name, executor_name_str); - env->DeleteLocalRef(executor_name); + env->ReleaseStringUTFChars((jstring)executor_name, executor_name_str); #undef GET_BASIC_JAVA_CLAZZ RETURN_IF_ERROR(_register_func_id(env)); @@ -190,14 +187,16 @@ Status JdbcConnector::test_connection() { RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_test_connection_id); - return JniUtil::GetJniExceptionMsg(env); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); } Status JdbcConnector::clean_datasource() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_clean_datasource_id); - return JniUtil::GetJniExceptionMsg(env); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); } Status JdbcConnector::query() { @@ -305,7 +304,7 @@ Status JdbcConnector::exec_stmt_write(Block* block, const VExprContextSPtrs& out env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_stmt_write_id, hashmap_object); env->DeleteLocalRef(hashmap_object); - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + RETURN_ERROR_IF_EXC(env); *num_rows_sent = block->rows(); return Status::OK(); } @@ -315,7 +314,7 @@ Status JdbcConnector::begin_trans() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_begin_trans_id); - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + RETURN_ERROR_IF_EXC(env); _is_in_transaction = true; } return Status::OK(); @@ -328,7 +327,8 @@ Status JdbcConnector::abort_trans() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_abort_trans_id); - return JniUtil::GetJniExceptionMsg(env); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); } Status JdbcConnector::finish_trans() { @@ -336,7 +336,7 @@ Status JdbcConnector::finish_trans() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_finish_trans_id); - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + RETURN_ERROR_IF_EXC(env); _is_in_transaction = false; } return Status::OK(); diff --git a/be/src/vec/exprs/table_function/udf_table_function.cpp b/be/src/vec/exprs/table_function/udf_table_function.cpp index 82e727b3f5dee9..35357f7c9357e1 100644 --- a/be/src/vec/exprs/table_function/udf_table_function.cpp +++ b/be/src/vec/exprs/table_function/udf_table_function.cpp @@ -48,9 +48,6 @@ UDFTableFunction::UDFTableFunction(const TFunction& t_fn) : TableFunction(), _t_ Status UDFTableFunction::open() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - if (env == nullptr) { - return Status::InternalError("Failed to get/create JVM"); - } _jni_ctx = std::make_shared(); // Add a scoped cleanup jni reference object. This cleans up local refs made below. JniLocalFrame jni_frame; @@ -70,14 +67,22 @@ Status UDFTableFunction::open() { RETURN_IF_ERROR(jni_frame.push(env)); RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &_jni_ctx->executor_cl)); - _jni_ctx->executor_ctor_id = - env->GetMethodID(_jni_ctx->executor_cl, "", EXECUTOR_CTOR_SIGNATURE); - _jni_ctx->executor_evaluate_id = - env->GetMethodID(_jni_ctx->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE); - _jni_ctx->executor_close_id = - env->GetMethodID(_jni_ctx->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE); - _jni_ctx->executor = env->NewObject(_jni_ctx->executor_cl, _jni_ctx->executor_ctor_id, - ctor_params_bytes); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _jni_ctx->executor_ctor_id, env, + GetMethodID(_jni_ctx->executor_cl, "", EXECUTOR_CTOR_SIGNATURE)); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _jni_ctx->executor_evaluate_id, env, + GetMethodID(_jni_ctx->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE)); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _jni_ctx->executor_close_id, env, + GetMethodID(_jni_ctx->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE)); + + JNI_CALL_METHOD_CHECK_EXCEPTION( + , _jni_ctx->executor, env, + NewObject(_jni_ctx->executor_cl, _jni_ctx->executor_ctor_id, ctor_params_bytes)); jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr); env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT); env->DeleteLocalRef(ctor_params_bytes); @@ -123,9 +128,10 @@ Status UDFTableFunction::process_init(Block* block, RuntimeState* state) { jobject output_map = JniUtil::convert_to_java_map(env, output_params); DCHECK(_jni_ctx != nullptr); DCHECK(_jni_ctx->executor != nullptr); - long output_address = env->CallLongMethod(_jni_ctx->executor, _jni_ctx->executor_evaluate_id, - input_map, output_map); - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + JNI_CALL_METHOD_CHECK_EXCEPTION( + long, output_address, env, + CallLongMethod(_jni_ctx->executor, _jni_ctx->executor_evaluate_id, input_map, + output_map)); env->DeleteLocalRef(input_map); env->DeleteLocalRef(output_map); RETURN_IF_ERROR(JniConnector::fill_block(block, {_result_column_idx}, output_address)); diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp index 86daf5ebf3bb0a..e2c441b660201d 100644 --- a/be/src/vec/functions/function_java_udf.cpp +++ b/be/src/vec/functions/function_java_udf.cpp @@ -42,9 +42,6 @@ JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes& argumen Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - if (env == nullptr) { - return Status::InternalError("Failed to get/create JVM"); - } if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) { SCOPED_TIMER(context->get_udf_execute_timer());