From 95cdb7cc0caba77d2a16cbff5d4717b8354bb7a1 Mon Sep 17 00:00:00 2001 From: xy720 <22125576+xy720@users.noreply.github.com> Date: Fri, 17 Sep 2021 10:01:27 +0800 Subject: [PATCH] [Enhance] [Binlog] Reduce thread number of SyncJob to save resources (#6418) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit is going to reduce thread number of SyncJob . 1、Submit send task to thread pool to send data. 2、Submit eof task to thread pool to block and wake up client to commit transactions. 3、Use SerialExecutorService to ensure correct order of sent data in every channel. Besides,some bugs have been fixed in this commit 1、Failed to resume syncJob. 2、Failed to do sync data when set multiple tables in a syncJob. 3、In a cluster with multiple Fe, master may hang up after creating syncJob. --- fe/fe-core/src/main/cup/sql_parser.cup | 2 + .../doris/analysis/ChannelDescription.java | 10 ++ .../org/apache/doris/catalog/Catalog.java | 2 +- .../java/org/apache/doris/common/Config.java | 5 + .../apache/doris/load/sync/SyncChannel.java | 23 +-- .../doris/load/sync/SyncChannelCallback.java | 2 - .../doris/load/sync/SyncChannelHandle.java | 53 ------- .../apache/doris/load/sync/SyncChecker.java | 3 +- .../org/apache/doris/load/sync/SyncJob.java | 4 + .../apache/doris/load/sync/SyncLifeCycle.java | 5 + .../load/sync/canal/CanalSyncChannel.java | 94 +++++------ .../sync/canal/CanalSyncDataConsumer.java | 36 +++-- .../doris/load/sync/canal/CanalSyncJob.java | 25 ++- .../load/sync/canal/SyncCanalClient.java | 54 ++----- .../load/sync/position/PositionMeta.java | 3 +- .../doris/qe/InsertStreamTxnExecutor.java | 3 +- .../org/apache/doris/task/SerialExecutor.java | 146 ++++++++++++++++++ .../doris/task/SerialExecutorService.java | 80 ++++++++++ .../{load/sync => task}/SyncPendingTask.java | 4 +- .../java/org/apache/doris/task/SyncTask.java | 71 +++++++++ .../org/apache/doris/task/SyncTaskPool.java | 51 ++++++ .../load/sync/canal/CanalSyncDataTest.java | 17 +- .../doris/task/SerialExecutorServiceTest.java | 133 ++++++++++++++++ 23 files changed, 618 insertions(+), 208 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java rename fe/fe-core/src/main/java/org/apache/doris/{load/sync => task}/SyncPendingTask.java (95%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 5a62e6704c..c8d28a4016 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -5264,6 +5264,8 @@ keyword ::= {: RESULT = id; :} | KW_ISOLATION:id {: RESULT = id; :} + | KW_JOB:id + {: RESULT = id; :} | KW_ENCRYPTKEY:id {: RESULT = id; :} | KW_ENCRYPTKEYS:id diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java index 51eae33f54..c926fae2b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java @@ -60,6 +60,8 @@ public class ChannelDescription implements Writable { // column names of source table @SerializedName(value = "colNames") private final List colNames; + @SerializedName(value = "channelId") + private long channelId; public ChannelDescription(String srcDatabase, String srcTableName, String targetTable, PartitionNames partitionNames, List colNames) { this.srcDatabase = srcDatabase; @@ -119,6 +121,14 @@ public class ChannelDescription implements Writable { } } + public void setChannelId(long channelId) { + this.channelId = channelId; + } + + public long getChannelId() { + return this.channelId; + } + public String getTargetTable() { return targetTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 265e1da8ac..1c4414958f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -1318,7 +1318,7 @@ public class Catalog { ExportChecker.init(Config.export_checker_interval_second * 1000L); ExportChecker.startAll(); // Sync checker - SyncChecker.init(Config.sync_checker_interval_second); + SyncChecker.init(Config.sync_checker_interval_second * 1000L); SyncChecker.startAll(); // Tablet checker and scheduler tabletChecker.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 2eaf139268..645e1d4929 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -635,6 +635,11 @@ public class Config extends ConfigBase { */ @ConfField public static int sync_checker_interval_second = 5; + /** + * max num of thread to handle sync task in sync task thread-pool. + */ + @ConfField public static int max_sync_task_threads_num = 10; + /** * Default number of waiting jobs for routine load and version 2 of load * This is a desired number. diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java index 85644b13f8..8a47385816 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java @@ -18,7 +18,6 @@ package org.apache.doris.load.sync; import org.apache.doris.analysis.PartitionNames; -import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.UserException; @@ -32,7 +31,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class SyncChannel extends SyncLifeCycle { +public class SyncChannel { private static final Logger LOG = LogManager.getLogger(SyncChannel.class); protected long id; @@ -46,8 +45,8 @@ public class SyncChannel extends SyncLifeCycle { protected String srcTable; protected SyncChannelCallback callback; - public SyncChannel(SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { - this.id = Catalog.getCurrentCatalog().getNextId(); + public SyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { + this.id = id; this.jobId = syncJob.getId(); this.db = db; this.tbl = table; @@ -57,22 +56,6 @@ public class SyncChannel extends SyncLifeCycle { this.srcTable = srcTable.toLowerCase(); } - @Override - public void start() { - super.start(); - LOG.info("channel {} has been started. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable); - } - - @Override - public void stop() { - super.stop(); - LOG.info("channel {} has been stopped. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable); - } - - @Override - public void process() { - } - public void beginTxn(long batchId) throws UserException, TException, TimeoutException, InterruptedException, ExecutionException { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java index 8b2f239236..2cdf717467 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java @@ -19,8 +19,6 @@ package org.apache.doris.load.sync; public interface SyncChannelCallback { - public boolean state(); - public void onFinished(long channelId); public void onFailed(String errMsg); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java index 4e3a397c99..8f7721ed5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java @@ -24,14 +24,11 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.concurrent.atomic.AtomicBoolean; - public class SyncChannelHandle implements SyncChannelCallback { private Logger LOG = LogManager.getLogger(SyncChannelHandle.class); // channel id -> dummy value(-1) private MarkedCountDownLatch latch; - private Sync sync = new Sync(); public void reset(int size) { this.latch = new MarkedCountDownLatch<>(size); @@ -41,19 +38,6 @@ public class SyncChannelHandle implements SyncChannelCallback { latch.addMark(channel.getId(), -1L); } - public void set(Boolean mutex) { - if (mutex) { - this.sync.innerSetTrue(); - } else { - this.sync.innerSetFalse(); - } - } - - @Override - public boolean state() { - return this.sync.innerState(); - } - @Override public void onFinished(long channelId) { this.latch.markedCountDown(channelId, -1L); @@ -71,41 +55,4 @@ public class SyncChannelHandle implements SyncChannelCallback { public Status getStatus() { return latch.getStatus(); } - - // This class describes the inner state. - private final class Sync { - private AtomicBoolean state; - - boolean innerState() { - return this.state.get(); - } - - public boolean getState() { - return state.get(); - } - - void innerSetTrue() { - boolean s; - do { - s = getState(); - if (s) { - return; - } - } while(!state.compareAndSet(s, true)); - } - - void innerSetFalse() { - boolean s; - do { - s = getState(); - if (!s) { - return; - } - } while(!state.compareAndSet(s, false)); - } - - private Sync() { - state = new AtomicBoolean(false); - } - } } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java index 830c449f41..78da903d11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java @@ -25,6 +25,7 @@ import org.apache.doris.task.MasterTaskExecutor; import com.google.common.collect.Maps; +import org.apache.doris.task.SyncPendingTask; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -67,7 +68,7 @@ public class SyncChecker extends MasterDaemon { @Override protected void runAfterCatalogReady() { - LOG.debug("start check export jobs. job state: {}", jobState.name()); + LOG.debug("start check sync jobs. job state: {}", jobState.name()); switch (jobState) { case PENDING: runPendingJobs(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java index 3ce5b48b0d..9ceb04453e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java @@ -307,6 +307,10 @@ public abstract class SyncJob implements Writable { public void setChannelDescriptions(List channelDescriptions) { this.channelDescriptions = channelDescriptions; + // set channel id + for (ChannelDescription channelDescription : channelDescriptions) { + channelDescription.setChannelId(Catalog.getCurrentCatalog().getNextId()); + } } public long getId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java index 3c98137fa7..9109c92bed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java @@ -62,6 +62,11 @@ public abstract class SyncLifeCycle { this.running = false; if (thread != null) { + // Deadlock prevention + if (thread == Thread.currentThread()) { + return; + } + try { thread.join(); } catch (InterruptedException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 8b5dc54c70..266d52f5d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -32,6 +32,8 @@ import org.apache.doris.load.sync.model.Data; import org.apache.doris.proto.InternalService; import org.apache.doris.qe.InsertStreamTxnExecutor; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.task.SyncTask; +import org.apache.doris.task.SyncTaskPool; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TMergeType; @@ -49,7 +51,6 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Queues; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,8 +59,6 @@ import org.apache.thrift.TException; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CanalSyncChannel extends SyncChannel { @@ -69,44 +68,47 @@ public class CanalSyncChannel extends SyncChannel { private static final String DELETE_CONDITION = DELETE_COLUMN + "=1"; private static final String NULL_VALUE_FOR_LOAD = "\\N"; + private final int index; + private long timeoutSecond; private long lastBatchId; - private LinkedBlockingQueue> pendingQueue; + private Data batchBuffer; private InsertStreamTxnExecutor txnExecutor; - public CanalSyncChannel(SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { - super(syncJob, db, table, columns, srcDataBase, srcTable); + public CanalSyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { + super(id, syncJob, db, table, columns, srcDataBase, srcTable); + this.index = SyncTaskPool.getNextIndex(); this.batchBuffer = new Data<>(); - this.pendingQueue = Queues.newLinkedBlockingQueue(128); this.lastBatchId = -1L; this.timeoutSecond = -1L; } - public void process() { - while (running) { - if (!isTxnInit()) { - continue; - } - // if txn has begun, send all data in queue - if (isTxnBegin()) { - while (!pendingQueue.isEmpty()) { - try { - Data rows = pendingQueue.poll(CanalConfigs.channelWaitingTimeoutMs, TimeUnit.MILLISECONDS); - if (rows != null) { - sendData(rows); - } - } catch (Exception e) { - String errMsg = "encounter exception in channel, channel " + id + ", " + - "msg: " + e.getMessage() + ", table: " + targetTable; - LOG.error(errMsg); - callback.onFailed(errMsg); - } - } - } - if (callback.state()) { - callback.onFinished(id); - } + private final static class SendTask extends SyncTask { + private final InsertStreamTxnExecutor executor; + private final Data rows; + + public SendTask(long signature, int index, SyncChannelCallback callback, Data rows, InsertStreamTxnExecutor executor) { + super(signature, index, callback); + this.executor = executor; + this.rows = rows; + } + + public void exec() throws Exception { + TransactionEntry txnEntry = executor.getTxnEntry(); + txnEntry.setDataToSend(rows.getDatas()); + executor.sendData(); + } + } + + private final static class EOFTask extends SyncTask { + + public EOFTask(long signature, int index, SyncChannelCallback callback) { + super(signature, index, callback); + } + + public void exec() throws Exception { + callback.onFinished(signature); } } @@ -189,10 +191,10 @@ public class CanalSyncChannel extends SyncChannel { throw e; } finally { this.batchBuffer = new Data<>(); - this.pendingQueue.clear(); updateBatchId(-1L); } } + @Override public void commitTxn() throws TException, TimeoutException, InterruptedException, ExecutionException { if (!isTxnBegin()) { @@ -213,10 +215,10 @@ public class CanalSyncChannel extends SyncChannel { throw e; } finally { this.batchBuffer = new Data<>(); - this.pendingQueue.clear(); updateBatchId(-1L); } } + @Override public void initTxn(long timeoutSecond) { if (!isTxnInit()) { @@ -254,7 +256,12 @@ public class CanalSyncChannel extends SyncChannel { } } - private void execute(long batchId, CanalEntry.EventType eventType, List columns) { + public void submitEOF() { + EOFTask task = new EOFTask(id, index, callback); + SyncTaskPool.submit(task); + } + + public void execute(long batchId, CanalEntry.EventType eventType, List columns) { InternalService.PDataRow row = parseRow(eventType, columns); try { Preconditions.checkState(isTxnInit()); @@ -262,7 +269,8 @@ public class CanalSyncChannel extends SyncChannel { if (!isTxnBegin()) { beginTxn(batchId); } else { - this.pendingQueue.put(this.batchBuffer); + SendTask task = new SendTask(id, index, callback, batchBuffer, txnExecutor); + SyncTaskPool.submit(task); this.batchBuffer = new Data<>(); } updateBatchId(batchId); @@ -294,19 +302,13 @@ public class CanalSyncChannel extends SyncChannel { return row.build(); } - private void sendData(Data rows) throws TException, TimeoutException, - InterruptedException, ExecutionException { - Preconditions.checkState(isTxnBegin()); - TransactionEntry txnEntry = txnExecutor.getTxnEntry(); - txnEntry.setDataToSend(rows.getDatas()); - this.txnExecutor.sendData(); - } - public void flushData() throws TException, TimeoutException, InterruptedException, ExecutionException { - if (batchBuffer.isNotEmpty()) { - sendData(batchBuffer); - batchBuffer = new Data<>(); + if (this.batchBuffer.isNotEmpty()) { + TransactionEntry txnEntry = txnExecutor.getTxnEntry(); + txnEntry.setDataToSend(batchBuffer.getDatas()); + this.txnExecutor.sendData(); + this.batchBuffer = new Data<>(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java index 10c393bb4c..bdfc7327d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java @@ -102,7 +102,6 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { @Override public void beginForTxn() { - handle.set(false); handle.reset(idToChannels.size()); for (CanalSyncChannel channel : idToChannels.values()) { channel.initTxn(Config.max_stream_load_timeout_second); @@ -161,15 +160,16 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { } public Status waitForTxn() { + for (CanalSyncChannel channel : idToChannels.values()) { + channel.submitEOF(); + } + Status st = Status.CANCELLED; - handle.set(true); try { handle.join(); st = handle.getStatus(); } catch (InterruptedException e) { logger.warn("InterruptedException: ", e); - } finally { - handle.set(false); } return st; } @@ -190,7 +190,7 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { long totalMemSize = 0L; long startTime = System.currentTimeMillis(); beginForTxn(); - while (true) { + while (running) { Events dataEvents = null; try { dataEvents = dataBlockingQueue.poll(CanalConfigs.pollWaitingTimeoutMs, TimeUnit.MILLISECONDS); @@ -227,7 +227,12 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { break; } } + Status st = waitForTxn(); + if (!running) { + abortForTxn("stopping client"); + continue; + } if (st.ok()) { commitForTxn(); } else { @@ -260,7 +265,7 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { } int startIndex = 0; - // if last ack position is null, it is the first time to consume batch (startOffset = 0) + // if last ack position is null, it is the first time to consume batch EntryPosition lastAckPosition = positionMeta.getAckPosition(); if (lastAckPosition != null) { EntryPosition firstPosition = EntryPosition.createPosition(entries.get(0)); @@ -303,14 +308,18 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { EntryPosition startPosition = dataEvents.getPositionRange().getStart(); EntryPosition endPosition = dataEvents.getPositionRange().getEnd(); for (CanalSyncChannel channel : idToChannels.values()) { + String key = CanalUtils.getFullName(channel.getSrcDataBase(), channel.getSrcTable()); + // if last commit position is null, it is the first time to execute batch EntryPosition commitPosition = positionMeta.getCommitPosition(channel.getId()); - String key = channel.getSrcDataBase() + "." + channel.getSrcTable(); - if (commitPosition.compareTo(startPosition) < 0) { + if (commitPosition != null) { + if (commitPosition.compareTo(startPosition) < 0) { + preferChannels.put(key, channel); + } else if (commitPosition.compareTo(endPosition) < 0) { + secondaryChannels.put(key, channel); + } + } else { preferChannels.put(key, channel); } - else if (commitPosition.compareTo(endPosition) < 0) { - secondaryChannels.put(key, channel); - } } // distribute data to channels @@ -405,13 +414,16 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { private void rollback() { holdGetLock(); try { - connector.rollback(); // Wait for the receiver to put the last message into the queue before clearing queue try { Thread.sleep(1000L); } catch (InterruptedException e) { // ignore } + + if (!ackBatches.isEmpty()) { + connector.rollback(); + } } finally { releaseGetLock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java index dce10bab55..c22b9f8e46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java @@ -109,8 +109,8 @@ public class CanalSyncJob extends SyncJob { colNames.add(column.getName()); } } - CanalSyncChannel syncChannel = new CanalSyncChannel(this, db, olapTable, colNames, - channelDescription.getSrcDatabase(), channelDescription.getSrcTableName()); + CanalSyncChannel syncChannel = new CanalSyncChannel(channelDescription.getChannelId(), this, db, + olapTable, colNames, channelDescription.getSrcDatabase(), channelDescription.getSrcTableName()); if (channelDescription.getPartitionNames() != null) { syncChannel.setPartitions(channelDescription.getPartitionNames()); } @@ -183,7 +183,9 @@ public class CanalSyncJob extends SyncJob { public void execute() throws UserException { LOG.info("try to start canal client. Remote ip: {}, remote port: {}, debug: {}", ip, port, debug); // init - init(); + if (!isInit()) { + init(); + } // start client unprotectedStartClient(); } @@ -193,10 +195,12 @@ public class CanalSyncJob extends SyncJob { LOG.info("Cancel canal sync job {}. MsgType: {}, errMsg: {}", id, msgType.name(), errMsg); failMsg = new SyncFailMsg(msgType, errMsg); switch (msgType) { - case USER_CANCEL: case SUBMIT_FAIL: case RUN_FAIL: + unprotectedStopClient(JobState.PAUSED); + break; case UNKNOWN: + case USER_CANCEL: unprotectedStopClient(JobState.CANCELLED); break; default: @@ -228,11 +232,7 @@ public class CanalSyncJob extends SyncJob { return; } if (client != null) { - if (jobState == JobState.CANCELLED) { - client.shutdown(true); - } else { - client.shutdown(false); - } + client.shutdown(true); } updateState(jobState, false); LOG.info("client has been stopped. id: {}, jobName: {}" , id, jobName); @@ -251,15 +251,12 @@ public class CanalSyncJob extends SyncJob { JobState jobState = info.getJobState(); switch (jobState) { case RUNNING: - client.startup(); - updateState(JobState.RUNNING, true); + updateState(JobState.PENDING, true); break; case PAUSED: - client.shutdown(false); updateState(JobState.PAUSED, true); break; case CANCELLED: - client.shutdown(true); updateState(JobState.CANCELLED, true); break; } @@ -300,4 +297,4 @@ public class CanalSyncJob extends SyncJob { + ", finishTimeMs=" + TimeUtils.longToTimeString(finishTimeMs) + "]"; } -} \ No newline at end of file +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java index 33cb8cf8b7..421c46d5e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java @@ -53,8 +53,6 @@ public class SyncCanalClient { lock.unlock(); } - private ShutDownWorker shutDownWorker; - public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug) { this(syncJob, destination, connector, batchSize, debug, ".*\\..*"); } @@ -71,13 +69,9 @@ public class SyncCanalClient { Preconditions.checkState(!idToChannels.isEmpty(), "no channel is registered"); lock(); try { - // 1.start all threads in channel - for (CanalSyncChannel channel : idToChannels.values()) { - channel.start(); - } - // 2. start executor + // 1. start executor consumer.start(); - // 3. start receiver + // 2. start receiver receiver.start(); } finally { unlock(); @@ -85,43 +79,17 @@ public class SyncCanalClient { logger.info("canal client has been started."); } - // Stop client asynchronously public void shutdown(boolean needCleanUp) { - this.shutDownWorker = new ShutDownWorker(needCleanUp); - shutDownWorker.shutdown(); - logger.info("canal client shutdown worker has been started."); - } - - public class ShutDownWorker implements Runnable { - public Thread thread; - public boolean needCleanUp; - - public ShutDownWorker(boolean needCleanUp) { - this.thread = new Thread(this, "ShutDownWorker"); - this.needCleanUp = needCleanUp; - } - - public void shutdown() { - thread.start(); - } - - @Override - public void run() { - lock(); - try { - // 1. stop receiver - receiver.stop(); - // 2. stop executor - consumer.stop(needCleanUp); - // 3. stop channels - for (CanalSyncChannel channel : idToChannels.values()) { - channel.stop(); - } - } finally { - unlock(); - } - logger.info("canal client has been stopped."); + lock(); + try { + // 1. stop receiver + receiver.stop(); + // 2. stop executor + consumer.stop(needCleanUp); + } finally { + unlock(); } + logger.info("canal client has been stopped."); } public void registerChannels(List channels) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java index 4d68315c33..d4d8f718da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java @@ -38,6 +38,7 @@ public class PositionMeta { this.batches = Maps.newHashMap(); this.commitPositions = Maps.newHashMap(); } + public void addBatch(long batchId, PositionRange range) { updateMaxBatchId(batchId); batches.put(batchId, range); @@ -76,7 +77,7 @@ public class PositionMeta { } public T getLatestPosition() { - if (batches.isEmpty()) { + if (!batches.containsKey(maxBatchId)) { return null; } else { return batches.get(maxBatchId).getEnd(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index 85ddd1f7c2..97153954df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -159,9 +159,10 @@ public class InsertStreamTxnExecutor { if (code != TStatusCode.OK) { throw new TException("failed to insert data: " + result.getStatus().getErrorMsgsList()); } - txnEntry.clearDataToSend(); } catch (RpcException e) { throw new TException(e); + } finally { + txnEntry.clearDataToSend(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java new file mode 100644 index 0000000000..22d3a3d364 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class SerialExecutor extends AbstractExecutorService { + + private final ExecutorService taskPool; + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition terminating = lock.newCondition(); + + private final BlockingQueue tasks = new LinkedBlockingQueue<>(); + private Runnable active; + + private boolean shutdown; + + public SerialExecutor(final ExecutorService executor) { + Preconditions.checkNotNull(executor); + this.taskPool = executor; + } + + public void execute(final Runnable r) { + lock.lock(); + try { + checkPoolIsRunning(); + tasks.add(new Runnable() { + public void run() { + try { + r.run(); + } finally { + scheduleNext(); + } + } + }); + if (active == null) { + scheduleNext(); + } + } finally { + lock.unlock(); + } + } + + private void checkPoolIsRunning() { + Preconditions.checkState(lock.isHeldByCurrentThread()); + if (shutdown) { + throw new RejectedExecutionException("SerialExecutor is already shutdown"); + } + } + + public void shutdown() { + lock.lock(); + try { + shutdown = true; + } finally { + lock.unlock(); + } + } + + public List shutdownNow() { + lock.lock(); + try { + shutdown = true; + List result = new ArrayList<>(); + tasks.drainTo(result); + return result; + } finally { + lock.unlock(); + } + } + + public boolean isShutdown() { + lock.lock(); + try { + return shutdown; + } finally { + lock.unlock(); + } + } + + public boolean isTerminated() { + lock.lock(); + try { + return shutdown && active == null; + } finally { + lock.unlock(); + } + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + lock.lock(); + try { + long waitUntil = System.nanoTime() + unit.toNanos(timeout); + long remainingTime; + while ((remainingTime = waitUntil - System.nanoTime()) > 0) { + if (shutdown && active == null) { + break; + } + terminating.awaitNanos(remainingTime); + } + return remainingTime > 0; + } finally { + lock.unlock(); + } + } + + private void scheduleNext() { + lock.lock(); + try { + if ((active = tasks.poll()) != null) { + taskPool.execute(active); + } else if (shutdown) { + terminating.signalAll(); + } + } finally { + lock.unlock(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java new file mode 100644 index 0000000000..dcdf2690e8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.common.ThreadPoolManager; + +import java.util.concurrent.ExecutorService; + +/** + * This executor service ensures that all tasks submitted to + * the same slot are executed in the order of submission. + */ +public class SerialExecutorService { + + public interface SerialRunnable extends Runnable { + int getIndex(); + } + + private final int numOfSlots; + private final ExecutorService taskPool; + private final SerialExecutor[] slots; + + private SerialExecutorService(int numOfSlots, ExecutorService taskPool) { + this.numOfSlots = numOfSlots; + this.slots = new SerialExecutor[numOfSlots]; + this.taskPool = taskPool; + for (int i = 0; i < numOfSlots; i++) { + slots[i] = new SerialExecutor(taskPool); + } + } + + public SerialExecutorService(int numOfSlots) { + this(numOfSlots, ThreadPoolManager.newDaemonFixedThreadPool( + numOfSlots, 256, "sync-task-pool", true)); + } + + public void submit(Runnable command) { + int index = getIndex(command); + if (isSlotIndex(index)) { + SerialExecutor serialEx = slots[index]; + serialEx.execute(command); + } else { + taskPool.execute(command); + } + } + + private int getIndex(Runnable command) { + int index = -1; + if (command instanceof SerialRunnable) { + index = (((SerialRunnable) command).getIndex()); + } + return index; + } + + private boolean isSlotIndex(int index) { + return index >= 0 && index < numOfSlots; + } + + public void close() { + for (int i = 0; i < numOfSlots; i++) { + final SerialExecutor serialEx = slots[i]; + serialEx.shutdown(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java similarity index 95% rename from fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java rename to fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java index e9b7695233..c61d9e69c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.load.sync; +package org.apache.doris.task; import org.apache.doris.common.UserException; import org.apache.doris.load.sync.SyncFailMsg.MsgType; +import org.apache.doris.load.sync.SyncJob; import org.apache.doris.load.sync.SyncJob.JobState; -import org.apache.doris.task.MasterTask; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java new file mode 100644 index 0000000000..cdae68b0ea --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.load.sync.SyncChannelCallback; +import org.apache.doris.task.SerialExecutorService.SerialRunnable; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * SyncTask is a runnable to submit to SerialExecutorService. Each + * SyncTask will have an index to submit to the corresponding slot + * in the SerialExecutorService. And SerialExecutorService ensures + * that all SyncTasks submitted with the same index are always + * executed in the order of submission. + */ +public abstract class SyncTask implements SerialRunnable { + private static final Logger LOG = LogManager.getLogger(SyncTask.class); + + protected long signature; + /** + * Each index corresponds to a slot in the SerialExecutorService. + * It should only be assigned by the getNextIndex() method in the + * SyncTaskPool. SyncTasks with the same index are always executed + * in the order of submission. + */ + protected int index; + protected SyncChannelCallback callback; + + public SyncTask(long signature, int index, SyncChannelCallback callback) { + this.signature = signature; + this.index = index; + this.callback = callback; + } + + @Override + public void run() { + try { + exec(); + } catch (Exception e) { + String errMsg = "channel " + signature + ", " + "msg: " + e.getMessage(); + LOG.error("sync task exec error: {}", errMsg); + callback.onFailed(errMsg); + } + } + + public int getIndex() { + return this.index; + } + + /** + * implement in child + */ + protected abstract void exec() throws Exception; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java new file mode 100644 index 0000000000..fb2309d49b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.common.Config; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntUnaryOperator; + +public class SyncTaskPool { + private static final int NUM_OF_SLOTS = Config.max_sync_task_threads_num; + private static final SerialExecutorService EXECUTOR = new SerialExecutorService(NUM_OF_SLOTS); + private static final AtomicInteger nextIndex = new AtomicInteger(); + + public static void submit(Runnable task) { + if (task == null) { + return; + } + EXECUTOR.submit(task); + } + + /** + * Gets the next index loop from 0 to @NUM_OF_SLOTS - 1 + */ + public static int getNextIndex() { + return nextIndex.updateAndGet(new IntUnaryOperator() { + @Override + public int applyAsInt(int operand) { + if (++operand >= NUM_OF_SLOTS) { + operand = 0; + } + return operand; + } + }); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index 8a893c1d6b..343a0fff4f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -73,6 +73,7 @@ public class CanalSyncDataTest { private long offset = 0; private long nextId = 1000L; private int batchSize = 8192; + private long channelId = 100001L; ReentrantLock getLock; @@ -220,13 +221,12 @@ public class CanalSyncDataTest { CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); consumer.start(); receiver.start(); @@ -235,7 +235,6 @@ public class CanalSyncDataTest { } finally { receiver.stop(); consumer.stop(); - channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); @@ -295,13 +294,12 @@ public class CanalSyncDataTest { CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); consumer.start(); receiver.start(); @@ -310,7 +308,6 @@ public class CanalSyncDataTest { } finally { receiver.stop(); consumer.stop(); - channel.stop(); } LOG.info(consumer.getPositionInfo()); @@ -360,13 +357,12 @@ public class CanalSyncDataTest { CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); consumer.start(); receiver.start(); @@ -375,7 +371,6 @@ public class CanalSyncDataTest { } finally { receiver.stop(); consumer.stop(); - channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); @@ -444,13 +439,12 @@ public class CanalSyncDataTest { CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); consumer.start(); receiver.start(); @@ -459,7 +453,6 @@ public class CanalSyncDataTest { } finally { receiver.stop(); consumer.stop(); - channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java new file mode 100644 index 0000000000..cc9f500cec --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.load.sync.SyncChannelCallback; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SerialExecutorServiceTest { + private static final Logger LOG = LoggerFactory.getLogger(MasterTaskExecutorTest.class); + private static final int NUM_OF_SLOTS = 10; + private static final int THREAD_NUM = 10; + + private static SerialExecutorService taskPool; + // thread signature -> tasks submit serial + private static Map> submitSerial; + // thread signature -> tasks execute serial + private static Map> execSerial; + + @Before + public void setUp() { + taskPool = new SerialExecutorService(NUM_OF_SLOTS); + submitSerial = new ConcurrentHashMap<>(); + execSerial = new ConcurrentHashMap<>(); + } + + @After + public void tearDown() { + if (taskPool != null) { + taskPool.close(); + } + } + + @Test + public void testSubmit() { + for (long i = 0; i < THREAD_NUM; i++) { + if (!submitSerial.containsKey(i)) { + submitSerial.put(i, new ArrayList<>()); + } + SubmitThread thread = new SubmitThread("Thread-" + i, i, submitSerial.get(i)); + thread.start(); + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + + // The submission order of the same signature should be equal to the execution order + Assert.assertEquals(submitSerial.size(), THREAD_NUM); + Assert.assertEquals(submitSerial.size(), execSerial.size()); + for (long i = 0; i < THREAD_NUM; i++) { + Assert.assertTrue(submitSerial.containsKey(i)); + Assert.assertTrue(execSerial.containsKey(i)); + List submitSerialList = submitSerial.get(i); + List execSerialList = execSerial.get(i); + Assert.assertEquals(submitSerialList.size(), execSerialList.size()); + for (int j = 0; j < submitSerialList.size(); j++) { + Assert.assertEquals(submitSerialList.get(j), execSerialList.get(j)); + } + } + } + + private static class TestSyncTask extends SyncTask { + public int serial; + + public TestSyncTask(long signature, int index, int serial, SyncChannelCallback callback) { + super(signature, index, callback); + this.serial = serial; + } + + @Override + protected void exec() { + LOG.info("run exec. signature: {}, index: {}, serial: {}", signature, index, serial); + if (!execSerial.containsKey(signature)) { + execSerial.put(signature, new ArrayList<>()); + } + execSerial.get(signature).add(serial); + } + } + + private static class SubmitThread extends Thread { + private int index = SyncTaskPool.getNextIndex(); + private long signature; + private List submitSerialList; + + public SubmitThread(String name, long signature, List submitSerialList) { + super(name); + this.signature = signature; + this.submitSerialList = submitSerialList; + } + + public void run() { + for (int i = 0; i < 100; i++) { + TestSyncTask task = new TestSyncTask(signature, index, i, new SyncChannelCallback() { + @Override + public void onFinished(long channelId) { + } + @Override + public void onFailed(String errMsg) { + } + }); + submitSerialList.add(i); + taskPool.submit(task); + } + } + } +}