From 83f6eef506071c37d028d9f269a671f07129ad4a Mon Sep 17 00:00:00 2001 From: caiconghui <55968745+caiconghui@users.noreply.github.com> Date: Mon, 10 Jan 2022 17:30:24 +0800 Subject: [PATCH] [improvement](routine-load) Make routine load work with old kafka version (#7630) Co-authored-by: caiconghui1 --- be/src/common/config.h | 5 +++++ be/src/runtime/routine_load/data_consumer.cpp | 1 + docs/en/administrator-guide/config/be_config.md | 6 ++++++ .../en/administrator-guide/load-data/routine-load-manual.md | 2 +- docs/zh-CN/administrator-guide/config/be_config.md | 6 ++++++ .../administrator-guide/load-data/routine-load-manual.md | 2 +- 6 files changed, 20 insertions(+), 2 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 15cec1b9dd..391e665f0b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -665,6 +665,11 @@ CONF_Int32(max_minidump_file_size_mb, "200"); // Doris will only keep latest 10 minidump files by default. CONF_Int32(max_minidump_file_number, "10"); +// If the Kafka broker version is lower than the Kafka client version that routine load depends on, +// the broker version set in kafka_broker_version_fallback is used as the fallback. +// Valid values for brokers older than 0.10.0 are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. 
+CONF_String(kafka_broker_version_fallback, "0.10.0"); + } // namespace config } // namespace doris diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index 8d4fcdc963..7522e86d22 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -77,6 +77,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) { RETURN_IF_ERROR(set_conf("auto.offset.reset", "error")); RETURN_IF_ERROR(set_conf("api.version.request", "true")); RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0")); + RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback)); for (auto& item : ctx->kafka_info->properties) { if (starts_with(item.second, "FILE:")) { diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index e384323af4..6b18e42915 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -678,6 +678,12 @@ Default:10737418240 BloomFilter/Min/Max and other statistical information cache capacity +### `kafka_broker_version_fallback` + +Default:0.10.0 + +If the Kafka broker version is lower than the Kafka client version that routine load depends on, the broker version set in kafka_broker_version_fallback is used as the fallback. Valid values for brokers older than 0.10.0 are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. + ### `load_data_reserve_hours` Default:4(hour) diff --git a/docs/en/administrator-guide/load-data/routine-load-manual.md b/docs/en/administrator-guide/load-data/routine-load-manual.md index 37aa64b210..05d47ac9ba 100644 --- a/docs/en/administrator-guide/load-data/routine-load-manual.md +++ b/docs/en/administrator-guide/load-data/routine-load-manual.md @@ -85,7 +85,7 @@ Currently we only support routine load from the Kafka system. This section detai 1. Support unauthenticated Kafka access and Kafka clusters certified by SSL. 2. 
The supported message format is csv text or json format. Each message is a line in csv format, and the end of the line does not contain a ** line break. -3. Only Kafka 0.10.0.0 or above is supported. +3. Kafka 0.10.0.0 (inclusive) or above is supported by default. To use a Kafka version below 0.10.0.0 (0.9.0, 0.8.2, 0.8.1, 0.8.0), either modify the BE configuration and set kafka_broker_version_fallback to the older version, or set property.broker.version.fallback to the older version directly when creating the routine load. The trade-off of using an old version is that some of the newer routine load features may not be available, such as setting the offset of a Kafka partition by time. ### Create a routine load task diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index 25217966fc..a041ecd84b 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -679,6 +679,12 @@ load tablets from header failed, failed tablets size: xxx, path=xxx BloomFilter/Min/Max等统计信息缓存的容量 +### `kafka_broker_version_fallback` + +默认值:0.10.0 + +如果依赖的 kafka 版本低于routine load依赖的 kafka 客户端版本, 将使用回退版本 kafka_broker_version_fallback 设置的值,有效值为:0.9.0、0.8.2、0.8.1、0.8.0。 + ### `load_data_reserve_hours` 默认值:4 (小时) diff --git a/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md b/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md index 9f704f7f04..a6b47e51d2 100644 --- a/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md +++ b/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md @@ -85,7 +85,7 @@ FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或 1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。 2. 支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾**不包含**换行符。 -3. 仅支持 Kafka 0.10.0.0(含) 以上版本。 +3. 
默认支持 Kafka 0.10.0.0(含) 以上版本。如果要使用 Kafka 0.10.0.0 以下版本 (0.9.0, 0.8.2, 0.8.1, 0.8.0),需要修改 be 的配置,将 kafka_broker_version_fallback 的值设置为要兼容的旧版本,或者在创建routine load的时候直接设置 property.broker.version.fallback的值为要兼容的旧版本,使用旧版本的代价是routine load 的部分新特性可能无法使用,如根据时间设置 kafka 分区的 offset。 ### 创建例行导入任务