From 400d8a906f0961c84b91beef0d9d972614274cf8 Mon Sep 17 00:00:00 2001
From: Mingyu Chen
Date: Thu, 4 Apr 2019 13:12:21 +0800
Subject: [PATCH] Optimize the consumer assignment of Kafka routine load job (#870)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

1. Use a data consumer group so that multiple data consumers share a single
   stream load pipe. This increases the Kafka message consumption speed and
   reduces the number of tasks of a routine load job.

   Test results:
   * 1 consumer, 1 partition:
     consume time: 4.469s, rows: 990140, bytes: 128737139. 221557 rows/s, 28M/s
   * 1 consumer, 3 partitions:
     consume time: 12.765s, rows: 2000143, bytes: 258631271. 156689 rows/s, 20M/s
     blocking get time(us): 12268241, blocking put time(us): 1886431
   * 3 consumers, 3 partitions:
     consume time(all 3): 6.095s, rows: 2000503, bytes: 258631576. 328220 rows/s, 42M/s
     blocking get time(us): 1041639, blocking put time(us): 10356581

   The last 2 cases show that we can achieve a higher speed by adding more
   consumers, but the bottleneck then shifts from the Kafka consumers to Doris
   ingestion, so 3 consumers in a group is enough. I also add a Backend config
   `max_consumer_num_per_group` to change the number of consumers in a data
   consumer group; the default value is 3.

   In my test (1 Backend, 2 tablets, 1 replica), 1 routine load task can
   achieve 10M/s, which is the same as raw stream load.

2. Add OFFSET_BEGINNING and OFFSET_END support for Kafka routine load
---
 be/src/common/config.h                        | 3 +
 be/src/runtime/CMakeLists.txt                 | 1 +
 be/src/runtime/routine_load/data_consumer.cpp | 137 ++++---------
 be/src/runtime/routine_load/data_consumer.h   | 23 ++-
 .../routine_load/data_consumer_group.cpp      | 181 ++++++++++++++++++
 .../routine_load/data_consumer_group.h        | 94 +++++++++
 .../routine_load/data_consumer_pool.cpp       | 33 ++++
 .../runtime/routine_load/data_consumer_pool.h | 10 +-
 .../routine_load_task_executor.cpp            | 28 +--
 .../routine_load/routine_load_task_executor.h | 1 -
 .../stream_load/stream_load_context.cpp       | 2 +-
 be/src/util/semaphore.hpp                     | 50 +++++
 .../Data Manipulation/routine_load.md         | 12 +-
 .../doris/analysis/CreateRoutineLoadStmt.java | 21 +-
 .../org/apache/doris/load/LoadChecker.java    | 5 +-
 .../doris/load/routineload/KafkaProgress.java | 74 ++++---
 .../load/routineload/KafkaRoutineLoadJob.java | 41 ++--
 .../doris/load/routineload/KafkaTaskInfo.java | 4 +-
 .../load/routineload/RoutineLoadJob.java      | 37 ++--
 .../load/routineload/RoutineLoadManager.java  | 17 +-
 .../routineload/RoutineLoadScheduler.java     | 3 +-
 .../transaction/GlobalTransactionMgr.java     | 18 +-
 .../transaction/PublishVersionDaemon.java     | 3 +-
 .../doris/transaction/TransactionState.java   | 5 +-
 .../transaction/TxnStateChangeListener.java   | 6 +-
 .../routineload/KafkaRoutineLoadJobTest.java  | 54 ++++--
 .../routineload/RoutineLoadManagerTest.java   | 5 +-
 .../transaction/GlobalTransactionMgrTest.java | 31 +--
 28 files changed, 651 insertions(+), 248 deletions(-)
 create mode 100644 be/src/runtime/routine_load/data_consumer_group.cpp
 create mode 100644 be/src/runtime/routine_load/data_consumer_group.h
 create mode 100644 be/src/util/semaphore.hpp

diff --git a/be/src/common/config.h b/be/src/common/config.h
index f723ec36da..633defec05 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -396,6 +396,9 @@ namespace config {
 
     // If set to true, metric calculator will run
     CONF_Bool(enable_metric_calculator, "false");
+
+    // max consumer num in one data consumer group, for routine load
+    CONF_Int32(max_consumer_num_per_group, "3");
 }
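// Illustrative sizing example (the partition count here is hypothetical):
// with the default max_consumer_num_per_group = 3, a task covering 8 Kafka
// partitions gets min(3, 8) = 3 consumers from the pool, and the consumer
// group then assigns partitions round-robin, i.e. 3/3/2 partitions per
// consumer. The value can be raised in be.conf, e.g.:
//
//     max_consumer_num_per_group = 5
//
// but this only helps while Kafka consumption, not Doris ingestion, is the
// bottleneck.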
// namespace config } // namespace doris diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index f920bde6e8..7f19dcd0c7 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -98,6 +98,7 @@ add_library(Runtime STATIC stream_load/stream_load_context.cpp stream_load/stream_load_executor.cpp routine_load/data_consumer.cpp + routine_load/data_consumer_group.cpp routine_load/data_consumer_pool.cpp routine_load/routine_load_task_executor.cpp ) diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index e9822fea2e..ce0d08e6c4 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -23,8 +23,6 @@ #include #include "common/status.h" -#include "runtime/stream_load/stream_load_pipe.h" -#include "runtime/routine_load/kafka_consumer_pipe.h" #include "service/backend_options.h" #include "util/defer_op.h" #include "util/stopwatch.hpp" @@ -92,20 +90,24 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) { return Status::OK; } -Status KafkaDataConsumer::assign_topic_partitions(StreamLoadContext* ctx) { +Status KafkaDataConsumer::assign_topic_partitions( + const std::map& begin_partition_offset, + const std::string& topic, + StreamLoadContext* ctx) { + DCHECK(_k_consumer); // create TopicPartitions std::stringstream ss; std::vector topic_partitions; - for (auto& entry : ctx->kafka_info->begin_offset) { + for (auto& entry : begin_partition_offset) { RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create( - ctx->kafka_info->topic, entry.first, entry.second); + topic, entry.first, entry.second); topic_partitions.push_back(tp1); - ss << "partition[" << entry.first << "-" << entry.second << "] "; + ss << "[" << entry.first << ": " << entry.second << "] "; } - VLOG(1) << "assign topic partitions: " << ctx->kafka_info->topic - << ", " << ss.str(); + LOG(INFO) << "consumer: " << _id << ", grp: " << _grp_id + << " assign topic partitions: " << topic << ", " << ss.str(); // delete TopicPartition finally auto tp_deleter = [&topic_partitions] () { @@ -125,116 +127,65 @@ Status KafkaDataConsumer::assign_topic_partitions(StreamLoadContext* ctx) { return Status::OK; } -Status KafkaDataConsumer::start(StreamLoadContext* ctx) { - { - std::unique_lock l(_lock); - if (!_init) { - return Status("consumer is not initialized"); - } - } - +Status KafkaDataConsumer::group_consume( + BlockingQueue* queue, + int64_t max_running_time_ms) { _last_visit_time = time(nullptr); + int64_t left_time = max_running_time_ms; + LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id + << ", max running time(ms): " << left_time; - int64_t left_time = ctx->max_interval_s * 1000; - int64_t left_rows = ctx->max_batch_rows; - int64_t left_bytes = ctx->max_batch_size; - - std::shared_ptr kakfa_pipe = std::static_pointer_cast(ctx->body_sink); - - LOG(INFO) << "start consumer" - << ". max time(ms): " << left_time - << ", batch rows: " << left_rows - << ", batch size: " << left_bytes - << ". 
" << ctx->brief(); - - // copy one - std::map cmt_offset = ctx->kafka_info->cmt_offset; + int64_t received_rows = 0; + Status st = Status::OK; MonotonicStopWatch consumer_watch; MonotonicStopWatch watch; watch.start(); - Status st; while (true) { - std::unique_lock l(_lock); - if (_cancelled) { - kakfa_pipe ->cancel(); - return Status::CANCELLED; + { + std::unique_lock l(_lock); + if (_cancelled) { break; } } - if (_finished) { - kakfa_pipe ->finish(); - ctx->kafka_info->cmt_offset = std::move(cmt_offset); - return Status::OK; - } - - if (left_time <= 0 || left_rows <= 0 || left_bytes <=0) { - LOG(INFO) << "kafka consume batch done" - << ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time - << ", received rows=" << ctx->max_batch_rows - left_rows - << ", received bytes=" << ctx->max_batch_size - left_bytes - << ", kafka consume time(ms)=" << consumer_watch.elapsed_time() / 1000 / 1000; - - - if (left_bytes == ctx->max_batch_size) { - // nothing to be consumed, cancel it - // we do not allow finishing stream load pipe without data - kakfa_pipe->cancel(); - _cancelled = true; - return Status::CANCELLED; - } else { - DCHECK(left_bytes < ctx->max_batch_size); - DCHECK(left_rows < ctx->max_batch_rows); - kakfa_pipe->finish(); - ctx->kafka_info->cmt_offset = std::move(cmt_offset); - ctx->receive_bytes = ctx->max_batch_size - left_bytes; - _finished = true; - return Status::OK; - } - } + if (left_time <= 0) { break; } + bool done = false; // consume 1 message at a time consumer_watch.start(); RdKafka::Message *msg = _k_consumer->consume(1000 /* timeout, ms */); consumer_watch.stop(); switch (msg->err()) { case RdKafka::ERR_NO_ERROR: - VLOG(3) << "get kafka message" - << ", partition: " << msg->partition() - << ", offset: " << msg->offset() - << ", len: " << msg->len(); - - st = kakfa_pipe ->append_with_line_delimiter( - static_cast(msg->payload()), - static_cast(msg->len())); - if (st.ok()) { - left_rows--; - left_bytes -= msg->len(); - cmt_offset[msg->partition()] = msg->offset(); - VLOG(3) << "consume partition[" << msg->partition() - << " - " << msg->offset() << "]"; + if (!queue->blocking_put(msg)) { + // queue is shutdown + done = true; } - + ++received_rows; break; case RdKafka::ERR__TIMED_OUT: // leave the status as OK, because this may happend // if there is no data in kafka. - LOG(WARNING) << "kafka consume timeout"; + LOG(WARNING) << "kafka consume timeout: " << _id; break; default: - LOG(WARNING) << "kafka consume failed: " << msg->errstr(); + LOG(WARNING) << "kafka consume failed: " << _id + << ", msg: " << msg->errstr(); + done = true; st = Status(msg->errstr()); break; } - delete msg; - if (!st.ok()) { - kakfa_pipe ->cancel(); - return st; - } - - left_time = ctx->max_interval_s * 1000 - watch.elapsed_time() / 1000 / 1000; + left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000; + if (done) { break; } } - return Status::OK; + LOG(INFO) << "kafka conumer done: " << _id << ", grp: " << _grp_id + << ". 
cancelled: " << _cancelled + << ", left time(ms): " << left_time + << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000 + << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000 + << ", received rows: " << received_rows; + + return st; } Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) { @@ -243,17 +194,13 @@ Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) { return Status("consumer is not initialized"); } - if (_finished) { - return Status("consumer is already finished"); - } - _cancelled = true; + LOG(INFO) << "kafka consumer cancelled. " << _id; return Status::OK; } Status KafkaDataConsumer::reset() { std::unique_lock l(_lock); - _finished = false; _cancelled = false; return Status::OK; } diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h index 7c4cdad0a4..b42d36c891 100644 --- a/be/src/runtime/routine_load/data_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -23,6 +23,7 @@ #include "librdkafka/rdkafkacpp.h" #include "runtime/stream_load/stream_load_context.h" +#include "util/blocking_queue.hpp" #include "util/uid_util.h" namespace doris { @@ -34,8 +35,8 @@ class StreamLoadPipe; class DataConsumer { public: DataConsumer(StreamLoadContext* ctx): + _has_grp(false), _init(false), - _finished(false), _cancelled(false), _last_visit_time(0) { } @@ -46,7 +47,7 @@ public: // init the consumer with the given parameters virtual Status init(StreamLoadContext* ctx) = 0; // start consuming - virtual Status start(StreamLoadContext* ctx) = 0; + virtual Status consume(StreamLoadContext* ctx) = 0; // cancel the consuming process. // if the consumer is not initialized, or the consuming // process is already finished, call cancel() will @@ -59,14 +60,19 @@ public: const UniqueId& id() { return _id; } time_t last_visit_time() { return _last_visit_time; } + void set_grp(const UniqueId& grp_id) { + _grp_id = grp_id; + _has_grp = true; + } protected: UniqueId _id; + UniqueId _grp_id; + bool _has_grp; // lock to protect the following bools std::mutex _lock; bool _init; - bool _finished; bool _cancelled; time_t _last_visit_time; }; @@ -120,13 +126,20 @@ public: } virtual Status init(StreamLoadContext* ctx) override; - virtual Status start(StreamLoadContext* ctx) override; + // TODO(cmy): currently do not implement single consumer start method, using group_consume + virtual Status consume(StreamLoadContext* ctx) override { return Status::OK; } virtual Status cancel(StreamLoadContext* ctx) override; // reassign partition topics virtual Status reset() override; virtual bool match(StreamLoadContext* ctx) override; - Status assign_topic_partitions(StreamLoadContext* ctx); + Status assign_topic_partitions( + const std::map& begin_partition_offset, + const std::string& topic, + StreamLoadContext* ctx); + + // start the consumer and put msgs to queue + Status group_consume(BlockingQueue* queue, int64_t max_running_time_ms); private: std::string _brokers; diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp new file mode 100644 index 0000000000..8df8858c95 --- /dev/null +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include "runtime/routine_load/data_consumer_group.h" +#include "runtime/routine_load/kafka_consumer_pipe.h" + +namespace doris { + +Status KafkaDataConsumerGroup::assign_topic_partitions(StreamLoadContext* ctx) { + DCHECK(ctx->kafka_info); + DCHECK(_consumers.size() >= 1); + + // divide partitions + int consumer_size = _consumers.size(); + std::vector> divide_parts(consumer_size); + int i = 0; + for (auto& kv : ctx->kafka_info->begin_offset) { + int idx = i % consumer_size; + divide_parts[idx].emplace(kv.first, kv.second); + i++; + } + + // assign partitions to consumers equally + for (int i = 0; i < consumer_size; ++i) { + RETURN_IF_ERROR(std::static_pointer_cast(_consumers[i])->assign_topic_partitions( + divide_parts[i], ctx->kafka_info->topic, ctx)); + } + + return Status::OK; +} + +Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { + Status result_st = Status::OK; + // start all consumers + for(auto& consumer : _consumers) { + if (!_thread_pool.offer( + boost::bind(&KafkaDataConsumerGroup::actual_consume, this, consumer, &_queue, ctx->max_interval_s * 1000, + [this, &result_st] (const Status& st) { + std::unique_lock lock(_mutex); + _counter--; + if (_counter == 0) { + _queue.shutdown(); + LOG(INFO) << "all consumers are finished. shutdown queue. group id: " << _grp_id; + } + if (result_st.ok() && !st.ok()) { + result_st = st; + } + }))) { + + LOG(WARNING) << "failed to submit data consumer: " << consumer->id(); + return Status("failed to submit data consumer"); + } else { + VLOG(1) << "submit a data consumer: " << consumer->id(); + } + } + + // consuming from queue and put data to stream load pipe + int64_t left_time = ctx->max_interval_s * 1000; + int64_t left_rows = ctx->max_batch_rows; + int64_t left_bytes = ctx->max_batch_size; + + std::shared_ptr kafka_pipe = std::static_pointer_cast(ctx->body_sink); + + LOG(INFO) << "start consumer group: " << _grp_id + << ". max time(ms): " << left_time + << ", batch rows: " << left_rows + << ", batch size: " << left_bytes + << ". " << ctx->brief(); + + // copy one + std::map cmt_offset = ctx->kafka_info->cmt_offset; + + MonotonicStopWatch watch; + watch.start(); + Status st; + bool eos = false; + while (true) { + if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <=0) { + LOG(INFO) << "consumer group done: " << _grp_id + << ". 
consume time(ms)=" << ctx->max_interval_s * 1000 - left_time + << ", received rows=" << ctx->max_batch_rows - left_rows + << ", received bytes=" << ctx->max_batch_size - left_bytes + << ", eos: " << eos + << ", blocking get time(us): " << _queue.total_get_wait_time() / 1000 + << ", blocking put time(us): " << _queue.total_put_wait_time() / 1000; + + // shutdown queue + _queue.shutdown(); + // cancel all consumers + for(auto& consumer : _consumers) { consumer->cancel(ctx); } + // clean the msgs left in queue + while(true) { + RdKafka::Message* msg; + if (_queue.blocking_get(&msg)) { + delete msg; + msg = nullptr; + } else { + break; + } + } + DCHECK(_queue.get_size() == 0); + + if (!result_st.ok()) { + // some of consumers encounter errors, cancel this task + return result_st; + } + + if (left_bytes == ctx->max_batch_size) { + // nothing to be consumed, we have to cancel it, because + // we do not allow finishing stream load pipe without data + kafka_pipe->cancel(); + return Status::CANCELLED; + } else { + DCHECK(left_bytes < ctx->max_batch_size); + DCHECK(left_rows < ctx->max_batch_rows); + kafka_pipe->finish(); + ctx->kafka_info->cmt_offset = std::move(cmt_offset); + ctx->receive_bytes = ctx->max_batch_size - left_bytes; + return Status::OK; + } + } + + RdKafka::Message* msg; + bool res = _queue.blocking_get(&msg); + if (res) { + VLOG(3) << "get kafka message" + << ", partition: " << msg->partition() + << ", offset: " << msg->offset() + << ", len: " << msg->len(); + + st = kafka_pipe->append_with_line_delimiter( + static_cast(msg->payload()), + static_cast(msg->len())); + if (st.ok()) { + left_rows--; + left_bytes -= msg->len(); + cmt_offset[msg->partition()] = msg->offset(); + VLOG(3) << "consume partition[" << msg->partition() + << " - " << msg->offset() << "]"; + } else { + // failed to append this msg, we must stop + LOG(WARNING) << "failed to append msg to pipe"; + eos = true; + } + delete msg; + } else { + // queue is empty and shutdown + eos = true; + } + + left_time = ctx->max_interval_s * 1000 - watch.elapsed_time() / 1000 / 1000; + } + + return Status::OK; +} + +void KafkaDataConsumerGroup::actual_consume( + std::shared_ptr consumer, + BlockingQueue* queue, + int64_t max_running_time_ms, + ConsumeFinishCallback cb) { + Status st = std::static_pointer_cast(consumer)->group_consume(queue, max_running_time_ms); + cb(st); +} + +} // end namespace diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h new file mode 100644 index 0000000000..002672196c --- /dev/null +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
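// A minimal usage sketch of the consumer group, simplified from the call
// sequence in routine_load_task_executor.cpp later in this patch (error
// handling and pipe setup omitted):
//
//     std::shared_ptr<DataConsumerGroup> grp;
//     RETURN_IF_ERROR(consumer_pool->get_consumer_grp(ctx, &grp));
//     RETURN_IF_ERROR(std::static_pointer_cast<KafkaDataConsumerGroup>(grp)
//             ->assign_topic_partitions(ctx));
//     // blocks until the batch limits are reached or an error occurs
//     RETURN_IF_ERROR(grp->start_all(ctx));
//     consumer_pool->return_consumers(grp.get());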
+
+#pragma once
+
+#include "runtime/routine_load/data_consumer.h"
+#include "util/blocking_queue.hpp"
+#include "util/thread_pool.hpp"
+
+namespace doris {
+
+// data consumer group saves a group of data consumers.
+// These data consumers share the same stream load pipe.
+// This class is not thread safe.
+class DataConsumerGroup {
+public:
+    typedef std::function<void(const Status&)> ConsumeFinishCallback;
+
+    DataConsumerGroup():
+            _thread_pool(3, 10),
+            _counter(0) {}
+
+    virtual ~DataConsumerGroup() {
+        _consumers.clear();
+    }
+
+    const UniqueId& grp_id() { return _grp_id; }
+
+    const std::vector<std::shared_ptr<DataConsumer>>& consumers() {
+        return _consumers;
+    }
+
+    void add_consumer(std::shared_ptr<DataConsumer> consumer) {
+        consumer->set_grp(_grp_id);
+        _consumers.push_back(consumer);
+        ++_counter;
+    }
+
+    // start all consumers
+    virtual Status start_all(StreamLoadContext* ctx) { return Status::OK; }
+
+protected:
+    UniqueId _grp_id;
+    std::vector<std::shared_ptr<DataConsumer>> _consumers;
+    // thread pool to run each consumer in multi thread
+    ThreadPool _thread_pool;
+    // mutex to protect counter.
+    // the counter is init as the number of consumers.
+    // once a consumer is done, decrease the counter.
+    // when the counter becomes zero, shutdown the queue to finish
+    std::mutex _mutex;
+    int _counter;
+};
+
+// for kafka
+class KafkaDataConsumerGroup : public DataConsumerGroup {
+public:
+    KafkaDataConsumerGroup():
+            DataConsumerGroup(),
+            _queue(500) {}
+
+    virtual ~KafkaDataConsumerGroup() {}
+
+    virtual Status start_all(StreamLoadContext* ctx) override;
+    // assign topic partitions to all consumers equally
+    Status assign_topic_partitions(StreamLoadContext* ctx);
+
+private:
+    // start a single consumer
+    void actual_consume(
+            std::shared_ptr<DataConsumer> consumer,
+            BlockingQueue<RdKafka::Message*>* queue,
+            int64_t max_running_time_ms,
+            ConsumeFinishCallback cb);
+
+private:
+    // blocking queue to receive msgs from all consumers
+    BlockingQueue<RdKafka::Message*> _queue;
+};
+
+} // end namespace doris
diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp
index 6d6de3777b..0267fee119 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.cpp
+++ b/be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -16,6 +16,8 @@
 // under the License.
 
 #include "runtime/routine_load/data_consumer_pool.h"
+#include "runtime/routine_load/data_consumer_group.h"
+#include "common/config.h"
 
 namespace doris {
 
@@ -60,6 +62,31 @@ Status DataConsumerPool::get_consumer(
     return Status::OK;
 }
 
+Status DataConsumerPool::get_consumer_grp(
+        StreamLoadContext* ctx,
+        std::shared_ptr<DataConsumerGroup>* ret) {
+    if (ctx->load_src_type != TLoadSourceType::KAFKA) {
+        return Status("Currently only support consumer group for Kafka data source");
+    }
+    DCHECK(ctx->kafka_info);
+
+    std::shared_ptr<KafkaDataConsumerGroup> grp = std::make_shared<KafkaDataConsumerGroup>();
+
+    // one data consumer group contains at least one data consumer.
+ int max_consumer_num = config::max_consumer_num_per_group; + size_t consumer_num = std::min((size_t) max_consumer_num, ctx->kafka_info->begin_offset.size()); + for (int i = 0; i < consumer_num; ++i) { + std::shared_ptr consumer; + RETURN_IF_ERROR(get_consumer(ctx, &consumer)); + grp->add_consumer(consumer); + } + + LOG(INFO) << "get consumer group " << grp->grp_id() << " with " + << consumer_num << " consumers"; + *ret = grp; + return Status::OK; +} + void DataConsumerPool::return_consumer(std::shared_ptr consumer) { std::unique_lock l(_lock); @@ -77,6 +104,12 @@ void DataConsumerPool::return_consumer(std::shared_ptr consumer) { return; } +void DataConsumerPool::return_consumers(DataConsumerGroup* grp) { + for (std::shared_ptr consumer : grp->consumers()) { + return_consumer(consumer); + } +} + Status DataConsumerPool::start_bg_worker() { _clean_idle_consumer_thread = std::thread( [this] { diff --git a/be/src/runtime/routine_load/data_consumer_pool.h b/be/src/runtime/routine_load/data_consumer_pool.h index 1d74002db6..16808dbb23 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.h +++ b/be/src/runtime/routine_load/data_consumer_pool.h @@ -28,6 +28,7 @@ namespace doris { class DataConsumer; +class DataConsumerGroup; class Status; // DataConsumerPool saves all available data consumer @@ -47,8 +48,15 @@ public: StreamLoadContext* ctx, std::shared_ptr* ret); - // erase the specified cache + // get several consumers and put them into group + Status get_consumer_grp( + StreamLoadContext* ctx, + std::shared_ptr* ret); + + // return the consumer to the pool void return_consumer(std::shared_ptr consumer); + // return the consumers in consumer group to the pool + void return_consumers(DataConsumerGroup* grp); Status start_bg_worker(); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 0424b98cad..ec0afa4c55 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -19,7 +19,7 @@ #include "common/status.h" #include "runtime/exec_env.h" -#include "runtime/routine_load/data_consumer.h" +#include "runtime/routine_load/data_consumer_group.h" #include "runtime/routine_load/kafka_consumer_pipe.h" #include "runtime/stream_load/stream_load_context.h" #include "runtime/stream_load/stream_load_executor.h" @@ -109,7 +109,6 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { delete ctx; } return Status("failed to submit routine load task"); - } else { LOG(INFO) << "submit a new routine load task: " << ctx->brief() << ", current tasks num: " << _task_map.size(); @@ -134,23 +133,30 @@ void RoutineLoadTaskExecutor::exec_task( VLOG(1) << "begin to execute routine load task: " << ctx->brief(); - // get or create data consumer - std::shared_ptr consumer; - HANDLE_ERROR(consumer_pool->get_consumer(ctx, &consumer), "failed to get consumer"); + // create data consumer group + std::shared_ptr consumer_grp; + HANDLE_ERROR(consumer_pool->get_consumer_grp(ctx, &consumer_grp), "failed to get consumers"); // create and set pipe std::shared_ptr pipe; switch (ctx->load_src_type) { - case TLoadSourceType::KAFKA: + case TLoadSourceType::KAFKA: { pipe = std::make_shared(); - std::static_pointer_cast(consumer)->assign_topic_partitions(ctx); + Status st = std::static_pointer_cast(consumer_grp)->assign_topic_partitions(ctx); + if (!st.ok()) { + err_handler(ctx, st, st.get_error_msg()); + cb(ctx); + return; + } break; - default: + 
} + default: { std::stringstream ss; ss << "unknown routine load task type: " << ctx->load_type; err_handler(ctx, Status::CANCELLED, ss.str()); cb(ctx); return; + } } ctx->body_sink = pipe; @@ -167,16 +173,16 @@ void RoutineLoadTaskExecutor::exec_task( #endif // start to consume, this may block a while - HANDLE_ERROR(consumer->start(ctx), "consuming failed"); + HANDLE_ERROR(consumer_grp->start_all(ctx), "consuming failed"); - // wait for consumer finished + // wait for all consumers finished HANDLE_ERROR(ctx->future.get(), "consume failed"); ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos; // return the consumer back to pool // call this before commit txn, in case the next task can come very fast - consumer_pool->return_consumer(consumer); + consumer_pool->return_consumers(consumer_grp.get()); // commit txn HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit failed"); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index 11a54ae155..604a83e17e 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -38,7 +38,6 @@ class TRoutineLoadTask; // to FE finally. class RoutineLoadTaskExecutor { public: - // paramater: task id typedef std::function ExecFinishCallback; RoutineLoadTaskExecutor(ExecEnv* exec_env): diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index 30bdcd6e7d..8a4c3f588d 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -76,7 +76,7 @@ std::string StreamLoadContext::to_json() const { std::string StreamLoadContext::brief(bool detail) const { std::stringstream ss; - ss << "id=" << id << ", txn id=" << txn_id << ", label=" << label; + ss << "id=" << id << ", job id=" << job_id << ", txn id=" << txn_id << ", label=" << label; if (detail) { switch(load_src_type) { case TLoadSourceType::KAFKA: diff --git a/be/src/util/semaphore.hpp b/be/src/util/semaphore.hpp new file mode 100644 index 0000000000..398c5ae3ed --- /dev/null +++ b/be/src/util/semaphore.hpp @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
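// Usage sketch of the semaphore below (illustrative; the function names are
// placeholders): one thread blocks in wait() until another calls signal().
//
//     Semaphore sem;
//     // worker thread:            // waiting thread:
//     //   do_work();              //   sem.wait();
//     //   sem.signal();           //   proceed();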
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+
+namespace {
+
+class Semaphore {
+public:
+    explicit Semaphore(int count = 0) : _count(count) {
+    }
+
+    void set_count(int count) { _count = count; }
+
+    void signal() {
+        std::unique_lock<std::mutex> lock(_mutex);
+        ++_count;
+        _cv.notify_one();
+    }
+
+    void wait() {
+        std::unique_lock<std::mutex> lock(_mutex);
+        _cv.wait(lock, [=] { return _count > 0; });
+        --_count;
+    }
+
+private:
+    std::mutex _mutex;
+    std::condition_variable _cv;
+    int _count;
+};
+
+} // end namespace
diff --git a/docs/help/Contents/Data Manipulation/routine_load.md b/docs/help/Contents/Data Manipulation/routine_load.md
index e523c02d2c..c94977fd02 100644
--- a/docs/help/Contents/Data Manipulation/routine_load.md
+++ b/docs/help/Contents/Data Manipulation/routine_load.md
@@ -33,7 +33,7 @@
     指定列分隔符,如:
 
-        COLUMN TERMINATED BY ","
+        COLUMNS TERMINATED BY ","
 
     默认为:\t
 
@@ -146,11 +146,17 @@
     3. kafka_partitions/kafka_offsets
 
-        指定需要订阅的 kafka partition,以及对应的每个 partition 的起始 offset。如果没有指定,则默认从 0 开始订阅 topic 下的所有 partition。
+        指定需要订阅的 kafka partition,以及对应的每个 partition 的起始 offset。
+
+        offset 可以指定从大于等于 0 的具体 offset,或者:
+        1) OFFSET_BEGINNING: 从有数据的位置开始订阅。
+        2) OFFSET_END: 从末尾开始订阅。
+
+        如果没有指定,则默认从 OFFSET_END 开始订阅 topic 下的所有 partition。
 
         示例:
 
         "kafka_partitions" = "0,1,2,3",
-        "kafka_offsets" = "101,0,0,200"
+        "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
 
 7. 导入数据格式样例
diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 8d484445e9..296a6730a2 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.load.RoutineLoadDesc;
+import org.apache.doris.load.routineload.KafkaProgress;
 import org.apache.doris.load.routineload.LoadDataSourceType;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 
@@ -372,8 +373,24 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         }
 
         for (int i = 0; i < kafkaOffsetsStringList.length; i++) {
-            kafkaPartitionOffsets.get(i).second = getLongValueFromString(kafkaOffsetsStringList[i],
-                    KAFKA_OFFSETS_PROPERTY);
+            // defined in librdkafka/rdkafkacpp.h
+            // OFFSET_BEGINNING: -2
+            // OFFSET_END: -1
+            try {
+                kafkaPartitionOffsets.get(i).second = getLongValueFromString(kafkaOffsetsStringList[i],
+                        KAFKA_OFFSETS_PROPERTY);
+                if (kafkaPartitionOffsets.get(i).second < 0) {
+                    throw new AnalysisException("Cannot specify offset smaller than 0");
+                }
+            } catch (AnalysisException e) {
+                if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+                    kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL;
+                } else if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+                    kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL;
+                } else {
+                    throw e;
+                }
+            }
         }
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/src/main/java/org/apache/doris/load/LoadChecker.java
index 9ac727effd..5809c68571 100644
--- a/fe/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -28,7 +28,7 @@
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
-import
org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.Daemon; import org.apache.doris.load.AsyncDeleteJob.DeleteState; import org.apache.doris.load.FailMsg.CancelType; @@ -52,7 +52,6 @@ import org.apache.doris.thrift.TPushType; import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TabletCommitInfo; -import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; @@ -330,7 +329,7 @@ public class LoadChecker extends Daemon { tabletCommitInfos.add(new TabletCommitInfo(tabletId, replica.getBackendId())); } globalTransactionMgr.commitTransaction(job.getDbId(), job.getTransactionId(), tabletCommitInfos); - } catch (MetaNotFoundException | TransactionException e) { + } catch (UserException e) { LOG.warn("errors while commit transaction [{}], cancel the job {}, reason is {}", transactionState.getTransactionId(), job, e); load.cancelLoadJob(job, CancelType.UNKNOWN, transactionState.getReason()); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index c43a263bbb..aada743e38 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -17,12 +17,12 @@ package org.apache.doris.load.routineload; -import com.google.gson.Gson; import org.apache.doris.common.Pair; import org.apache.doris.thrift.TKafkaRLTaskProgress; import com.google.common.base.Joiner; import com.google.common.collect.Maps; +import com.google.gson.Gson; import java.io.DataInput; import java.io.DataOutput; @@ -33,12 +33,20 @@ import java.util.Map; /** * this is description of kafka routine load progress - * the data before offset was already loaded in doris + * the data before offset was already loaded in Doris */ // {"partitionIdToOffset": {}} public class KafkaProgress extends RoutineLoadProgress { + public static final String OFFSET_BEGINNING = "OFFSET_BEGINNING"; // -2 + public static final String OFFSET_END = "OFFSET_END"; // -1 + // OFFSET_ZERO is just for show info, if user specified offset is 0 + public static final String OFFSET_ZERO = "OFFSET_ZERO"; + + public static final long OFFSET_BEGINNING_VAL = -2; + public static final long OFFSET_END_VAL = -1; // (partition id, begin offset) + // the offset the next msg to be consumed private Map partitionIdToOffset = Maps.newConcurrentMap(); public KafkaProgress() { @@ -50,10 +58,6 @@ public class KafkaProgress extends RoutineLoadProgress { this.partitionIdToOffset = tKafkaRLTaskProgress.getPartitionCmtOffset(); } - public Map getPartitionIdToOffset() { - return partitionIdToOffset; - } - public Map getPartitionIdToOffset(List partitionIds) { Map result = Maps.newHashMap(); for (Map.Entry entry : partitionIdToOffset.entrySet()) { @@ -70,33 +74,59 @@ public class KafkaProgress extends RoutineLoadProgress { partitionIdToOffset.put(partitionOffset.first, partitionOffset.second); } + public Long getOffsetByPartition(int kafkaPartition) { + return partitionIdToOffset.get(kafkaPartition); + } + + public boolean containsPartition(Integer kafkaPartition) { + return partitionIdToOffset.containsKey(kafkaPartition); + } + + public boolean hasPartition() { + return partitionIdToOffset.isEmpty(); + } + // (partition id, end offset) - // end 
offset = -1 while begin offset of partition is 0 + // OFFSET_ZERO: user set offset == 0, no committed msg + // OFFSET_END: user set offset = OFFSET_END, no committed msg + // OFFSET_BEGINNING: user set offset = OFFSET_BEGINNING, no committed msg + // other: current committed msg's offset + private void getReadableProgress(Map showPartitionIdToOffset) { + for (Map.Entry entry : partitionIdToOffset.entrySet()) { + if (entry.getValue() == 0) { + showPartitionIdToOffset.put(entry.getKey(), OFFSET_ZERO); + } else if (entry.getValue() == -1) { + showPartitionIdToOffset.put(entry.getKey(), OFFSET_END); + } else if (entry.getValue() == -2) { + showPartitionIdToOffset.put(entry.getKey(), OFFSET_BEGINNING); + } else { + showPartitionIdToOffset.put(entry.getKey(), "" + (entry.getValue() - 1)); + } + } + } + @Override public String toString() { - Map showPartitionIdToOffset = new HashMap<>(); - for (Map.Entry entry : partitionIdToOffset.entrySet()) { - showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1); - } + Map showPartitionIdToOffset = Maps.newHashMap(); + getReadableProgress(showPartitionIdToOffset); return "KafkaProgress [partitionIdToOffset=" + Joiner.on("|").withKeyValueSeparator("_").join(showPartitionIdToOffset) + "]"; } @Override - public void update(RoutineLoadProgress progress) { - KafkaProgress newProgress = (KafkaProgress) progress; - newProgress.getPartitionIdToOffset().entrySet().parallelStream() - .forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1)); + public String toJsonString() { + Map showPartitionIdToOffset = Maps.newHashMap(); + getReadableProgress(showPartitionIdToOffset); + Gson gson = new Gson(); + return gson.toJson(showPartitionIdToOffset); } @Override - public String toJsonString() { - Map showPartitionIdToOffset = new HashMap<>(); - for (Map.Entry entry : partitionIdToOffset.entrySet()) { - showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1); - } - Gson gson = new Gson(); - return gson.toJson(showPartitionIdToOffset); + public void update(RoutineLoadProgress progress) { + KafkaProgress newProgress = (KafkaProgress) progress; + // + 1 to point to the next msg offset to be consumed + newProgress.partitionIdToOffset.entrySet().stream() + .forEach(entity -> this.partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1)); } @Override diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 1c533b9c43..626568e372 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -32,11 +32,9 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; -import org.apache.doris.load.RoutineLoadDesc; import org.apache.doris.system.SystemInfoService; import org.apache.doris.transaction.BeginTransactionException; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -101,7 +99,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { } @Override - public void divideRoutineLoadJob(int currentConcurrentTaskNum) { + public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException { List result = new ArrayList<>(); writeLock(); try { @@ 
-113,7 +111,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { if (j % currentConcurrentTaskNum == i) { int kafkaPartition = currentKafkaPartitions.get(j); taskKafkaProgress.put(kafkaPartition, - ((KafkaProgress) progress).getPartitionIdToOffset().get(kafkaPartition)); + ((KafkaProgress) progress).getOffsetByPartition(kafkaPartition)); } } KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName, taskKafkaProgress); @@ -139,15 +137,18 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo(); int aliveBeNum = systemInfoService.getClusterBackendIds(clusterName, true).size(); int partitionNum = currentKafkaPartitions.size(); + // 3 partitions one tasks + int maxPartitionDivision = (int) Math.ceil(partitionNum / 3.0); if (desireTaskConcurrentNum == 0) { - desireTaskConcurrentNum = partitionNum; + desireTaskConcurrentNum = maxPartitionDivision; } - LOG.info("current concurrent task number is min " - + "(current size of partition {}, desire task concurrent num {}, alive be num {})", - partitionNum, desireTaskConcurrentNum, aliveBeNum); + LOG.info("current concurrent task number is min" + + "(max partition division {}, desire task concurrent num {}, alive be num {})", + maxPartitionDivision, desireTaskConcurrentNum, aliveBeNum); currentTaskConcurrentNum = - Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM); + Math.min(Math.min(maxPartitionDivision, Math.min(desireTaskConcurrentNum, aliveBeNum)), + DEFAULT_TASK_MAX_CONCURRENT_NUM); return currentTaskConcurrentNum; } @@ -159,7 +160,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { @Override protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 - && ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().isEmpty()) { + && ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition()) { LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId())) .add("job_id", id) .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows()) @@ -171,7 +172,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { } @Override - protected void updateProgress(RLTaskTxnCommitAttachment attachment) { + protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException { super.updateProgress(attachment); this.progress.update(attachment.getProgress()); } @@ -188,7 +189,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo; // add new task KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo, - ((KafkaProgress)progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions())); + ((KafkaProgress) progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions())); // remove old task routineLoadTaskInfoList.remove(routineLoadTaskInfo); // add new task @@ -207,7 +208,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { // update current kafka partition at the same time // current kafka partitions = customKafkaPartitions == 0 ? 
all of partition of kafka topic : customKafkaPartitions @Override - protected boolean unprotectNeedReschedule() { + protected boolean unprotectNeedReschedule() throws UserException { // only running and need_schedule job need to be changed current kafka partitions if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) { if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) { @@ -247,7 +248,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { .build()); return true; } - } } else { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) @@ -266,7 +266,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { summary.put("errorRows", Long.valueOf(errorRows)); summary.put("unselectedRows", Long.valueOf(unselectedRows)); summary.put("receivedBytes", Long.valueOf(receivedBytes)); - summary.put("taskExecuteTaskMs", Long.valueOf(totalTaskExcutionTimeMs)); + summary.put("taskExecuteTimeMs", Long.valueOf(totalTaskExcutionTimeMs)); summary.put("receivedBytesRate", Long.valueOf(receivedBytes / totalTaskExcutionTimeMs * 1000)); summary.put("loadRowsRate", Long.valueOf((totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000)); summary.put("committedTaskNum", Long.valueOf(committedTaskNum)); @@ -315,12 +315,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private void updateNewPartitionProgress() { // update the progress of new partitions for (Integer kafkaPartition : currentKafkaPartitions) { - if (!((KafkaProgress) progress).getPartitionIdToOffset().containsKey(kafkaPartition)) { - ((KafkaProgress) progress).getPartitionIdToOffset().put(kafkaPartition, 0L); + if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) { + // if offset is not assigned, start from OFFSET_END + ((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, KafkaProgress.OFFSET_END_VAL)); LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("kafka_partition_id", kafkaPartition) - .add("begin_offset", 0) - .add("msg", "The new partition has been added in job")); + .add("kafka_partition_id", kafkaPartition) + .add("begin_offset", KafkaProgress.OFFSET_END) + .add("msg", "The new partition has been added in job")); } } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 9f1dcd11bd..6a4d8d48fd 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -17,8 +17,6 @@ package org.apache.doris.load.routineload; -import com.google.common.collect.Lists; -import com.google.gson.Gson; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.AnalysisException; @@ -34,7 +32,7 @@ import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; import com.google.common.base.Joiner; -import com.google.common.collect.Maps; +import com.google.gson.Gson; import java.util.ArrayList; import java.util.List; diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index fc955cad38..4367ac5c07 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -411,7 +411,7 @@ public abstract class RoutineLoadJob implements 
TxnStateChangeListener, Writable } } - abstract void divideRoutineLoadJob(int currentConcurrentTaskNum); + abstract void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException; public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { return 0; @@ -448,31 +448,30 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable } // All of private method could not be call without lock - private void checkStateTransform(RoutineLoadJob.JobState desireState) - throws UnsupportedOperationException { + private void checkStateTransform(RoutineLoadJob.JobState desireState) throws UserException { switch (state) { case PAUSED: if (desireState == JobState.PAUSED) { - throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState); + throw new DdlException("Could not transform " + state + " to " + desireState); } break; case STOPPED: case CANCELLED: - throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState); + throw new DdlException("Could not transform " + state + " to " + desireState); default: break; } } // if rate of error data is more then max_filter_ratio, pause job - protected void updateProgress(RLTaskTxnCommitAttachment attachment) { + protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException { updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(), attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), false /* not replay */); } private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes, - long taskExecutionTime, boolean isReplay) { + long taskExecutionTime, boolean isReplay) throws UserException { this.totalRows += numOfTotalRows; this.errorRows += numOfErrorRows; this.unselectedRows += unselectedRows; @@ -536,8 +535,12 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable } protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) { - updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(), - attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), true /* is replay */); + try { + updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(), + attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), true /* is replay */); + } catch (UserException e) { + LOG.error("should not happen", e); + } } abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, @@ -601,7 +604,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable // the task is committed when the correct number of rows is more then 0 @Override - public ListenResult onCommitted(TransactionState txnState) throws TransactionException { + public ListenResult onCommitted(TransactionState txnState) throws UserException { ListenResult result = ListenResult.UNCHANGED; long taskBeId = -1L; writeLock(); @@ -649,7 +652,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable // txn will be aborted but progress will be update // progress will be update otherwise the progress will be hung @Override - public ListenResult onAborted(TransactionState txnState, String txnStatusChangeReasonString) { + public ListenResult onAborted(TransactionState txnState, String txnStatusChangeReasonString) throws UserException { 
ListenResult result = ListenResult.UNCHANGED; long taskBeId = -1L; writeLock(); @@ -670,7 +673,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable if (txnStatusChangeReason != null) { switch (txnStatusChangeReason) { case OFFSET_OUT_OF_RANGE: - updateState(JobState.CANCELLED, "be " + taskBeId + " abort task " + updateState(JobState.PAUSED, "be " + taskBeId + " abort task " + "with reason " + txnStatusChangeReason.toString(), false /* not replay */); return result; default: @@ -712,7 +715,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable // check task exists or not before call method private ListenResult executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState) - throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { + throws UserException { ListenResult result = ListenResult.UNCHANGED; // step0: get progress from transaction state RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(); @@ -770,7 +773,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable // columns will be checked when planing } - public void updateState(JobState jobState, String reason, boolean isReplay) { + public void updateState(JobState jobState, String reason, boolean isReplay) throws UserException { writeLock(); try { unprotectUpdateState(jobState, reason, isReplay); @@ -779,7 +782,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable } } - protected void unprotectUpdateState(JobState jobState, String reason, boolean isReplay) { + protected void unprotectUpdateState(JobState jobState, String reason, boolean isReplay) throws UserException { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) .add("current_job_state", getState()) .add("desire_job_state", jobState) @@ -849,7 +852,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable endTimestamp = System.currentTimeMillis(); } - public void update() { + public void update() throws UserException { // check if db and table exist Database database = Catalog.getCurrentCatalog().getDb(dbId); if (database == null) { @@ -906,7 +909,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable protected void unprotectUpdateProgress() { } - protected boolean unprotectNeedReschedule() { + protected boolean unprotectNeedReschedule() throws UserException { return false; } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 13794008e0..1a429acce8 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -23,7 +23,6 @@ import org.apache.doris.analysis.ResumeRoutineLoadStmt; import org.apache.doris.analysis.StopRoutineLoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -48,7 +47,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -198,7 +196,7 @@ public class 
RoutineLoadManager implements Writable { } public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) - throws DdlException, AnalysisException, MetaNotFoundException { + throws UserException { RoutineLoadJob routineLoadJob = getJob(pauseRoutineLoadStmt.getDbFullName(), pauseRoutineLoadStmt.getName()); if (routineLoadJob == null) { throw new DdlException("There is not operable routine load job with name " + pauseRoutineLoadStmt.getName()); @@ -232,8 +230,7 @@ public class RoutineLoadManager implements Writable { .build()); } - public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws DdlException, - AnalysisException, MetaNotFoundException { + public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws UserException { RoutineLoadJob routineLoadJob = getJob(resumeRoutineLoadStmt.getDBFullName(), resumeRoutineLoadStmt.getName()); if (routineLoadJob == null) { throw new DdlException("There is not operable routine load job with name " + resumeRoutineLoadStmt.getName() + "."); @@ -265,7 +262,7 @@ public class RoutineLoadManager implements Writable { } public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) - throws DdlException, AnalysisException, MetaNotFoundException { + throws UserException { RoutineLoadJob routineLoadJob = getJob(stopRoutineLoadStmt.getDBFullName(), stopRoutineLoadStmt.getName()); if (routineLoadJob == null) { throw new DdlException("There is not operable routine load job with name " + stopRoutineLoadStmt.getName()); @@ -554,7 +551,7 @@ public class RoutineLoadManager implements Writable { } } - public void updateRoutineLoadJob() { + public void updateRoutineLoadJob() throws UserException { for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { if (!routineLoadJob.state.isFinalState()) { routineLoadJob.update(); @@ -571,7 +568,11 @@ public class RoutineLoadManager implements Writable { public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) { RoutineLoadJob job = getJob(operation.getId()); - job.updateState(operation.getJobState(), null, true /* is replay */); + try { + job.updateState(operation.getJobState(), null, true /* is replay */); + } catch (UserException e) { + LOG.error("should not happend", e); + } LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId()) .add("current_state", operation.getJobState()) .add("msg", "replay change routine load job") diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 1ec68c352a..06c18ee352 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -20,6 +20,7 @@ package org.apache.doris.load.routineload; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; @@ -58,7 +59,7 @@ public class RoutineLoadScheduler extends Daemon { } } - private void process() { + private void process() throws UserException { // update routineLoadManager.updateRoutineLoadJob(); // get need schedule routine jobs diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java 
b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 99648d5ca6..a122807145 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -200,7 +200,7 @@ public class GlobalTransactionMgr { } public void commitTransaction(long dbId, long transactionId, List tabletCommitInfos) - throws MetaNotFoundException, TransactionException { + throws UserException { commitTransaction(dbId, transactionId, tabletCommitInfos, null); } @@ -216,14 +216,14 @@ public class GlobalTransactionMgr { * @param transactionId * @param tabletCommitInfos * @return - * @throws MetaNotFoundException + * @throws UserException * @throws TransactionCommitFailedException * @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time * @note callers should get db.write lock before call this api */ public void commitTransaction(long dbId, long transactionId, List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) - throws MetaNotFoundException, TransactionException { + throws UserException { if (Config.disable_load_job) { throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented"); } @@ -402,14 +402,14 @@ public class GlobalTransactionMgr { public boolean commitAndPublishTransaction(Database db, long transactionId, List tabletCommitInfos, long timeoutMillis) - throws MetaNotFoundException, TransactionException { + throws UserException { return commitAndPublishTransaction(db, transactionId, tabletCommitInfos, timeoutMillis, null); } public boolean commitAndPublishTransaction(Database db, long transactionId, List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment) - throws MetaNotFoundException, TransactionException { + throws UserException { db.writeLock(); try { commitTransaction(db.getId(), transactionId, tabletCommitInfos, txnCommitAttachment); @@ -483,7 +483,7 @@ public class GlobalTransactionMgr { * get all txns which is ready to publish * a ready-to-publish txn's partition's visible version should be ONE less than txn's commit version. 
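
The commitAndPublishTransaction hunks above keep the same shape while widening the declared exception: commit under the database write lock, then wait up to timeoutMillis for the publish step to make the transaction visible. A rough sketch of that flow, assuming simplified stand-in types (the real implementation does not poll like this):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch of the commit-then-wait-for-visible flow; stand-in types for illustration only.
    class CommitAndPublishSketch {
        private final ReentrantReadWriteLock dbLock = new ReentrantReadWriteLock();

        interface TxnMgr {
            void commit(long txnId) throws Exception;
            boolean isVisible(long txnId);
        }

        boolean commitAndPublish(TxnMgr mgr, long txnId, long timeoutMillis) throws Exception {
            dbLock.writeLock().lock();
            try {
                mgr.commit(txnId); // callers must hold the db write lock before committing
            } finally {
                dbLock.writeLock().unlock();
            }
            // Wait, bounded by timeoutMillis, for the publish-version step to finish.
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < deadline) {
                if (mgr.isVisible(txnId)) {
                    return true;
                }
                Thread.sleep(100);
            }
            return false;
        }
    }
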
*/ - public List getReadyToPublishTransactions() { + public List getReadyToPublishTransactions() throws UserException { List readyPublishTransactionState = new ArrayList<>(); List allCommittedTransactionState = null; writeLock(); @@ -582,7 +582,7 @@ public class GlobalTransactionMgr { LOG.warn("db is dropped during transaction, abort transaction {}", transactionState); unprotectUpsertTransactionState(transactionState); return; - } catch (TransactionException e) { + } catch (UserException e) { LOG.warn("failed to change transaction {} status to aborted", transactionState.getTransactionId()); } finally { writeUnlock(); @@ -703,7 +703,7 @@ public class GlobalTransactionMgr { transactionState.setFinishTime(System.currentTimeMillis()); transactionState.setTransactionStatus(TransactionStatus.VISIBLE); unprotectUpsertTransactionState(transactionState); - } catch (TransactionException e) { + } catch (UserException e) { LOG.warn("failed to change transaction {} status to visible", transactionState.getTransactionId()); } finally { writeUnlock(); @@ -801,7 +801,7 @@ public class GlobalTransactionMgr { try { transactionState.setTransactionStatus(TransactionStatus.ABORTED, TransactionState.TxnStatusChangeReason.TIMEOUT.name()); - } catch (TransactionException e) { + } catch (UserException e) { LOG.warn("txn {} could not be aborted with error message {}", transactionState.getTransactionId(), e.getMessage()); continue; diff --git a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 4eece8705c..24ee3d2758 100644 --- a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.Daemon; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; @@ -57,7 +58,7 @@ public class PublishVersionDaemon extends Daemon { } } - private void publishVersion() { + private void publishVersion() throws UserException { GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr(); List readyTransactionStates = globalTransactionMgr.getReadyToPublishTransactions(); if (readyTransactionStates == null || readyTransactionStates.isEmpty()) { diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index d09ab522b3..bf3a9bfb09 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -20,6 +20,7 @@ package org.apache.doris.transaction; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.Config; import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.metric.MetricRepo; @@ -268,12 +269,12 @@ public class TransactionState implements Writable { } public void setTransactionStatus(TransactionStatus transactionStatus) - throws TransactionException { + throws UserException { setTransactionStatus(transactionStatus, null); } public void setTransactionStatus(TransactionStatus 
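
The setTransactionStatus hunk above shows that a registered listener is consulted before the status change is applied, and that any failure now surfaces as UserException. A minimal sketch of that hook, with stand-in types rather than the real TransactionState:

    // Sketch of the before-change listener hook; simplified stand-ins for illustration.
    class TxnStatusSketch {
        static class UserException extends Exception {
            UserException(String msg) { super(msg); }
        }

        enum TransactionStatus { PREPARE, COMMITTED, VISIBLE, ABORTED }

        interface StateChangeListener {
            // May throw, which aborts the status change before it is applied.
            void beforeStatusChange(TransactionStatus to, String reason) throws UserException;
        }

        private TransactionStatus status = TransactionStatus.PREPARE;
        private final StateChangeListener listener;

        TxnStatusSketch(StateChangeListener listener) {
            this.listener = listener;
        }

        void setTransactionStatus(TransactionStatus to, String reason) throws UserException {
            if (listener != null) {
                listener.beforeStatusChange(to, reason); // listener gets the chance to veto
            }
            status = to;
        }
    }
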
transactionStatus, String txnStatusChangeReason) - throws TransactionException { + throws UserException { // before status changed TxnStateChangeListener listener = Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().getListener(listenerId); if (listener != null) { diff --git a/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeListener.java b/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeListener.java index e286d8a951..59b643b115 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeListener.java +++ b/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeListener.java @@ -17,6 +17,8 @@ package org.apache.doris.transaction; +import org.apache.doris.common.UserException; + public interface TxnStateChangeListener { public enum ListenResult { @@ -32,7 +34,7 @@ public interface TxnStateChangeListener { * * @param txnState */ - public ListenResult onCommitted(TransactionState txnState) throws TransactionException; + public ListenResult onCommitted(TransactionState txnState) throws UserException; public void replayOnCommitted(TransactionState txnState); @@ -55,7 +57,7 @@ public interface TxnStateChangeListener { * maybe null * @return */ - public ListenResult onAborted(TransactionState txnState, String txnStatusChangeReason); + public ListenResult onAborted(TransactionState txnState, String txnStatusChangeReason) throws UserException; public void replayOnAborted(TransactionState txnState); } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 056995b42e..647d61bf18 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -17,7 +17,6 @@ package org.apache.doris.load.routineload; -import com.sleepycat.je.tree.IN; import org.apache.doris.analysis.ColumnSeparator; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.LabelName; @@ -33,7 +32,6 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; -import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.common.UserException; import org.apache.doris.load.RoutineLoadDesc; import org.apache.doris.qe.ConnectContext; @@ -41,7 +39,6 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.GlobalTransactionMgr; -import org.apache.doris.transaction.TransactionState; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -60,14 +57,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.UUID; import mockit.Deencapsulation; import mockit.Expectations; import mockit.Injectable; -import mockit.Mock; -import mockit.MockUp; import mockit.Mocked; import mockit.Verifications; @@ -111,28 +105,56 @@ public class KafkaRoutineLoadJobTest { @Mocked SystemInfoService systemInfoService, @Mocked Database database, @Mocked RoutineLoadDesc routineLoadDesc) throws MetaNotFoundException { - List partitionList = new ArrayList<>(); - partitionList.add(1); - partitionList.add(2); - List beIds = Lists.newArrayList(1L); + List 
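
The TxnStateChangeListener hunks only widen the declared exception to UserException, keeping the commit/abort callbacks and their replay counterparts. For orientation, a bare-bones implementation of such a listener contract might look like the following (stand-in types, not the actual Doris interface):

    // Minimal listener contract plus a no-op implementation, for illustration only.
    class TxnListenerSketch {
        static class UserException extends Exception {
            UserException(String msg) { super(msg); }
        }

        enum ListenResult { UNCHANGED, CHANGED }

        interface TxnStateChangeListener {
            ListenResult onCommitted(Object txnState) throws UserException;
            void replayOnCommitted(Object txnState);
            ListenResult onAborted(Object txnState, String reason) throws UserException;
            void replayOnAborted(Object txnState);
        }

        static class NoopListener implements TxnStateChangeListener {
            public ListenResult onCommitted(Object txnState) { return ListenResult.UNCHANGED; }
            public void replayOnCommitted(Object txnState) { /* nothing to replay */ }
            public ListenResult onAborted(Object txnState, String reason) { return ListenResult.UNCHANGED; }
            public void replayOnAborted(Object txnState) { /* nothing to replay */ }
        }
    }
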
partitionList1 = Lists.newArrayList(1, 2); + List partitionList2 = Lists.newArrayList(1, 2, 3); + List partitionList3 = Lists.newArrayList(1, 2, 3, 4); + List partitionList4 = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7); + List beIds1 = Lists.newArrayList(1L); + List beIds2 = Lists.newArrayList(1L, 2L, 3L, 4L); - String clusterName = "default"; + String clusterName1 = "default1"; + String clusterName2 = "default2"; new Expectations() { { Catalog.getCurrentSystemInfo(); result = systemInfoService; - systemInfoService.getClusterBackendIds(clusterName, true); - result = beIds; + systemInfoService.getClusterBackendIds(clusterName1, true); + result = beIds1; + systemInfoService.getClusterBackendIds(clusterName2, true); + result = beIds2; + minTimes = 0; } }; + // 2 partitions, 1 be RoutineLoadJob routineLoadJob = - new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName, 1L, + new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName1, 1L, 1L, "127.0.0.1:9020", "topic1"); Deencapsulation.setField(routineLoadJob, "consumer", kafkaConsumer); - Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList); + Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList1); Assert.assertEquals(1, routineLoadJob.calculateCurrentConcurrentTaskNum()); + + // 3 partitions, 4 be + routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L, + 1L, "127.0.0.1:9020", "topic1"); + Deencapsulation.setField(routineLoadJob, "consumer", kafkaConsumer); + Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList2); + Assert.assertEquals(1, routineLoadJob.calculateCurrentConcurrentTaskNum()); + + // 4 partitions, 4 be + routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L, + 1L, "127.0.0.1:9020", "topic1"); + Deencapsulation.setField(routineLoadJob, "consumer", kafkaConsumer); + Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList3); + Assert.assertEquals(2, routineLoadJob.calculateCurrentConcurrentTaskNum()); + + // 7 partitions, 4 be + routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L, + 1L, "127.0.0.1:9020", "topic1"); + Deencapsulation.setField(routineLoadJob, "consumer", kafkaConsumer); + Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList4); + Assert.assertEquals(3, routineLoadJob.calculateCurrentConcurrentTaskNum()); } @@ -143,7 +165,7 @@ public class KafkaRoutineLoadJobTest { @Injectable RoutineLoadManager routineLoadManager, @Injectable RoutineLoadTaskScheduler routineLoadTaskScheduler, @Mocked RoutineLoadDesc routineLoadDesc) - throws BeginTransactionException, LabelAlreadyUsedException, AnalysisException { + throws UserException { RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L, diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 0034ede218..e936d65fda 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -17,7 +17,8 @@ package org.apache.doris.load.routineload; -import com.sleepycat.je.tree.IN; +import static mockit.Deencapsulation.invoke; + import org.apache.doris.analysis.ColumnSeparator; import org.apache.doris.analysis.CreateRoutineLoadStmt; import 
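
The new test expectations above (2 partitions/1 BE -> 1, 3/4 -> 1, 4/4 -> 2, 7/4 -> 3) are consistent with each task driving a consumer group of up to three consumers: the desired task count is roughly ceil(partitions / 3), further capped by the number of alive BEs. The sketch below is inferred from those test values, not the literal FE formula:

    // Inferred concurrency calculation; reproduces the four asserted cases.
    class ConcurrencySketch {
        static final int CONSUMERS_PER_GROUP = 3; // mirrors BE config max_consumer_num_per_group

        static int calculateConcurrentTaskNum(int partitionNum, int aliveBeNum) {
            int desired = (partitionNum + CONSUMERS_PER_GROUP - 1) / CONSUMERS_PER_GROUP; // ceil
            return Math.min(Math.min(desired, aliveBeNum), partitionNum);
        }

        public static void main(String[] args) {
            // Prints 1, 1, 2, 3 -- matching the assertions in the test above.
            System.out.println(calculateConcurrentTaskNum(2, 1));
            System.out.println(calculateConcurrentTaskNum(3, 4));
            System.out.println(calculateConcurrentTaskNum(4, 4));
            System.out.println(calculateConcurrentTaskNum(7, 4));
        }
    }
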
org.apache.doris.analysis.LabelName; @@ -56,8 +57,6 @@ import mockit.Mock; import mockit.MockUp; import mockit.Mocked; -import static mockit.Deencapsulation.invoke; - public class RoutineLoadManagerTest { private static final Logger LOG = LogManager.getLogger(RoutineLoadManagerTest.class); diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 2a9de39bd4..2738f51882 100644 --- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -30,10 +30,9 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.LabelAlreadyUsedException; -import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.load.routineload.KafkaProgress; import org.apache.doris.load.routineload.KafkaRoutineLoadJob; import org.apache.doris.load.routineload.KafkaTaskInfo; @@ -151,10 +150,7 @@ public class GlobalTransactionMgrTest { // all replica committed success @Test - public void testCommitTransaction1() throws MetaNotFoundException, - TransactionException, - IllegalTransactionParameterException, LabelAlreadyUsedException, - AnalysisException, BeginTransactionException { + public void testCommitTransaction1() throws UserException { FakeCatalog.setCatalog(masterCatalog); long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, @@ -194,10 +190,7 @@ public class GlobalTransactionMgrTest { // commit with only two replicas @Test - public void testCommitTransactionWithOneFailed() throws MetaNotFoundException, - TransactionException, - IllegalTransactionParameterException, LabelAlreadyUsedException, - AnalysisException, BeginTransactionException { + public void testCommitTransactionWithOneFailed() throws UserException { TransactionState transactionState = null; FakeCatalog.setCatalog(masterCatalog); long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, @@ -301,7 +294,7 @@ public class GlobalTransactionMgrTest { public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tabletCommitInfo, @Mocked KafkaConsumer kafkaConsumer, @Mocked EditLog editLog) - throws MetaNotFoundException, TransactionException, DdlException { + throws UserException { FakeCatalog.setCatalog(masterCatalog); TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1); @@ -354,7 +347,7 @@ public class GlobalTransactionMgrTest { Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(routineLoadJob, "currentTotalRows")); Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorRows")); - Assert.assertEquals(Long.valueOf(11L), ((KafkaProgress) routineLoadJob.getProgress()).getPartitionIdToOffset().get(1)); + Assert.assertEquals(Long.valueOf(11L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1)); // todo(ml): change to assert queue // Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size()); // Assert.assertNotEquals("label", routineLoadManager.getNeedScheduleTasksQueue().peek().getId()); @@ -364,7 +357,7 
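
The assertions above switch from reaching into the raw partition-to-offset map to calling getOffsetByPartition(partitionId), presumably so KafkaProgress can keep its internal representation private (for example, once special OFFSET_BEGINNING/OFFSET_END markers are stored). A simplified stand-in showing the accessor shape:

    import java.util.HashMap;
    import java.util.Map;

    // Simplified stand-in for KafkaProgress, for illustration only.
    class KafkaProgressSketch {
        private final Map<Integer, Long> partitionIdToOffset = new HashMap<>();

        void updateOffset(int partitionId, long offset) {
            partitionIdToOffset.put(partitionId, offset);
        }

        // Callers ask for an offset by partition id instead of touching the map directly.
        Long getOffsetByPartition(int partitionId) {
            return partitionIdToOffset.get(partitionId);
        }
    }
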
@@ public class GlobalTransactionMgrTest { public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommitInfo tabletCommitInfo, @Mocked EditLog editLog, @Mocked KafkaConsumer kafkaConsumer) - throws TransactionException, MetaNotFoundException, DdlException { + throws UserException { FakeCatalog.setCatalog(masterCatalog); @@ -418,16 +411,13 @@ public class GlobalTransactionMgrTest { Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows")); Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows")); Assert.assertEquals(Long.valueOf(11L), - ((KafkaProgress) routineLoadJob.getProgress()).getPartitionIdToOffset().get(1)); + ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1)); // todo(ml): change to assert queue // Assert.assertEquals(0, routineLoadManager.getNeedScheduleTasksQueue().size()); Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); } - - public void testFinishTransaction() throws MetaNotFoundException, TransactionException, - IllegalTransactionParameterException, LabelAlreadyUsedException, - AnalysisException, BeginTransactionException { + public void testFinishTransaction() throws UserException { long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, transactionSource, @@ -468,10 +458,7 @@ public class GlobalTransactionMgrTest { } @Test - public void testFinishTransactionWithOneFailed() throws MetaNotFoundException, - TransactionException, - IllegalTransactionParameterException, LabelAlreadyUsedException, - AnalysisException, BeginTransactionException { + public void testFinishTransactionWithOneFailed() throws UserException { TransactionState transactionState = null; Partition testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1) .getPartition(CatalogTestUtil.testPartition1);
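
The error-max test above expects the job to end up PAUSED with both row counters reset to zero once the error threshold is exceeded within a sampling window. A sketch of that behavior, inferred from the assertions rather than copied from the FE code:

    // Illustration of the error-row check the test asserts; simplified stand-ins only.
    class ErrorRowCheckSketch {
        enum JobState { RUNNING, PAUSED }

        private JobState state = JobState.RUNNING;
        private long currentTotalRows = 0;
        private long currentErrorRows = 0;

        void updateProgress(long addedTotalRows, long addedErrorRows,
                            long maxErrorNum, long sampleWindowRows) {
            currentTotalRows += addedTotalRows;
            currentErrorRows += addedErrorRows;
            if (currentTotalRows >= sampleWindowRows) {
                // Evaluate the error threshold once a full sampling window has been seen.
                if (currentErrorRows > maxErrorNum) {
                    state = JobState.PAUSED; // matches the PAUSED assertion in the test
                }
                // Counters restart for the next window, matching the zeroed-counter assertions.
                currentTotalRows = 0;
                currentErrorRows = 0;
            }
        }
    }
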