diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 5a62e6704c..c8d28a4016 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -5264,6 +5264,8 @@ keyword ::= {: RESULT = id; :} | KW_ISOLATION:id {: RESULT = id; :} + | KW_JOB:id + {: RESULT = id; :} | KW_ENCRYPTKEY:id {: RESULT = id; :} | KW_ENCRYPTKEYS:id diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java index 51eae33f54..c926fae2b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java @@ -60,6 +60,8 @@ public class ChannelDescription implements Writable { // column names of source table @SerializedName(value = "colNames") private final List colNames; + @SerializedName(value = "channelId") + private long channelId; public ChannelDescription(String srcDatabase, String srcTableName, String targetTable, PartitionNames partitionNames, List colNames) { this.srcDatabase = srcDatabase; @@ -119,6 +121,14 @@ public class ChannelDescription implements Writable { } } + public void setChannelId(long channelId) { + this.channelId = channelId; + } + + public long getChannelId() { + return this.channelId; + } + public String getTargetTable() { return targetTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 265e1da8ac..1c4414958f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -1318,7 +1318,7 @@ public class Catalog { ExportChecker.init(Config.export_checker_interval_second * 1000L); ExportChecker.startAll(); // Sync checker - SyncChecker.init(Config.sync_checker_interval_second); + SyncChecker.init(Config.sync_checker_interval_second * 1000L); SyncChecker.startAll(); // Tablet checker and scheduler tabletChecker.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 2eaf139268..645e1d4929 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -635,6 +635,11 @@ public class Config extends ConfigBase { */ @ConfField public static int sync_checker_interval_second = 5; + /** + * max num of thread to handle sync task in sync task thread-pool. + */ + @ConfField public static int max_sync_task_threads_num = 10; + /** * Default number of waiting jobs for routine load and version 2 of load * This is a desired number. diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java index 85644b13f8..8a47385816 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java @@ -18,7 +18,6 @@ package org.apache.doris.load.sync; import org.apache.doris.analysis.PartitionNames; -import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.UserException; @@ -32,7 +31,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class SyncChannel extends SyncLifeCycle { +public class SyncChannel { private static final Logger LOG = LogManager.getLogger(SyncChannel.class); protected long id; @@ -46,8 +45,8 @@ public class SyncChannel extends SyncLifeCycle { protected String srcTable; protected SyncChannelCallback callback; - public SyncChannel(SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { - this.id = Catalog.getCurrentCatalog().getNextId(); + public SyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { + this.id = id; this.jobId = syncJob.getId(); this.db = db; this.tbl = table; @@ -57,22 +56,6 @@ public class SyncChannel extends SyncLifeCycle { this.srcTable = srcTable.toLowerCase(); } - @Override - public void start() { - super.start(); - LOG.info("channel {} has been started. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable); - } - - @Override - public void stop() { - super.stop(); - LOG.info("channel {} has been stopped. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable); - } - - @Override - public void process() { - } - public void beginTxn(long batchId) throws UserException, TException, TimeoutException, InterruptedException, ExecutionException { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java index 8b2f239236..2cdf717467 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java @@ -19,8 +19,6 @@ package org.apache.doris.load.sync; public interface SyncChannelCallback { - public boolean state(); - public void onFinished(long channelId); public void onFailed(String errMsg); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java index 4e3a397c99..8f7721ed5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java @@ -24,14 +24,11 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.concurrent.atomic.AtomicBoolean; - public class SyncChannelHandle implements SyncChannelCallback { private Logger LOG = LogManager.getLogger(SyncChannelHandle.class); // channel id -> dummy value(-1) private MarkedCountDownLatch latch; - private Sync sync = new Sync(); public void reset(int size) { this.latch = new MarkedCountDownLatch<>(size); @@ -41,19 +38,6 @@ public class SyncChannelHandle implements SyncChannelCallback { latch.addMark(channel.getId(), -1L); } - public void set(Boolean mutex) { - if (mutex) { - this.sync.innerSetTrue(); - } else { - this.sync.innerSetFalse(); - } - } - - @Override - public boolean state() { - return this.sync.innerState(); - } - @Override public void onFinished(long channelId) { this.latch.markedCountDown(channelId, -1L); @@ -71,41 +55,4 @@ public class SyncChannelHandle implements SyncChannelCallback { public Status getStatus() { return latch.getStatus(); } - - // This class describes the inner state. - private final class Sync { - private AtomicBoolean state; - - boolean innerState() { - return this.state.get(); - } - - public boolean getState() { - return state.get(); - } - - void innerSetTrue() { - boolean s; - do { - s = getState(); - if (s) { - return; - } - } while(!state.compareAndSet(s, true)); - } - - void innerSetFalse() { - boolean s; - do { - s = getState(); - if (!s) { - return; - } - } while(!state.compareAndSet(s, false)); - } - - private Sync() { - state = new AtomicBoolean(false); - } - } } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java index 830c449f41..78da903d11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java @@ -25,6 +25,7 @@ import org.apache.doris.task.MasterTaskExecutor; import com.google.common.collect.Maps; +import org.apache.doris.task.SyncPendingTask; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -67,7 +68,7 @@ public class SyncChecker extends MasterDaemon { @Override protected void runAfterCatalogReady() { - LOG.debug("start check export jobs. job state: {}", jobState.name()); + LOG.debug("start check sync jobs. job state: {}", jobState.name()); switch (jobState) { case PENDING: runPendingJobs(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java index 3ce5b48b0d..9ceb04453e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java @@ -307,6 +307,10 @@ public abstract class SyncJob implements Writable { public void setChannelDescriptions(List channelDescriptions) { this.channelDescriptions = channelDescriptions; + // set channel id + for (ChannelDescription channelDescription : channelDescriptions) { + channelDescription.setChannelId(Catalog.getCurrentCatalog().getNextId()); + } } public long getId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java index 3c98137fa7..9109c92bed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java @@ -62,6 +62,11 @@ public abstract class SyncLifeCycle { this.running = false; if (thread != null) { + // Deadlock prevention + if (thread == Thread.currentThread()) { + return; + } + try { thread.join(); } catch (InterruptedException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 8b5dc54c70..266d52f5d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -32,6 +32,8 @@ import org.apache.doris.load.sync.model.Data; import org.apache.doris.proto.InternalService; import org.apache.doris.qe.InsertStreamTxnExecutor; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.task.SyncTask; +import org.apache.doris.task.SyncTaskPool; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TMergeType; @@ -49,7 +51,6 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Queues; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,8 +59,6 @@ import org.apache.thrift.TException; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CanalSyncChannel extends SyncChannel { @@ -69,44 +68,47 @@ public class CanalSyncChannel extends SyncChannel { private static final String DELETE_CONDITION = DELETE_COLUMN + "=1"; private static final String NULL_VALUE_FOR_LOAD = "\\N"; + private final int index; + private long timeoutSecond; private long lastBatchId; - private LinkedBlockingQueue> pendingQueue; + private Data batchBuffer; private InsertStreamTxnExecutor txnExecutor; - public CanalSyncChannel(SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { - super(syncJob, db, table, columns, srcDataBase, srcTable); + public CanalSyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List columns, String srcDataBase, String srcTable) { + super(id, syncJob, db, table, columns, srcDataBase, srcTable); + this.index = SyncTaskPool.getNextIndex(); this.batchBuffer = new Data<>(); - this.pendingQueue = Queues.newLinkedBlockingQueue(128); this.lastBatchId = -1L; this.timeoutSecond = -1L; } - public void process() { - while (running) { - if (!isTxnInit()) { - continue; - } - // if txn has begun, send all data in queue - if (isTxnBegin()) { - while (!pendingQueue.isEmpty()) { - try { - Data rows = pendingQueue.poll(CanalConfigs.channelWaitingTimeoutMs, TimeUnit.MILLISECONDS); - if (rows != null) { - sendData(rows); - } - } catch (Exception e) { - String errMsg = "encounter exception in channel, channel " + id + ", " + - "msg: " + e.getMessage() + ", table: " + targetTable; - LOG.error(errMsg); - callback.onFailed(errMsg); - } - } - } - if (callback.state()) { - callback.onFinished(id); - } + private final static class SendTask extends SyncTask { + private final InsertStreamTxnExecutor executor; + private final Data rows; + + public SendTask(long signature, int index, SyncChannelCallback callback, Data rows, InsertStreamTxnExecutor executor) { + super(signature, index, callback); + this.executor = executor; + this.rows = rows; + } + + public void exec() throws Exception { + TransactionEntry txnEntry = executor.getTxnEntry(); + txnEntry.setDataToSend(rows.getDatas()); + executor.sendData(); + } + } + + private final static class EOFTask extends SyncTask { + + public EOFTask(long signature, int index, SyncChannelCallback callback) { + super(signature, index, callback); + } + + public void exec() throws Exception { + callback.onFinished(signature); } } @@ -189,10 +191,10 @@ public class CanalSyncChannel extends SyncChannel { throw e; } finally { this.batchBuffer = new Data<>(); - this.pendingQueue.clear(); updateBatchId(-1L); } } + @Override public void commitTxn() throws TException, TimeoutException, InterruptedException, ExecutionException { if (!isTxnBegin()) { @@ -213,10 +215,10 @@ public class CanalSyncChannel extends SyncChannel { throw e; } finally { this.batchBuffer = new Data<>(); - this.pendingQueue.clear(); updateBatchId(-1L); } } + @Override public void initTxn(long timeoutSecond) { if (!isTxnInit()) { @@ -254,7 +256,12 @@ public class CanalSyncChannel extends SyncChannel { } } - private void execute(long batchId, CanalEntry.EventType eventType, List columns) { + public void submitEOF() { + EOFTask task = new EOFTask(id, index, callback); + SyncTaskPool.submit(task); + } + + public void execute(long batchId, CanalEntry.EventType eventType, List columns) { InternalService.PDataRow row = parseRow(eventType, columns); try { Preconditions.checkState(isTxnInit()); @@ -262,7 +269,8 @@ public class CanalSyncChannel extends SyncChannel { if (!isTxnBegin()) { beginTxn(batchId); } else { - this.pendingQueue.put(this.batchBuffer); + SendTask task = new SendTask(id, index, callback, batchBuffer, txnExecutor); + SyncTaskPool.submit(task); this.batchBuffer = new Data<>(); } updateBatchId(batchId); @@ -294,19 +302,13 @@ public class CanalSyncChannel extends SyncChannel { return row.build(); } - private void sendData(Data rows) throws TException, TimeoutException, - InterruptedException, ExecutionException { - Preconditions.checkState(isTxnBegin()); - TransactionEntry txnEntry = txnExecutor.getTxnEntry(); - txnEntry.setDataToSend(rows.getDatas()); - this.txnExecutor.sendData(); - } - public void flushData() throws TException, TimeoutException, InterruptedException, ExecutionException { - if (batchBuffer.isNotEmpty()) { - sendData(batchBuffer); - batchBuffer = new Data<>(); + if (this.batchBuffer.isNotEmpty()) { + TransactionEntry txnEntry = txnExecutor.getTxnEntry(); + txnEntry.setDataToSend(batchBuffer.getDatas()); + this.txnExecutor.sendData(); + this.batchBuffer = new Data<>(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java index 10c393bb4c..bdfc7327d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java @@ -102,7 +102,6 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { @Override public void beginForTxn() { - handle.set(false); handle.reset(idToChannels.size()); for (CanalSyncChannel channel : idToChannels.values()) { channel.initTxn(Config.max_stream_load_timeout_second); @@ -161,15 +160,16 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { } public Status waitForTxn() { + for (CanalSyncChannel channel : idToChannels.values()) { + channel.submitEOF(); + } + Status st = Status.CANCELLED; - handle.set(true); try { handle.join(); st = handle.getStatus(); } catch (InterruptedException e) { logger.warn("InterruptedException: ", e); - } finally { - handle.set(false); } return st; } @@ -190,7 +190,7 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { long totalMemSize = 0L; long startTime = System.currentTimeMillis(); beginForTxn(); - while (true) { + while (running) { Events dataEvents = null; try { dataEvents = dataBlockingQueue.poll(CanalConfigs.pollWaitingTimeoutMs, TimeUnit.MILLISECONDS); @@ -227,7 +227,12 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { break; } } + Status st = waitForTxn(); + if (!running) { + abortForTxn("stopping client"); + continue; + } if (st.ok()) { commitForTxn(); } else { @@ -260,7 +265,7 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { } int startIndex = 0; - // if last ack position is null, it is the first time to consume batch (startOffset = 0) + // if last ack position is null, it is the first time to consume batch EntryPosition lastAckPosition = positionMeta.getAckPosition(); if (lastAckPosition != null) { EntryPosition firstPosition = EntryPosition.createPosition(entries.get(0)); @@ -303,14 +308,18 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { EntryPosition startPosition = dataEvents.getPositionRange().getStart(); EntryPosition endPosition = dataEvents.getPositionRange().getEnd(); for (CanalSyncChannel channel : idToChannels.values()) { + String key = CanalUtils.getFullName(channel.getSrcDataBase(), channel.getSrcTable()); + // if last commit position is null, it is the first time to execute batch EntryPosition commitPosition = positionMeta.getCommitPosition(channel.getId()); - String key = channel.getSrcDataBase() + "." + channel.getSrcTable(); - if (commitPosition.compareTo(startPosition) < 0) { + if (commitPosition != null) { + if (commitPosition.compareTo(startPosition) < 0) { + preferChannels.put(key, channel); + } else if (commitPosition.compareTo(endPosition) < 0) { + secondaryChannels.put(key, channel); + } + } else { preferChannels.put(key, channel); } - else if (commitPosition.compareTo(endPosition) < 0) { - secondaryChannels.put(key, channel); - } } // distribute data to channels @@ -405,13 +414,16 @@ public class CanalSyncDataConsumer extends SyncDataConsumer { private void rollback() { holdGetLock(); try { - connector.rollback(); // Wait for the receiver to put the last message into the queue before clearing queue try { Thread.sleep(1000L); } catch (InterruptedException e) { // ignore } + + if (!ackBatches.isEmpty()) { + connector.rollback(); + } } finally { releaseGetLock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java index dce10bab55..c22b9f8e46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java @@ -109,8 +109,8 @@ public class CanalSyncJob extends SyncJob { colNames.add(column.getName()); } } - CanalSyncChannel syncChannel = new CanalSyncChannel(this, db, olapTable, colNames, - channelDescription.getSrcDatabase(), channelDescription.getSrcTableName()); + CanalSyncChannel syncChannel = new CanalSyncChannel(channelDescription.getChannelId(), this, db, + olapTable, colNames, channelDescription.getSrcDatabase(), channelDescription.getSrcTableName()); if (channelDescription.getPartitionNames() != null) { syncChannel.setPartitions(channelDescription.getPartitionNames()); } @@ -183,7 +183,9 @@ public class CanalSyncJob extends SyncJob { public void execute() throws UserException { LOG.info("try to start canal client. Remote ip: {}, remote port: {}, debug: {}", ip, port, debug); // init - init(); + if (!isInit()) { + init(); + } // start client unprotectedStartClient(); } @@ -193,10 +195,12 @@ public class CanalSyncJob extends SyncJob { LOG.info("Cancel canal sync job {}. MsgType: {}, errMsg: {}", id, msgType.name(), errMsg); failMsg = new SyncFailMsg(msgType, errMsg); switch (msgType) { - case USER_CANCEL: case SUBMIT_FAIL: case RUN_FAIL: + unprotectedStopClient(JobState.PAUSED); + break; case UNKNOWN: + case USER_CANCEL: unprotectedStopClient(JobState.CANCELLED); break; default: @@ -228,11 +232,7 @@ public class CanalSyncJob extends SyncJob { return; } if (client != null) { - if (jobState == JobState.CANCELLED) { - client.shutdown(true); - } else { - client.shutdown(false); - } + client.shutdown(true); } updateState(jobState, false); LOG.info("client has been stopped. id: {}, jobName: {}" , id, jobName); @@ -251,15 +251,12 @@ public class CanalSyncJob extends SyncJob { JobState jobState = info.getJobState(); switch (jobState) { case RUNNING: - client.startup(); - updateState(JobState.RUNNING, true); + updateState(JobState.PENDING, true); break; case PAUSED: - client.shutdown(false); updateState(JobState.PAUSED, true); break; case CANCELLED: - client.shutdown(true); updateState(JobState.CANCELLED, true); break; } @@ -300,4 +297,4 @@ public class CanalSyncJob extends SyncJob { + ", finishTimeMs=" + TimeUtils.longToTimeString(finishTimeMs) + "]"; } -} \ No newline at end of file +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java index 33cb8cf8b7..421c46d5e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java @@ -53,8 +53,6 @@ public class SyncCanalClient { lock.unlock(); } - private ShutDownWorker shutDownWorker; - public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug) { this(syncJob, destination, connector, batchSize, debug, ".*\\..*"); } @@ -71,13 +69,9 @@ public class SyncCanalClient { Preconditions.checkState(!idToChannels.isEmpty(), "no channel is registered"); lock(); try { - // 1.start all threads in channel - for (CanalSyncChannel channel : idToChannels.values()) { - channel.start(); - } - // 2. start executor + // 1. start executor consumer.start(); - // 3. start receiver + // 2. start receiver receiver.start(); } finally { unlock(); @@ -85,43 +79,17 @@ public class SyncCanalClient { logger.info("canal client has been started."); } - // Stop client asynchronously public void shutdown(boolean needCleanUp) { - this.shutDownWorker = new ShutDownWorker(needCleanUp); - shutDownWorker.shutdown(); - logger.info("canal client shutdown worker has been started."); - } - - public class ShutDownWorker implements Runnable { - public Thread thread; - public boolean needCleanUp; - - public ShutDownWorker(boolean needCleanUp) { - this.thread = new Thread(this, "ShutDownWorker"); - this.needCleanUp = needCleanUp; - } - - public void shutdown() { - thread.start(); - } - - @Override - public void run() { - lock(); - try { - // 1. stop receiver - receiver.stop(); - // 2. stop executor - consumer.stop(needCleanUp); - // 3. stop channels - for (CanalSyncChannel channel : idToChannels.values()) { - channel.stop(); - } - } finally { - unlock(); - } - logger.info("canal client has been stopped."); + lock(); + try { + // 1. stop receiver + receiver.stop(); + // 2. stop executor + consumer.stop(needCleanUp); + } finally { + unlock(); } + logger.info("canal client has been stopped."); } public void registerChannels(List channels) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java index 4d68315c33..d4d8f718da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java @@ -38,6 +38,7 @@ public class PositionMeta { this.batches = Maps.newHashMap(); this.commitPositions = Maps.newHashMap(); } + public void addBatch(long batchId, PositionRange range) { updateMaxBatchId(batchId); batches.put(batchId, range); @@ -76,7 +77,7 @@ public class PositionMeta { } public T getLatestPosition() { - if (batches.isEmpty()) { + if (!batches.containsKey(maxBatchId)) { return null; } else { return batches.get(maxBatchId).getEnd(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index 85ddd1f7c2..97153954df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -159,9 +159,10 @@ public class InsertStreamTxnExecutor { if (code != TStatusCode.OK) { throw new TException("failed to insert data: " + result.getStatus().getErrorMsgsList()); } - txnEntry.clearDataToSend(); } catch (RpcException e) { throw new TException(e); + } finally { + txnEntry.clearDataToSend(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java new file mode 100644 index 0000000000..22d3a3d364 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class SerialExecutor extends AbstractExecutorService { + + private final ExecutorService taskPool; + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition terminating = lock.newCondition(); + + private final BlockingQueue tasks = new LinkedBlockingQueue<>(); + private Runnable active; + + private boolean shutdown; + + public SerialExecutor(final ExecutorService executor) { + Preconditions.checkNotNull(executor); + this.taskPool = executor; + } + + public void execute(final Runnable r) { + lock.lock(); + try { + checkPoolIsRunning(); + tasks.add(new Runnable() { + public void run() { + try { + r.run(); + } finally { + scheduleNext(); + } + } + }); + if (active == null) { + scheduleNext(); + } + } finally { + lock.unlock(); + } + } + + private void checkPoolIsRunning() { + Preconditions.checkState(lock.isHeldByCurrentThread()); + if (shutdown) { + throw new RejectedExecutionException("SerialExecutor is already shutdown"); + } + } + + public void shutdown() { + lock.lock(); + try { + shutdown = true; + } finally { + lock.unlock(); + } + } + + public List shutdownNow() { + lock.lock(); + try { + shutdown = true; + List result = new ArrayList<>(); + tasks.drainTo(result); + return result; + } finally { + lock.unlock(); + } + } + + public boolean isShutdown() { + lock.lock(); + try { + return shutdown; + } finally { + lock.unlock(); + } + } + + public boolean isTerminated() { + lock.lock(); + try { + return shutdown && active == null; + } finally { + lock.unlock(); + } + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + lock.lock(); + try { + long waitUntil = System.nanoTime() + unit.toNanos(timeout); + long remainingTime; + while ((remainingTime = waitUntil - System.nanoTime()) > 0) { + if (shutdown && active == null) { + break; + } + terminating.awaitNanos(remainingTime); + } + return remainingTime > 0; + } finally { + lock.unlock(); + } + } + + private void scheduleNext() { + lock.lock(); + try { + if ((active = tasks.poll()) != null) { + taskPool.execute(active); + } else if (shutdown) { + terminating.signalAll(); + } + } finally { + lock.unlock(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java new file mode 100644 index 0000000000..dcdf2690e8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.common.ThreadPoolManager; + +import java.util.concurrent.ExecutorService; + +/** + * This executor service ensures that all tasks submitted to + * the same slot are executed in the order of submission. + */ +public class SerialExecutorService { + + public interface SerialRunnable extends Runnable { + int getIndex(); + } + + private final int numOfSlots; + private final ExecutorService taskPool; + private final SerialExecutor[] slots; + + private SerialExecutorService(int numOfSlots, ExecutorService taskPool) { + this.numOfSlots = numOfSlots; + this.slots = new SerialExecutor[numOfSlots]; + this.taskPool = taskPool; + for (int i = 0; i < numOfSlots; i++) { + slots[i] = new SerialExecutor(taskPool); + } + } + + public SerialExecutorService(int numOfSlots) { + this(numOfSlots, ThreadPoolManager.newDaemonFixedThreadPool( + numOfSlots, 256, "sync-task-pool", true)); + } + + public void submit(Runnable command) { + int index = getIndex(command); + if (isSlotIndex(index)) { + SerialExecutor serialEx = slots[index]; + serialEx.execute(command); + } else { + taskPool.execute(command); + } + } + + private int getIndex(Runnable command) { + int index = -1; + if (command instanceof SerialRunnable) { + index = (((SerialRunnable) command).getIndex()); + } + return index; + } + + private boolean isSlotIndex(int index) { + return index >= 0 && index < numOfSlots; + } + + public void close() { + for (int i = 0; i < numOfSlots; i++) { + final SerialExecutor serialEx = slots[i]; + serialEx.shutdown(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java similarity index 95% rename from fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java rename to fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java index e9b7695233..c61d9e69c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.load.sync; +package org.apache.doris.task; import org.apache.doris.common.UserException; import org.apache.doris.load.sync.SyncFailMsg.MsgType; +import org.apache.doris.load.sync.SyncJob; import org.apache.doris.load.sync.SyncJob.JobState; -import org.apache.doris.task.MasterTask; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java new file mode 100644 index 0000000000..cdae68b0ea --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.load.sync.SyncChannelCallback; +import org.apache.doris.task.SerialExecutorService.SerialRunnable; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * SyncTask is a runnable to submit to SerialExecutorService. Each + * SyncTask will have an index to submit to the corresponding slot + * in the SerialExecutorService. And SerialExecutorService ensures + * that all SyncTasks submitted with the same index are always + * executed in the order of submission. + */ +public abstract class SyncTask implements SerialRunnable { + private static final Logger LOG = LogManager.getLogger(SyncTask.class); + + protected long signature; + /** + * Each index corresponds to a slot in the SerialExecutorService. + * It should only be assigned by the getNextIndex() method in the + * SyncTaskPool. SyncTasks with the same index are always executed + * in the order of submission. + */ + protected int index; + protected SyncChannelCallback callback; + + public SyncTask(long signature, int index, SyncChannelCallback callback) { + this.signature = signature; + this.index = index; + this.callback = callback; + } + + @Override + public void run() { + try { + exec(); + } catch (Exception e) { + String errMsg = "channel " + signature + ", " + "msg: " + e.getMessage(); + LOG.error("sync task exec error: {}", errMsg); + callback.onFailed(errMsg); + } + } + + public int getIndex() { + return this.index; + } + + /** + * implement in child + */ + protected abstract void exec() throws Exception; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java new file mode 100644 index 0000000000..fb2309d49b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.common.Config; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntUnaryOperator; + +public class SyncTaskPool { + private static final int NUM_OF_SLOTS = Config.max_sync_task_threads_num; + private static final SerialExecutorService EXECUTOR = new SerialExecutorService(NUM_OF_SLOTS); + private static final AtomicInteger nextIndex = new AtomicInteger(); + + public static void submit(Runnable task) { + if (task == null) { + return; + } + EXECUTOR.submit(task); + } + + /** + * Gets the next index loop from 0 to @NUM_OF_SLOTS - 1 + */ + public static int getNextIndex() { + return nextIndex.updateAndGet(new IntUnaryOperator() { + @Override + public int applyAsInt(int operand) { + if (++operand >= NUM_OF_SLOTS) { + operand = 0; + } + return operand; + } + }); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index 8a893c1d6b..343a0fff4f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -73,6 +73,7 @@ public class CanalSyncDataTest { private long offset = 0; private long nextId = 1000L; private int batchSize = 8192; + private long channelId = 100001L; ReentrantLock getLock; @@ -220,13 +221,12 @@ public class CanalSyncDataTest { CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); consumer.start(); receiver.start(); @@ -235,7 +235,6 @@ public class CanalSyncDataTest { } finally { receiver.stop(); consumer.stop(); - channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); @@ -295,13 +294,12 @@ public class CanalSyncDataTest { CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); consumer.start(); receiver.start(); @@ -310,7 +308,6 @@ public class CanalSyncDataTest { } finally { receiver.stop(); consumer.stop(); - channel.stop(); } LOG.info(consumer.getPositionInfo()); @@ -360,13 +357,12 @@ public class CanalSyncDataTest { CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); consumer.start(); receiver.start(); @@ -375,7 +371,6 @@ public class CanalSyncDataTest { } finally { receiver.stop(); consumer.stop(); - channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); @@ -444,13 +439,12 @@ public class CanalSyncDataTest { CanalSyncDataReceiver receiver = new CanalSyncDataReceiver( syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock); CanalSyncChannel channel = new CanalSyncChannel( - syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); + channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl"); Map idToChannels = Maps.newHashMap(); idToChannels.put(channel.getId(), channel); consumer.setChannels(idToChannels); - channel.start(); consumer.start(); receiver.start(); @@ -459,7 +453,6 @@ public class CanalSyncDataTest { } finally { receiver.stop(); consumer.stop(); - channel.stop(); } Assert.assertEquals("position:N/A", consumer.getPositionInfo()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java new file mode 100644 index 0000000000..cc9f500cec --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.load.sync.SyncChannelCallback; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SerialExecutorServiceTest { + private static final Logger LOG = LoggerFactory.getLogger(MasterTaskExecutorTest.class); + private static final int NUM_OF_SLOTS = 10; + private static final int THREAD_NUM = 10; + + private static SerialExecutorService taskPool; + // thread signature -> tasks submit serial + private static Map> submitSerial; + // thread signature -> tasks execute serial + private static Map> execSerial; + + @Before + public void setUp() { + taskPool = new SerialExecutorService(NUM_OF_SLOTS); + submitSerial = new ConcurrentHashMap<>(); + execSerial = new ConcurrentHashMap<>(); + } + + @After + public void tearDown() { + if (taskPool != null) { + taskPool.close(); + } + } + + @Test + public void testSubmit() { + for (long i = 0; i < THREAD_NUM; i++) { + if (!submitSerial.containsKey(i)) { + submitSerial.put(i, new ArrayList<>()); + } + SubmitThread thread = new SubmitThread("Thread-" + i, i, submitSerial.get(i)); + thread.start(); + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + + // The submission order of the same signature should be equal to the execution order + Assert.assertEquals(submitSerial.size(), THREAD_NUM); + Assert.assertEquals(submitSerial.size(), execSerial.size()); + for (long i = 0; i < THREAD_NUM; i++) { + Assert.assertTrue(submitSerial.containsKey(i)); + Assert.assertTrue(execSerial.containsKey(i)); + List submitSerialList = submitSerial.get(i); + List execSerialList = execSerial.get(i); + Assert.assertEquals(submitSerialList.size(), execSerialList.size()); + for (int j = 0; j < submitSerialList.size(); j++) { + Assert.assertEquals(submitSerialList.get(j), execSerialList.get(j)); + } + } + } + + private static class TestSyncTask extends SyncTask { + public int serial; + + public TestSyncTask(long signature, int index, int serial, SyncChannelCallback callback) { + super(signature, index, callback); + this.serial = serial; + } + + @Override + protected void exec() { + LOG.info("run exec. signature: {}, index: {}, serial: {}", signature, index, serial); + if (!execSerial.containsKey(signature)) { + execSerial.put(signature, new ArrayList<>()); + } + execSerial.get(signature).add(serial); + } + } + + private static class SubmitThread extends Thread { + private int index = SyncTaskPool.getNextIndex(); + private long signature; + private List submitSerialList; + + public SubmitThread(String name, long signature, List submitSerialList) { + super(name); + this.signature = signature; + this.submitSerialList = submitSerialList; + } + + public void run() { + for (int i = 0; i < 100; i++) { + TestSyncTask task = new TestSyncTask(signature, index, i, new SyncChannelCallback() { + @Override + public void onFinished(long channelId) { + } + @Override + public void onFailed(String errMsg) { + } + }); + submitSerialList.add(i); + taskPool.submit(task); + } + } + } +}