[Bug][Binlog] Fix the number of versions may exceed the limit during data synchronization (#6889)
Bug detail: #6887 To solve this problem, the commit of transaction must meet any of the following conditions to avoid commit too freqently: 1. The current accumulated event quantity is greater than the `min_sync_commit_size`. 2. The current accumulated data size is greater than the `min_bytes_sync_commit`. In addition, when the accumulated data size exceeds `max_bytes_sync_commit`, the transaction needs to be committed immediately. Before:  After: 
This commit is contained in:
@ -646,6 +646,40 @@ public class Config extends ConfigBase {
|
||||
*/
|
||||
@ConfField public static int max_sync_task_threads_num = 10;
|
||||
|
||||
|
||||
/**
|
||||
* Min event size that a sync job will commit.
|
||||
* When receiving events less than it, SyncJob will continue
|
||||
* to wait for the next batch of data until the time exceeds
|
||||
* `sync_commit_interval_second`.
|
||||
* The default value is 10000 (canal default event buffer size is 16384).
|
||||
* You should set it smaller than canal buffer size.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static long min_sync_commit_size = 10000;
|
||||
|
||||
/**
|
||||
* Min bytes that a sync job will commit.
|
||||
* When receiving bytes less than it, SyncJob will continue
|
||||
* to wait for the next batch of data until the time exceeds
|
||||
* `sync_commit_interval_second`.
|
||||
* The default value is 15 MB (canal default memory is 16 MB).
|
||||
* You should set it slightly smaller than canal memory.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static long min_bytes_sync_commit = 15 * 1024 * 1024; // 15 MB
|
||||
|
||||
/**
|
||||
* Max bytes that a sync job will commit.
|
||||
* When receiving bytes less than it, SyncJob will commit
|
||||
* all data immediately.
|
||||
* The default value is 64 MB (canal default memory is 16 MB).
|
||||
* You should set it larger than canal memory and
|
||||
* `min_bytes_sync_commit`.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static long max_bytes_sync_commit = 64 * 1024 * 1024; // 64 MB
|
||||
|
||||
/**
|
||||
* Default number of waiting jobs for routine load and version 2 of load
|
||||
* This is a desired number.
|
||||
|
||||
@ -24,7 +24,4 @@ public class CanalConfigs {
|
||||
|
||||
// Maximal waiting time for consumer to poll one batch
|
||||
public static long pollWaitingTimeoutMs = 80L;
|
||||
|
||||
// Maximal waiting time for channel to poll one batch
|
||||
public static long channelWaitingTimeoutMs = 1000L;
|
||||
}
|
||||
@ -59,7 +59,9 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
|
||||
private static Logger logger = LogManager.getLogger(CanalSyncDataConsumer.class);
|
||||
|
||||
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
|
||||
private static final long COMMIT_MEM_SIZE = 64 * 1024 * 1024; // 64mb;
|
||||
private static final long MIN_COMMIT_EVENT_SIZE = Config.min_sync_commit_size;
|
||||
private static final long MIN_COMMIT_MEM_SIZE = Config.min_bytes_sync_commit;
|
||||
private static final long MAX_COMMIT_MEM_SIZE = Config.max_bytes_sync_commit;
|
||||
|
||||
private CanalSyncJob syncJob;
|
||||
private CanalConnector connector;
|
||||
@ -198,7 +200,8 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
|
||||
// do nothing
|
||||
}
|
||||
if (dataEvents == null) {
|
||||
if (totalSize > 0 || totalMemSize > 0) {
|
||||
// If not, continue to wait for the next batch of data
|
||||
if (totalSize >= MIN_COMMIT_EVENT_SIZE || totalMemSize >= MIN_COMMIT_MEM_SIZE) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
@ -218,7 +221,8 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
|
||||
executeOneBatch(dataEvents);
|
||||
totalSize += size;
|
||||
totalMemSize += dataEvents.getMemSize();
|
||||
if (totalMemSize >= COMMIT_MEM_SIZE) {
|
||||
// size of bytes received so far is larger than max commit memory size.
|
||||
if (totalMemSize >= MAX_COMMIT_MEM_SIZE) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -228,6 +232,7 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
|
||||
}
|
||||
}
|
||||
|
||||
// wait all channels done
|
||||
Status st = waitForTxn();
|
||||
if (!running) {
|
||||
abortForTxn("stopping client");
|
||||
|
||||
@ -60,7 +60,7 @@ public class CanalSyncJob extends SyncJob {
|
||||
protected final static String CANAL_DEBUG = "canal.debug";
|
||||
|
||||
@SerializedName(value = "remote")
|
||||
private CanalDestination remote;
|
||||
private final CanalDestination remote;
|
||||
@SerializedName(value = "username")
|
||||
private String username;
|
||||
@SerializedName(value = "password")
|
||||
@ -139,7 +139,7 @@ public class CanalSyncJob extends SyncJob {
|
||||
password = properties.get(CANAL_PASSWORD);
|
||||
}
|
||||
|
||||
// optional binlog properties
|
||||
// optional
|
||||
if (properties.containsKey(CANAL_BATCH_SIZE)) {
|
||||
try {
|
||||
batchSize = Integer.parseInt(properties.get(CANAL_BATCH_SIZE));
|
||||
@ -148,6 +148,7 @@ public class CanalSyncJob extends SyncJob {
|
||||
}
|
||||
}
|
||||
|
||||
// optional
|
||||
if (properties.containsKey(CANAL_DEBUG)) {
|
||||
debug = Boolean.parseBoolean(properties.get(CANAL_DEBUG));
|
||||
}
|
||||
|
||||
@ -34,13 +34,12 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
public class SyncCanalClient {
|
||||
protected static Logger logger = LogManager.getLogger(SyncCanalClient.class);
|
||||
|
||||
private CanalConnector connector;
|
||||
|
||||
private CanalSyncDataReceiver receiver;
|
||||
private CanalSyncDataConsumer consumer;
|
||||
private final CanalConnector connector;
|
||||
private final CanalSyncDataReceiver receiver;
|
||||
private final CanalSyncDataConsumer consumer;
|
||||
|
||||
// channel id -> channel
|
||||
private Map<Long, CanalSyncChannel> idToChannels;
|
||||
private final Map<Long, CanalSyncChannel> idToChannels;
|
||||
|
||||
protected ReentrantLock lock = new ReentrantLock(true);
|
||||
protected ReentrantLock getLock = new ReentrantLock();
|
||||
@ -53,11 +52,13 @@ public class SyncCanalClient {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug) {
|
||||
public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector,
|
||||
int batchSize, boolean debug) {
|
||||
this(syncJob, destination, connector, batchSize, debug, ".*\\..*");
|
||||
}
|
||||
|
||||
public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug, String filter) {
|
||||
public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector,
|
||||
int batchSize, boolean debug, String filter) {
|
||||
this.connector = connector;
|
||||
this.consumer = new CanalSyncDataConsumer(syncJob, connector, getLock, debug);
|
||||
this.receiver = new CanalSyncDataReceiver(syncJob, connector, destination, filter, consumer, batchSize, getLock);
|
||||
|
||||
Reference in New Issue
Block a user