From bfdc41d37b28b13c4a844d3967ca14c5ca5a72b9 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Sun, 28 Jan 2024 17:52:39 +0800 Subject: [PATCH] [fix](ccr) handle large binlog (#30435) --- .../main/java/org/apache/doris/common/Config.java | 5 +++++ .../org/apache/doris/binlog/BinlogManager.java | 14 +++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index bc849e24bd..1231312cf5 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2145,6 +2145,11 @@ public class Config extends ConfigBase { "Whether to enable binlog feature"}) public static boolean enable_feature_binlog = false; + @ConfField(mutable = false, masterOnly = false, varType = VariableAnnotation.EXPERIMENTAL, description = { + "设置 binlog 消息最字节长度", + "Set the maximum byte length of binlog message"}) + public static int max_binlog_messsage_size = 1024 * 1024 * 1024; + @ConfField(mutable = true, masterOnly = true, description = { "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。", "Whether to disable creating catalog with WITH RESOURCE statement."}) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index b819d8444e..6ac3ba3b3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -457,10 +457,22 @@ public class BinlogManager { int length = dis.readInt(); byte[] data = new byte[length]; dis.readFully(data); - TMemoryInputTransport transport = new TMemoryInputTransport(data); + Boolean isLargeBinlog = length > 8 * 1024 * 1024; + if (isLargeBinlog) { + LOG.info("a large binlog length {}", length); + } + + TMemoryInputTransport transport = new TMemoryInputTransport(); + transport.getConfiguration().setMaxMessageSize(Config.max_binlog_messsage_size); + transport.reset(data); + TBinaryProtocol protocol = new TBinaryProtocol(transport); TBinlog binlog = new TBinlog(); binlog.read(protocol); + + if (isLargeBinlog) { + LOG.info("a large binlog length {} type {}", length, binlog.type); + } return binlog; }