diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index f8694275c9..24f28b10c1 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -486,8 +486,6 @@ endif()
 set(COMMON_THIRDPARTY
     rocksdb
     cyrus-sasl
-    librdkafka_cpp
-    librdkafka
     libs2
     snappy
     Boost::date_time
@@ -525,6 +523,9 @@ set(COMMON_THIRDPARTY
     minizip
     breakpad
     ${AWS_LIBS}
+    # put this after lz4 to avoid using lz4 lib in librdkafka
+    librdkafka_cpp
+    librdkafka
 )
 
 if (${MAKE_TEST} STREQUAL "ON")
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index b6863b50e5..044feda7cf 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -200,8 +200,10 @@ static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
     DorisMetrics::instance()->initialize(init_system_metrics, disk_devices, network_interfaces);
 }
 
-void sigterm_handler(int signo) {
-    k_doris_exit = true;
+void signal_handler(int signal) {
+    if (signal == SIGINT || signal == SIGTERM) {
+        k_doris_exit = true;
+    }
 }
 
 int install_signal(int signo, void (*handler)(int)) {
@@ -219,11 +221,11 @@ int install_signal(int signo, void (*handler)(int)) {
 }
 
 void init_signals() {
-    auto ret = install_signal(SIGINT, sigterm_handler);
+    auto ret = install_signal(SIGINT, signal_handler);
     if (ret < 0) {
         exit(-1);
     }
-    ret = install_signal(SIGTERM, sigterm_handler);
+    ret = install_signal(SIGTERM, signal_handler);
     if (ret < 0) {
         exit(-1);
     }
diff --git a/be/src/exec/s3_reader.cpp b/be/src/exec/s3_reader.cpp
index ae5cc3b411..30e6daaa59 100644
--- a/be/src/exec/s3_reader.cpp
+++ b/be/src/exec/s3_reader.cpp
@@ -23,6 +23,7 @@
 
 #include "common/logging.h"
 #include "gutil/strings/strcat.h"
+#include "service/backend_options.h"
 #include "util/s3_util.h"
 
 namespace doris {
@@ -64,7 +65,7 @@ Status S3Reader::open() {
     } else {
         std::stringstream out;
         out << "Error: [" << response.GetError().GetExceptionName() << ":"
-            << response.GetError().GetMessage();
+            << response.GetError().GetMessage() << "] at " << BackendOptions::get_localhost();
         return Status::InternalError(out.str());
     }
 }
@@ -99,7 +100,7 @@ Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
         *bytes_read = 0;
         std::stringstream out;
         out << "Error: [" << response.GetError().GetExceptionName() << ":"
-            << response.GetError().GetMessage();
+            << response.GetError().GetMessage() << "] at " << BackendOptions::get_localhost();
         LOG(INFO) << out.str();
         return Status::InternalError(out.str());
     }
diff --git a/be/src/exec/s3_writer.cpp b/be/src/exec/s3_writer.cpp
index 97545d158c..17e64a4e28 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/exec/s3_writer.cpp
@@ -23,6 +23,7 @@
 #include <...>
 
 #include "common/logging.h"
+#include "service/backend_options.h"
 #include "util/s3_uri.h"
 #include "util/s3_util.h"
@@ -66,7 +67,7 @@ Status S3Writer::open() {
     } else {
         std::stringstream out;
         out << "Error: [" << response.GetError().GetExceptionName() << ":"
-            << response.GetError().GetMessage();
+            << response.GetError().GetMessage() << "] at " << BackendOptions::get_localhost();
         return Status::InternalError(out.str());
     }
 }
@@ -77,11 +78,13 @@ Status S3Writer::write(const uint8_t* buf, size_t buf_len, size_t* written_len)
         return Status::OK();
     }
     if (!_temp_file) {
-        return Status::BufferAllocFailed("The internal temporary file is not writable.");
+        return Status::BufferAllocFailed("The internal temporary file is not writable. at " +
+                                         BackendOptions::get_localhost());
     }
     _temp_file->write(reinterpret_cast<const char*>(buf), buf_len);
     if (!_temp_file->good()) {
-        return Status::BufferAllocFailed("Could not append to the internal temporary file.");
+        return Status::BufferAllocFailed("Could not append to the internal temporary file. at " +
+                                         BackendOptions::get_localhost());
     }
     *written_len = buf_len;
     return Status::OK();
@@ -97,7 +100,8 @@ Status S3Writer::close() {
 
 Status S3Writer::_sync() {
     if (!_temp_file) {
-        return Status::BufferAllocFailed("The internal temporary file is not writable.");
+        return Status::BufferAllocFailed("The internal temporary file is not writable. at " +
+                                         BackendOptions::get_localhost());
     }
     CHECK_S3_CLIENT(_client);
     Aws::S3::Model::PutObjectRequest request;
@@ -114,7 +118,7 @@ Status S3Writer::_sync() {
     } else {
         std::stringstream out;
         out << "Error: [" << response.GetError().GetExceptionName() << ":"
-            << response.GetError().GetMessage();
+            << response.GetError().GetMessage() << "] at " << BackendOptions::get_localhost();
         return Status::InternalError(out.str());
     }
 }
diff --git a/be/src/olap/wrapper_field.h b/be/src/olap/wrapper_field.h
index 3d22a7b61a..51e4b3e49f 100644
--- a/be/src/olap/wrapper_field.h
+++ b/be/src/olap/wrapper_field.h
@@ -42,7 +42,7 @@ public:
         delete _rep;
         delete[] _owned_buf;
         if (_long_text_buf) {
-            delete _long_text_buf;
+            free(_long_text_buf);
         }
     }
 
diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 7522e86d22..2cf330f0b5 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -164,6 +164,7 @@ Status KafkaDataConsumer::assign_topic_partitions(
     if (err) {
         LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true)
                      << ", err: " << RdKafka::err2str(err);
+        _k_consumer->unassign();
         return Status::InternalError("failed to assign topic partitions");
     }
 
@@ -382,6 +383,7 @@ Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
 
 Status KafkaDataConsumer::reset() {
     std::unique_lock<std::mutex> l(_lock);
     _cancelled = false;
+    _k_consumer->unassign();
    // reset will be called before this consumer being returned to the pool.
    // so update _last_visit_time is reasonable.
    _last_visit_time = time(nullptr);