Support SSL authentication with Kafka in routine load job (#1235)
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "service/backend_options.h"
+#include "runtime/small_file_mgr.h"
 #include "util/defer_op.h"
 #include "util/stopwatch.hpp"
 #include "util/uid_util.h"
@@ -52,9 +53,17 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
 
     std::string errstr;
     auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
-        if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) {
+        RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr);
+        if (res == RdKafka::Conf::CONF_UNKNOWN) {
+            // ignore unknown config
+            return Status::OK;
+        } else if (errstr.find("not supported") != std::string::npos) {
+            // some Java-only properties may be passed here, and librdkafka will report "'xxx' not supported";
+            // ignore them
+            return Status::OK;
+        } else if (res != RdKafka::Conf::CONF_OK) {
             std::stringstream ss;
-            ss << "failed to set '" << conf_key << "'";
+            ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val << "', err: " << errstr;
             LOG(WARNING) << ss.str();
             return Status(ss.str());
         }
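For context on the new branch ordering: librdkafka reports an unrecognized property name as CONF_UNKNOWN (distinct from CONF_INVALID for a bad value), which is what lets set_conf silently skip Java-client-only keys such as truststore settings instead of pausing the job. A minimal standalone sketch of that behavior (the property names below are only illustrative):

    #include <iostream>
    #include <string>

    #include <librdkafka/rdkafkacpp.h>

    int main() {
        std::string errstr;
        RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

        // A property librdkafka knows: returns CONF_OK.
        if (conf->set("security.protocol", "ssl", errstr) != RdKafka::Conf::CONF_OK) {
            std::cerr << "unexpected: " << errstr << std::endl;
        }

        // A Java-client-only property: librdkafka returns CONF_UNKNOWN,
        // which the set_conf lambda above maps to Status::OK.
        RdKafka::Conf::ConfResult res =
                conf->set("ssl.truststore.location", "/path/ts.jks", errstr);
        if (res == RdKafka::Conf::CONF_UNKNOWN) {
            std::cout << "unknown property ignored: " << errstr << std::endl;
        }

        delete conf;
        return 0;
    }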
@@ -73,12 +82,31 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
 
     for (auto& item : ctx->kafka_info->properties) {
-        RETURN_IF_ERROR(set_conf(item.first, item.second));
+        if (boost::algorithm::starts_with(item.second, "FILE:")) {
+            // file property should have the format: FILE:file_id:md5
+            std::vector<std::string> parts;
+            boost::split(parts, item.second, boost::is_any_of(":"));
+            if (parts.size() != 3) {
+                return Status("PAUSE: Invalid file property of kafka: " + item.second);
+            }
+            int64_t file_id = std::stol(parts[1]);
+            std::string file_path;
+            Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path);
+            if (!st.ok()) {
+                std::stringstream ss;
+                ss << "PAUSE: failed to get file for config: " << item.first << ", error: " << st.get_error_msg();
+                return Status(ss.str());
+            }
+            RETURN_IF_ERROR(set_conf(item.first, file_path));
+        } else {
+            RETURN_IF_ERROR(set_conf(item.first, item.second));
+        }
+        _custom_properties.emplace(item.first, item.second);
     }
 
     if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
         std::stringstream ss;
-        ss << "failed to set 'event_cb'";
+        ss << "PAUSE: failed to set 'event_cb'";
         LOG(WARNING) << ss.str();
         return Status(ss.str());
     }
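The FILE:file_id:md5 convention above is how certificate files (e.g. the value of ssl.ca.location) uploaded to the FE reach the BE: the property value is swapped for a local path resolved through SmallFileMgr before being handed to librdkafka. A hedged, standalone sketch of just the parsing step (parse_file_property is a hypothetical helper; the committed code does the same split with boost):

    #include <cstdint>
    #include <exception>
    #include <sstream>
    #include <string>
    #include <vector>

    // Hypothetical standalone parser for the "FILE:file_id:md5" value format.
    bool parse_file_property(const std::string& value, int64_t* file_id, std::string* md5) {
        std::vector<std::string> parts;
        std::stringstream ss(value);
        std::string part;
        while (std::getline(ss, part, ':')) {
            parts.push_back(part);
        }
        if (parts.size() != 3 || parts[0] != "FILE") {
            return false;
        }
        try {
            *file_id = std::stol(parts[1]);
        } catch (const std::exception&) {
            return false; // non-numeric file id
        }
        *md5 = parts[2];
        return true;
    }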
@@ -86,8 +114,8 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     // create consumer
     _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
     if (!_k_consumer) {
-        LOG(WARNING) << "failed to create kafka consumer";
-        return Status("failed to create kafka consumer");
+        LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr;
+        return Status("PAUSE: failed to create kafka consumer: " + errstr);
     }
 
     VLOG(3) << "finished to init kafka consumer. " << ctx->brief();
@@ -174,7 +202,7 @@ Status KafkaDataConsumer::group_consume(
             case RdKafka::ERR__TIMED_OUT:
                 // leave the status as OK, because this may happen
                 // if there is no data in kafka.
-                LOG(WARNING) << "kafka consume timeout: " << _id;
+                LOG(INFO) << "kafka consume timeout: " << _id;
                 break;
             default:
                 LOG(WARNING) << "kafka consume failed: " << _id
@@ -199,6 +227,66 @@ Status KafkaDataConsumer::group_consume(
     return st;
 }
 
+Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
+    // create topic conf
+    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+    auto conf_deleter = [tconf] () { delete tconf; };
+    DeferOp delete_conf(std::bind<void>(conf_deleter));
+
+    // create topic
+    std::string errstr;
+    RdKafka::Topic *topic = RdKafka::Topic::create(_k_consumer, _topic, tconf, errstr);
+    if (topic == nullptr) {
+        std::stringstream ss;
+        ss << "failed to create topic: " << errstr;
+        LOG(WARNING) << ss.str();
+        return Status(ss.str());
+    }
+    auto topic_deleter = [topic] () { delete topic; };
+    DeferOp delete_topic(std::bind<void>(topic_deleter));
+
+    // get topic metadata
+    RdKafka::Metadata* metadata = nullptr;
+    RdKafka::ErrorCode err = _k_consumer->metadata(true/* for this topic */, topic, &metadata, 5000);
+    if (err != RdKafka::ERR_NO_ERROR) {
+        std::stringstream ss;
+        ss << "failed to get partition meta: " << RdKafka::err2str(err);
+        LOG(WARNING) << ss.str();
+        return Status(ss.str());
+    }
+    auto meta_deleter = [metadata] () { delete metadata; };
+    DeferOp delete_meta(std::bind<void>(meta_deleter));
+
+    // get partition ids
+    RdKafka::Metadata::TopicMetadataIterator it;
+    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
+        if ((*it)->topic() != _topic) {
+            continue;
+        }
+
+        if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
+            std::stringstream ss;
+            ss << "error: " << err2str((*it)->err());
+            if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
+                ss << ", try again";
+            }
+            LOG(WARNING) << ss.str();
+            return Status(ss.str());
+        }
+
+        RdKafka::TopicMetadata::PartitionMetadataIterator ip;
+        for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {
+            partition_ids->push_back((*ip)->id());
+        }
+    }
+
+    if (partition_ids->empty()) {
+        return Status("no partition in this topic");
+    }
+
+    return Status::OK;
+}
+
 Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
     std::unique_lock<std::mutex> l(_lock);
     if (!_init) {
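DeferOp (from util/defer_op.h, newly included at the top of this diff) is a scope guard that runs its callback on destruction, which is why tconf, topic, and metadata above are released on every return path of get_partition_meta without repeated cleanup code. A minimal sketch of such a guard, assuming roughly the interface used here (the real class may differ in detail):

    #include <functional>
    #include <utility>

    // Minimal scope-guard sketch in the spirit of util/defer_op.h.
    class DeferOp {
    public:
        explicit DeferOp(std::function<void()> func) : _func(std::move(func)) {}
        ~DeferOp() { _func(); } // runs when the enclosing scope exits
        DeferOp(const DeferOp&) = delete;
        DeferOp& operator=(const DeferOp&) = delete;

    private:
        std::function<void()> _func;
    };

    void example() {
        int* buf = new int[16];
        DeferOp cleanup([buf]() { delete[] buf; });
        // ... use buf; it is freed no matter how this scope is left ...
    }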
@@ -225,6 +313,15 @@ bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
     if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) {
         return false;
     }
+    // check properties
+    if (_custom_properties.size() != ctx->kafka_info->properties.size()) {
+        return false;
+    }
+    for (auto& item : ctx->kafka_info->properties) {
+        if (_custom_properties.find(item.first) == _custom_properties.end()) {
+            return false;
+        }
+    }
     return true;
 }
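Note that the two property checks above only assert that the key sets match (equal size, and every requested key present in the cache); values are never compared, so two jobs setting the same keys to different values would still match the same cached consumer. A stricter, value-sensitive variant would collapse to a single comparison, assuming both sides are std::map<std::string, std::string> (an assumption; the container types are not shown in this diff):

    #include <map>
    #include <string>

    // Hedged sketch: value-sensitive variant of the key-only check above.
    bool properties_match(const std::map<std::string, std::string>& cached,
                          const std::map<std::string, std::string>& requested) {
        return cached == requested; // compares both keys and values
    }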