diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 95b448c2e8..dfe85cee40 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -386,38 +386,29 @@ if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) endif() # For any gcc builds: -# -g: Enable symbols for profiler tools +# -g: Enable symbols for profiler tools. Produce debugging information in the operating system’s native formt # -Wno-unused-local-typedefs: Do not warn for local typedefs that are unused. set(CXX_GCC_FLAGS "${CXX_GCC_FLAGS} -g -Wno-unused-local-typedefs") # For CMAKE_BUILD_TYPE=Debug -# -ggdb: Enable gdb debugging -# Debug information is stored as dwarf2 to be as compatible as possible -# -Werror: compile warnings should be errors when using the toolchain compiler. -# Only enable for debug builds because this is what we test in pre-commit tests. -set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -ggdb -O0") +set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -O0") # For CMAKE_BUILD_TYPE=Release # -O3: Enable all compiler optimizations # -DNDEBUG: Turn off dchecks/asserts/debug only code. -# -gdwarf-2: Debug information is stored as dwarf2 to be as compatible as possible -set(CXX_FLAGS_RELEASE "${CXX_GCC_FLAGS} -O3 -gdwarf-2 -DNDEBUG") +set(CXX_FLAGS_RELEASE "${CXX_GCC_FLAGS} -O3 -DNDEBUG") -SET(CXX_FLAGS_ASAN "${CXX_GCC_FLAGS} -ggdb3 -O0 -gdwarf-2 -fsanitize=address -DADDRESS_SANITIZER") -SET(CXX_FLAGS_LSAN "${CXX_GCC_FLAGS} -ggdb3 -O0 -gdwarf-2 -fsanitize=leak -DLEAK_SANITIZER") +SET(CXX_FLAGS_ASAN "${CXX_GCC_FLAGS} -O0 -fsanitize=address -DADDRESS_SANITIZER") +SET(CXX_FLAGS_LSAN "${CXX_GCC_FLAGS} -O0 -fsanitize=leak -DLEAK_SANITIZER") # Set the flags to the undefined behavior sanitizer, also known as "ubsan" # Turn on sanitizer and debug symbols to get stack traces: -SET(CXX_FLAGS_UBSAN "${CXX_GCC_FLAGS} -ggdb3 -O0 -gdwarf-2 -fno-wrapv -fsanitize=undefined") -# Ignore a number of noisy errors with too many false positives: -# TODO(zc): -# SET(CXX_FLAGS_UBSAN "${CXX_FLAGS_UBSAN} -fno-sanitize=alignment,function,vptr,float-divide-by-zero,float-cast-overflow") -# Don't enforce wrapped signed integer arithmetic so that the sanitizer actually sees +SET(CXX_FLAGS_UBSAN "${CXX_GCC_FLAGS} -O0 -fno-wrapv -fsanitize=undefined") # Set the flags to the thread sanitizer, also known as "tsan" # Turn on sanitizer and debug symbols to get stack traces: # Use -Wno-builtin-declaration-mismatch to mute warnings like "new declaration ‘__tsan_atomic16 __tsan_atomic16_fetch_nand(..." -SET(CXX_FLAGS_TSAN "${CXX_GCC_FLAGS} -O0 -ggdb3 -fsanitize=thread -DTHREAD_SANITIZER -Wno-builtin-declaration-mismatch") +SET(CXX_FLAGS_TSAN "${CXX_GCC_FLAGS} -O0 -fsanitize=thread -DTHREAD_SANITIZER -Wno-builtin-declaration-mismatch") # Set compile flags based on the build type. if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") diff --git a/be/src/exprs/rpc_fn_call.cpp b/be/src/exprs/rpc_fn_call.cpp index 92b67e5949..94bece7d12 100644 --- a/be/src/exprs/rpc_fn_call.cpp +++ b/be/src/exprs/rpc_fn_call.cpp @@ -204,6 +204,12 @@ Status RPCFnCall::_eval_children(ExprContext* context, TupleRow* row, _rpc_function_symbol, cntl.ErrorText()) .c_str()); } + if (!response->has_status() || !response->has_result()) { + FunctionContext* fn_ctx = context->fn_context(_fn_context_index); + fn_ctx->set_error(response->status().DebugString().c_str()); + return Status::InternalError(fmt::format( + "call rpc function {} failed: status or result is not set.", _rpc_function_symbol)); + } if (response->status().status_code() != 0) { FunctionContext* fn_ctx = context->fn_context(_fn_context_index); fn_ctx->set_error(response->status().DebugString().c_str()); diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index f310cd18f6..d0d8b73689 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -123,15 +123,22 @@ public: LOG(WARNING) << "stub is null to: " << host_port; return false; } + std::string message = "hello doris!"; PHandShakeRequest request; + request.set_hello(message); PHandShakeResponse response; brpc::Controller cntl; stub->hand_shake(&cntl, &request, &response, nullptr); - if (!cntl.Failed()) { + if (cntl.Failed()) { + LOG(WARNING) << "open brpc connection to " << host_port + << " failed: " << cntl.ErrorText(); + return false; + } else if (response.has_status() && response.has_hello() && response.hello() == message && + response.status().status_code() == 0) { return true; } else { - LOG(WARNING) << "open brpc connection to " << host_port - << " failed: " << cntl.ErrorText(); + LOG(WARNING) << "open brpc connection to " << host_port + << " failed: " << response.DebugString(); return false; } } diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp index 1003e0d3f8..9208ae2295 100644 --- a/be/src/vec/functions/function_rpc.cpp +++ b/be/src/vec/functions/function_rpc.cpp @@ -290,6 +290,24 @@ void convert_col_to_pvalue(const ColumnPtr& column, const DataTypePtr& data_type } break; } + case TypeIndex::HLL: { + ptype->set_id(PGenericType::HLL); + arg->mutable_bytes_value()->Reserve(row_count); + for (size_t row_num = 0; row_num < row_count; ++row_num) { + if constexpr (nullable) { + if (column->is_null_at(row_num)) { + arg->add_bytes_value(nullptr); + } else { + StringRef data = column->get_data_at(row_num); + arg->add_bytes_value(data.data, data.size); + } + } else { + StringRef data = column->get_data_at(row_num); + arg->add_bytes_value(data.data, data.size); + } + } + break; + } default: LOG(INFO) << "unknown type: " << data_type->get_name(); ptype->set_id(PGenericType::UNKNOWN); @@ -469,6 +487,13 @@ void convert_to_column(MutableColumnPtr& column, const PValues& result) { } break; } + case PGenericType::HLL: { + column->reserve(result.bytes_value_size()); + for (int i = 0; i < result.bytes_value_size(); ++i) { + column->insert_data(result.bytes_value(i).c_str(), result.bytes_value(i).size()); + } + break; + } default: { LOG(WARNING) << "unknown PGenericType: " << result.type().DebugString(); break; @@ -517,6 +542,10 @@ Status RPCFnCall::execute(FunctionContext* context, Block& block, const ColumnNu fmt::format("call to rpc function {} failed: {}", _symbol, cntl.ErrorText()) .c_str()); } + if (!response.has_status() || !response.has_result()) { + return Status::InternalError( + fmt::format("call rpc function {} failed: status or result is not set.", _symbol)); + } if (response.status().status_code() != 0) { return Status::InternalError(fmt::format("call to rpc function {} failed: {}", _symbol, response.status().DebugString())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java index a0e2ccb931..df609a9044 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java @@ -275,14 +275,17 @@ public class CreateFunctionStmt extends DdlStmt { .usePlaintext().build(); PFunctionServiceGrpc.PFunctionServiceBlockingStub stub = PFunctionServiceGrpc.newBlockingStub(channel); FunctionService.PCheckFunctionRequest.Builder builder = FunctionService.PCheckFunctionRequest.newBuilder(); - builder.getFunctionBuilder().setFunctionName(functionName.getFunction()); + builder.getFunctionBuilder().setFunctionName(symbol); for (Type arg : argsDef.getArgTypes()) { builder.getFunctionBuilder().addInputs(convertToPParameterType(arg)); } builder.getFunctionBuilder().setOutput(convertToPParameterType(returnType.getType())); FunctionService.PCheckFunctionResponse response = stub.checkFn(builder.build()); + if (response == null || !response.hasStatus()) { + throw new AnalysisException("cannot access function server"); + } if (response.getStatus().getStatusCode() != 0) { - throw new AnalysisException("cannot access function server:" + response.getStatus()); + throw new AnalysisException("check function [" + symbol + "] failed: " + response.getStatus()); } } function = ScalarFunction.createUdf(binaryType,