Replace all remaining boost::split() with strings::split() (#2302)
This commit is contained in:
@ -22,6 +22,8 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "gutil/strings/split.h"
|
||||
|
||||
#include "common/status.h"
|
||||
#include "service/backend_options.h"
|
||||
#include "runtime/small_file_mgr.h"
|
||||
@ -34,14 +36,14 @@ namespace doris {
|
||||
// init kafka consumer will only set common configs such as
|
||||
// brokers, groupid
|
||||
Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
if (_init) {
|
||||
// this consumer has already been initialized.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
|
||||
|
||||
|
||||
// conf has to be deleted finally
|
||||
auto conf_deleter = [conf] () { delete conf; };
|
||||
DeferOp delete_conf(std::bind<void>(conf_deleter));
|
||||
@ -84,8 +86,8 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
|
||||
for (auto& item : ctx->kafka_info->properties) {
|
||||
if (boost::algorithm::starts_with(item.second, "FILE:")) {
|
||||
// file property should has format: FILE:file_id:md5
|
||||
std::vector<std::string> parts;
|
||||
boost::split(parts, item.second, boost::is_any_of(":"));
|
||||
std::vector<std::string> parts = strings::Split(
|
||||
item.second, ":", strings::SkipWhitespace());
|
||||
if (parts.size() != 3) {
|
||||
return Status::InternalError("PAUSE: Invalid file property of kafka: " + item.second);
|
||||
}
|
||||
@ -94,7 +96,8 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
|
||||
Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path);
|
||||
if (!st.ok()) {
|
||||
std::stringstream ss;
|
||||
ss << "PAUSE: failed to get file for config: " << item.first << ", error: " << st.get_error_msg();
|
||||
ss << "PAUSE: failed to get file for config: " << item.first
|
||||
<< ", error: " << st.get_error_msg();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
RETURN_IF_ERROR(set_conf(item.first, file_path));
|
||||
@ -112,7 +115,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
|
||||
}
|
||||
|
||||
// create consumer
|
||||
_k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
|
||||
_k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
|
||||
if (!_k_consumer) {
|
||||
LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr;
|
||||
return Status::InternalError("PAUSE: failed to create kafka consumer: " + errstr);
|
||||
@ -263,7 +266,7 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
|
||||
if ((*it)->topic() != _topic) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
|
||||
std::stringstream ss;
|
||||
ss << "error: " << err2str((*it)->err());
|
||||
@ -284,7 +287,7 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
|
||||
return Status::InternalError("no partition in this topic");
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
|
||||
@ -309,7 +312,7 @@ Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset)
|
||||
if (err != RdKafka::ERR_NO_ERROR) {
|
||||
std::stringstream ss;
|
||||
ss << "failed to commit kafka offset : " << RdKafka::err2str(err);
|
||||
return Status::InternalError(ss.str());
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user