From 7b50409ada4ba77370775c19354c9eef852cac1e Mon Sep 17 00:00:00 2001 From: xy720 <22125576+xy720@users.noreply.github.com> Date: Sat, 23 Oct 2021 16:47:32 +0800 Subject: [PATCH] [Bug][Binlog] Fix the number of versions may exceed the limit during data synchronization (#6889) Bug detail: #6887 To solve this problem, the commit of transaction must meet any of the following conditions to avoid committing too frequently: 1. The current accumulated event quantity is greater than the `min_sync_commit_size`. 2. The current accumulated data size is greater than the `min_bytes_sync_commit`. In addition, when the accumulated data size exceeds `max_bytes_sync_commit`, the transaction needs to be committed immediately. Before: ![a5e0a2ba01ec4935144253fe0a364af7](https://user-images.githubusercontent.com/22125576/137933545-77018e89-fa2e-4d45-ae5d-84638cc0506a.png) After: ![4577ec53afa47452c847bd01fa7db56c](https://user-images.githubusercontent.com/22125576/137933592-146bef90-1346-47e4-996e-4f30a25d73bc.png) --- .../load-data/binlog-load-manual.md | 33 +++++++++++++++++- .../load-data/binlog-load-manual.md | 34 ++++++++++++++++++- .../java/org/apache/doris/common/Config.java | 34 +++++++++++++++++++ .../doris/load/sync/canal/CanalConfigs.java | 3 -- .../sync/canal/CanalSyncDataConsumer.java | 11 ++++-- .../doris/load/sync/canal/CanalSyncJob.java | 5 +-- .../load/sync/canal/SyncCanalClient.java | 15 ++++---- 7 files changed, 118 insertions(+), 17 deletions(-) diff --git a/docs/en/administrator-guide/load-data/binlog-load-manual.md b/docs/en/administrator-guide/load-data/binlog-load-manual.md index 90db6da5ae..a13e52000d 100644 --- a/docs/en/administrator-guide/load-data/binlog-load-manual.md +++ b/docs/en/administrator-guide/load-data/binlog-load-manual.md @@ -104,7 +104,7 @@ A Send task is a request from Channel to Be, which contains the data of the same Channel controls the begin, commit and abort of transaction of single table. 
In a transaction, the consumer may distribute multiple Batches of data to a channel, so multiple send tasks may be generated. These tasks will not actually take effect until the transaction is committed successfully. -When certain conditions are met (for example, a certain period of time was passed, an empty batch is received), the Consumer will block and notify each channel to try commit the transaction. +When certain conditions are met (for example, a certain period of time has passed, or the maximum commit data size is reached), the Consumer will block and notify each channel to try to commit the transaction. If and only if all channels are committed successfully, Canal Server will be notified by the ACK request and Canal Client continue to get and consume data. @@ -456,6 +456,25 @@ You can use `HELP STOP SYNC JOB;`, `HELP PAUSE SYNC JOB`; And `HELP RESUME SYNC ## Related Parameters +### Canal configuration + +* `canal.ip` + + canal server's ip address + +* `canal.port` + + canal server's port + +* `canal.instance.memory.buffer.size` + + The queue length of the store ring queue, must be set to the power of 2, the default length is 16384. This value is equal to the maximum number of events that can be cached on the canal side and directly determines the maximum number of events that can be accommodated in a transaction on the Doris side. It is recommended to make it large enough to prevent the upper limit of the amount of data that can be accommodated in a transaction on the Doris side from being too small, resulting in too frequent transaction submission and data version accumulation. + +* `canal.instance.memory.buffer.memunit` + + The default space occupied by an event at the canal end, default value is 1024 bytes. This value multiplied by `canal.instance.memory.buffer.size` is equal to the maximum space of the store. For example, if the queue length of the store is 16384, the space of the store is 16MB. 
However, the actual size of an event is not actually equal to this value, but is determined by the number of rows of data in the event and the length of each row of data. For example, the insert event of a table with only two columns is only 30 bytes, but the delete event may reach thousands of bytes. This is because the number of rows of delete event is usually more than that of insert event. + + ### Fe configuration The following configuration belongs to the system level configuration of SyncJob. The configuration value can be modified in configuration file fe.conf. @@ -468,6 +487,18 @@ The following configuration belongs to the system level configuration of SyncJob Maximum interval time between commit transactions. If there is still data in the channel that has not been committed after this time, the consumer will notify the channel to commit the transaction. +* `min_sync_commit_size` + + The minimum number of events required to commit a transaction. If the number of events received by Fe is less than it, Fe will continue to wait for the next batch of data until the time exceeds `sync_commit_interval_second`. The default value is 10000 events. If you want to modify this configuration, please ensure that this value is less than the `canal.instance.memory.buffer.size` configuration on the canal side (16384 by default). Otherwise, Fe will try to get more events than the length of the store queue without ack, causing the store queue to block until timeout. + +* `min_bytes_sync_commit` + + The minimum data size required to commit a transaction. If the data size received by Fe is smaller than it, it will continue to wait for the next batch of data until the time exceeds `sync_commit_interval_second`. The default value is 15MB. If you want to modify this configuration, please ensure that this value is less than the product of `canal.instance.memory.buffer.size` and `canal.instance.memory.buffer.memunit` on the canal side (16MB by default). 
Otherwise, Fe will try to get data from canal larger than the store space without ack, causing the store queue to block until timeout. + +* `max_bytes_sync_commit` + + The maximum size of the data when the transaction is committed. If the data size received by Fe is larger than it, it will immediately commit the transaction and send the accumulated data. The default value is 64MB. If you want to modify this configuration, please ensure that this value is greater than the product of `canal.instance.memory.buffer.size` and `canal.instance.memory.buffer.memunit` on the canal side (16MB by default) and `min_bytes_sync_commit`. + * `max_sync_task_threads_num` The maximum number of threads in the SyncJobs' thread pool. There is only one thread pool in the whole Fe for synchronization, which is used to process the tasks created by all SyncJobs in the Fe. diff --git a/docs/zh-CN/administrator-guide/load-data/binlog-load-manual.md b/docs/zh-CN/administrator-guide/load-data/binlog-load-manual.md index 3ed946b85f..8862a0a113 100644 --- a/docs/zh-CN/administrator-guide/load-data/binlog-load-manual.md +++ b/docs/zh-CN/administrator-guide/load-data/binlog-load-manual.md @@ -105,7 +105,7 @@ client中的receiver将负责通过Get命令接收数据,每获取到一个数 channel控制着单个表事务的开始、提交、终止。一个事务周期内,一般会从consumer获取到多个batch的数据,因此会产生多个向BE发送数据的子任务Task,在提交事务成功前,这些Task不会实际生效。 -满足一定条件时(比如超过一定时间、获取到了空的batch),consumer将会阻塞并通知各个channel提交事务。 +满足一定条件时(比如超过一定时间、达到提交最大数据大小),consumer将会阻塞并通知各个channel提交事务。 当且仅当所有channel都提交成功,才会通过Ack命令通知canal并继续获取并消费数据。 @@ -434,6 +434,26 @@ ALTER TABLE canal_test.test1 ENABLE FEATURE "BATCH_DELETE"; ## 相关参数 +### CANAL配置 + +下面配置属于canal端的配置,主要通过修改 conf 目录下的 canal.properties 调整配置值。 + +* `canal.ip` + + canal server的ip地址 + +* `canal.port` + + canal server的端口 + +* `canal.instance.memory.buffer.size` + + canal端的store环形队列的队列长度,必须设为2的幂次方,默认长度16384。此值等于canal端能缓存event数量的最大值,也直接决定了Doris端一个事务内所能容纳的最大event数量。建议将它改的足够大,防止Doris端一个事务内能容纳的数据量上限太小,导致提交事务太过频繁造成数据的版本堆积。 + +* `canal.instance.memory.buffer.memunit` + + 
canal端默认一个event所占的空间,默认空间为1024 bytes。此值乘上store环形队列的队列长度等于store的空间最大值,比如store队列长度为16384,则store的空间为16MB。但是,一个event的实际大小并不等于此值,而是由这个event内有多少行数据和每行数据的长度决定的,比如一张只有两列的表的insert event只有30字节,但delete event可能达到数千字节,这是因为通常delete event的行数比insert event多。 + ### FE配置 下面配置属于数据同步作业的系统级别配置,主要通过修改 fe.conf 来调整配置值。 @@ -446,6 +466,18 @@ ALTER TABLE canal_test.test1 ENABLE FEATURE "BATCH_DELETE"; 提交事务的最大时间间隔。若超过了这个时间channel中还有数据没有提交,consumer会通知channel提交事务。 +* `min_sync_commit_size` + + 提交事务需满足的最小event数量。若Fe接收到的event数量小于它,会继续等待下一批数据直到时间超过了`sync_commit_interval_second `为止。默认值是10000个events,如果你想修改此配置,请确保此值小于canal端的`canal.instance.memory.buffer.size`配置(默认16384),否则在ack前Fe会尝试获取比store队列长度更多的event,导致store队列阻塞至超时为止。 + +* `min_bytes_sync_commit` + + 提交事务需满足的最小数据大小。若Fe接收到的数据大小小于它,会继续等待下一批数据直到时间超过了`sync_commit_interval_second `为止。默认值是15MB,如果你想修改此配置,请确保此值小于canal端的`canal.instance.memory.buffer.size`和`canal.instance.memory.buffer.memunit`的乘积(默认16MB),否则在ack前Fe会尝试获取比store空间更大的数据,导致store队列阻塞至超时为止。 + +* `max_bytes_sync_commit` + + 提交事务时的数据大小的最大值。若Fe接收到的数据大小大于它,会立即提交事务并发送已积累的数据。默认值是64MB,如果你想修改此配置,请确保此值大于canal端的`canal.instance.memory.buffer.size`和`canal.instance.memory.buffer.memunit`的乘积(默认16MB)和`min_bytes_sync_commit`。 + * `max_sync_task_threads_num` 数据同步作业线程池中的最大线程数量。此线程池整个FE中只有一个,用于处理FE中所有数据同步作业向BE发送数据的任务task,线程池的实现在`SyncTaskPool`类。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index f67bee960b..d79363a574 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -646,6 +646,40 @@ public class Config extends ConfigBase { */ @ConfField public static int max_sync_task_threads_num = 10; + + /** + * Min event size that a sync job will commit. + * When receiving events less than it, SyncJob will continue + * to wait for the next batch of data until the time exceeds + * `sync_commit_interval_second`. 
+ * The default value is 10000 (canal default event buffer size is 16384). + * You should set it smaller than canal buffer size. + */ + @ConfField(mutable = true, masterOnly = true) + public static long min_sync_commit_size = 10000; + + /** + * Min bytes that a sync job will commit. + * When receiving bytes less than it, SyncJob will continue + * to wait for the next batch of data until the time exceeds + * `sync_commit_interval_second`. + * The default value is 15 MB (canal default memory is 16 MB). + * You should set it slightly smaller than canal memory. + */ + @ConfField(mutable = true, masterOnly = true) + public static long min_bytes_sync_commit = 15 * 1024 * 1024; // 15 MB + + /** + * Max bytes that a sync job will commit. + * When receiving bytes more than it, SyncJob will commit + * all data immediately. + * The default value is 64 MB (canal default memory is 16 MB). + * You should set it larger than canal memory and + * `min_bytes_sync_commit`. + */ + @ConfField(mutable = true, masterOnly = true) + public static long max_bytes_sync_commit = 64 * 1024 * 1024; // 64 MB + + /** + * Default number of waiting jobs for routine load and version 2 of load + * This is a desired number. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalConfigs.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalConfigs.java index 841acb638e..394bb67c22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalConfigs.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalConfigs.java @@ -24,7 +24,4 @@ public class CanalConfigs { // Maximal waiting time for consumer to poll one batch public static long pollWaitingTimeoutMs = 80L; - - // Maximal waiting time for channel to poll one batch - public static long channelWaitingTimeoutMs = 1000L; } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java index bdfc7327d5..16b897dc08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java @@ -59,7 +59,9 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { private static Logger logger = LogManager.getLogger(CanalSyncDataConsumer.class); private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; - private static final long COMMIT_MEM_SIZE = 64 * 1024 * 1024; // 64mb; + private static final long MIN_COMMIT_EVENT_SIZE = Config.min_sync_commit_size; + private static final long MIN_COMMIT_MEM_SIZE = Config.min_bytes_sync_commit; + private static final long MAX_COMMIT_MEM_SIZE = Config.max_bytes_sync_commit; private CanalSyncJob syncJob; private CanalConnector connector; @@ -198,7 +200,8 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { // do nothing } if (dataEvents == null) { - if (totalSize > 0 || totalMemSize > 0) { + // If not, continue to wait for the next batch of data + if (totalSize >= MIN_COMMIT_EVENT_SIZE || totalMemSize >= MIN_COMMIT_MEM_SIZE) { break; } try { @@ -218,7 
+221,8 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { executeOneBatch(dataEvents); totalSize += size; totalMemSize += dataEvents.getMemSize(); - if (totalMemSize >= COMMIT_MEM_SIZE) { + // size of bytes received so far is larger than max commit memory size. + if (totalMemSize >= MAX_COMMIT_MEM_SIZE) { break; } } @@ -228,6 +232,7 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { } } + // wait all channels done Status st = waitForTxn(); if (!running) { abortForTxn("stopping client"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java index 7c1b80028c..3235c2c492 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java @@ -60,7 +60,7 @@ public class CanalSyncJob extends SyncJob { protected final static String CANAL_DEBUG = "canal.debug"; @SerializedName(value = "remote") - private CanalDestination remote; + private final CanalDestination remote; @SerializedName(value = "username") private String username; @SerializedName(value = "password") @@ -139,7 +139,7 @@ public class CanalSyncJob extends SyncJob { password = properties.get(CANAL_PASSWORD); } - // optional binlog properties + // optional if (properties.containsKey(CANAL_BATCH_SIZE)) { try { batchSize = Integer.parseInt(properties.get(CANAL_BATCH_SIZE)); @@ -148,6 +148,7 @@ public class CanalSyncJob extends SyncJob { } } + // optional if (properties.containsKey(CANAL_DEBUG)) { debug = Boolean.parseBoolean(properties.get(CANAL_DEBUG)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java index 421c46d5e2..e1c1eb7282 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java @@ -34,13 +34,12 @@ import java.util.concurrent.locks.ReentrantLock; public class SyncCanalClient { protected static Logger logger = LogManager.getLogger(SyncCanalClient.class); - private CanalConnector connector; - - private CanalSyncDataReceiver receiver; - private CanalSyncDataConsumer consumer; + private final CanalConnector connector; + private final CanalSyncDataReceiver receiver; + private final CanalSyncDataConsumer consumer; // channel id -> channel - private Map idToChannels; + private final Map idToChannels; protected ReentrantLock lock = new ReentrantLock(true); protected ReentrantLock getLock = new ReentrantLock(); @@ -53,11 +52,13 @@ public class SyncCanalClient { lock.unlock(); } - public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug) { + public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, + int batchSize, boolean debug) { this(syncJob, destination, connector, batchSize, debug, ".*\\..*"); } - public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug, String filter) { + public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, + int batchSize, boolean debug, String filter) { this.connector = connector; this.consumer = new CanalSyncDataConsumer(syncJob, connector, getLock, debug); this.receiver = new CanalSyncDataReceiver(syncJob, connector, destination, filter, consumer, batchSize, getLock);