[Enhance] [Binlog] Reduce thread number of SyncJob to save resources (#6418)

This commit reduces the number of threads used by a SyncJob:
1. Submit send tasks to a thread pool to send data.
2. Submit EOF tasks to the thread pool to block and then wake up the client to commit transactions.
3. Use a SerialExecutorService to keep the data sent on each channel in the correct order (see the sketch below).
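
For orientation, here is a minimal sketch (illustrative class and ids, not code from this commit) of how the new SyncTask / SyncTaskPool classes introduced in this change fit together: a channel picks one slot index up front, and every task submitted with that index runs in submission order on the shared pool, so no dedicated thread per channel is needed.

import org.apache.doris.load.sync.SyncChannelCallback;
import org.apache.doris.task.SyncTask;
import org.apache.doris.task.SyncTaskPool;

public class ChannelTaskSketch {                            // hypothetical class, for illustration only
    private final long channelId = 1001L;                   // hypothetical channel id
    private final int index = SyncTaskPool.getNextIndex();  // one slot per channel, assigned once

    public void submitOneBatch(SyncChannelCallback callback) {
        SyncTaskPool.submit(new SyncTask(channelId, index, callback) {
            @Override
            protected void exec() throws Exception {
                // send one batch here; tasks sharing this index never run out of order
            }
        });
    }
}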

In addition, this commit fixes the following bugs:
1. Failed to resume a SyncJob.
2. Failed to sync data when multiple tables are configured in one SyncJob.
3. In a cluster with multiple FEs, the master may hang after creating a SyncJob.
Author: xy720
Date: 2021-09-17 10:01:27 +08:00
Committed by: GitHub
Parent: 085942b30f
Commit: 95cdb7cc0c
23 changed files with 618 additions and 208 deletions

View File

@ -5264,6 +5264,8 @@ keyword ::=
{: RESULT = id; :}
| KW_ISOLATION:id
{: RESULT = id; :}
| KW_JOB:id
{: RESULT = id; :}
| KW_ENCRYPTKEY:id
{: RESULT = id; :}
| KW_ENCRYPTKEYS:id

View File

@ -60,6 +60,8 @@ public class ChannelDescription implements Writable {
// column names of source table
@SerializedName(value = "colNames")
private final List<String> colNames;
@SerializedName(value = "channelId")
private long channelId;
public ChannelDescription(String srcDatabase, String srcTableName, String targetTable, PartitionNames partitionNames, List<String> colNames) {
this.srcDatabase = srcDatabase;
@ -119,6 +121,14 @@ public class ChannelDescription implements Writable {
}
}
public void setChannelId(long channelId) {
this.channelId = channelId;
}
public long getChannelId() {
return this.channelId;
}
public String getTargetTable() {
return targetTable;
}

View File

@ -1318,7 +1318,7 @@ public class Catalog {
ExportChecker.init(Config.export_checker_interval_second * 1000L);
ExportChecker.startAll();
// Sync checker
SyncChecker.init(Config.sync_checker_interval_second);
SyncChecker.init(Config.sync_checker_interval_second * 1000L);
SyncChecker.startAll();
// Tablet checker and scheduler
tabletChecker.start();

View File

@ -635,6 +635,11 @@ public class Config extends ConfigBase {
*/
@ConfField public static int sync_checker_interval_second = 5;
/**
* max num of thread to handle sync task in sync task thread-pool.
*/
@ConfField public static int max_sync_task_threads_num = 10;
/**
* Default number of waiting jobs for routine load and version 2 of load
* This is a desired number.

View File

@ -18,7 +18,6 @@
package org.apache.doris.load.sync;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.UserException;
@ -32,7 +31,7 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class SyncChannel extends SyncLifeCycle {
public class SyncChannel {
private static final Logger LOG = LogManager.getLogger(SyncChannel.class);
protected long id;
@ -46,8 +45,8 @@ public class SyncChannel extends SyncLifeCycle {
protected String srcTable;
protected SyncChannelCallback callback;
public SyncChannel(SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
this.id = Catalog.getCurrentCatalog().getNextId();
public SyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
this.id = id;
this.jobId = syncJob.getId();
this.db = db;
this.tbl = table;
@ -57,22 +56,6 @@ public class SyncChannel extends SyncLifeCycle {
this.srcTable = srcTable.toLowerCase();
}
@Override
public void start() {
super.start();
LOG.info("channel {} has been started. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable);
}
@Override
public void stop() {
super.stop();
LOG.info("channel {} has been stopped. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable);
}
@Override
public void process() {
}
public void beginTxn(long batchId) throws UserException, TException, TimeoutException,
InterruptedException, ExecutionException {
}

View File

@ -19,8 +19,6 @@ package org.apache.doris.load.sync;
public interface SyncChannelCallback {
public boolean state();
public void onFinished(long channelId);
public void onFailed(String errMsg);

View File

@ -24,14 +24,11 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.atomic.AtomicBoolean;
public class SyncChannelHandle implements SyncChannelCallback {
private Logger LOG = LogManager.getLogger(SyncChannelHandle.class);
// channel id -> dummy value(-1)
private MarkedCountDownLatch<Long, Long> latch;
private Sync sync = new Sync();
public void reset(int size) {
this.latch = new MarkedCountDownLatch<>(size);
@ -41,19 +38,6 @@ public class SyncChannelHandle implements SyncChannelCallback {
latch.addMark(channel.getId(), -1L);
}
public void set(Boolean mutex) {
if (mutex) {
this.sync.innerSetTrue();
} else {
this.sync.innerSetFalse();
}
}
@Override
public boolean state() {
return this.sync.innerState();
}
@Override
public void onFinished(long channelId) {
this.latch.markedCountDown(channelId, -1L);
@ -71,41 +55,4 @@ public class SyncChannelHandle implements SyncChannelCallback {
public Status getStatus() {
return latch.getStatus();
}
// This class describes the inner state.
private final class Sync {
private AtomicBoolean state;
boolean innerState() {
return this.state.get();
}
public boolean getState() {
return state.get();
}
void innerSetTrue() {
boolean s;
do {
s = getState();
if (s) {
return;
}
} while(!state.compareAndSet(s, true));
}
void innerSetFalse() {
boolean s;
do {
s = getState();
if (!s) {
return;
}
} while(!state.compareAndSet(s, false));
}
private Sync() {
state = new AtomicBoolean(false);
}
}
}

View File

@ -25,6 +25,7 @@ import org.apache.doris.task.MasterTaskExecutor;
import com.google.common.collect.Maps;
import org.apache.doris.task.SyncPendingTask;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -67,7 +68,7 @@ public class SyncChecker extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
LOG.debug("start check export jobs. job state: {}", jobState.name());
LOG.debug("start check sync jobs. job state: {}", jobState.name());
switch (jobState) {
case PENDING:
runPendingJobs();

View File

@ -307,6 +307,10 @@ public abstract class SyncJob implements Writable {
public void setChannelDescriptions(List<ChannelDescription> channelDescriptions) {
this.channelDescriptions = channelDescriptions;
// set channel id
for (ChannelDescription channelDescription : channelDescriptions) {
channelDescription.setChannelId(Catalog.getCurrentCatalog().getNextId());
}
}
public long getId() {

View File

@ -62,6 +62,11 @@ public abstract class SyncLifeCycle {
this.running = false;
if (thread != null) {
// Deadlock prevention
if (thread == Thread.currentThread()) {
return;
}
try {
thread.join();
} catch (InterruptedException e) {

View File

@ -32,6 +32,8 @@ import org.apache.doris.load.sync.model.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.SyncTask;
import org.apache.doris.task.SyncTaskPool;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMergeType;
@ -49,7 +51,6 @@ import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -58,8 +59,6 @@ import org.apache.thrift.TException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class CanalSyncChannel extends SyncChannel {
@ -69,44 +68,47 @@ public class CanalSyncChannel extends SyncChannel {
private static final String DELETE_CONDITION = DELETE_COLUMN + "=1";
private static final String NULL_VALUE_FOR_LOAD = "\\N";
private final int index;
private long timeoutSecond;
private long lastBatchId;
private LinkedBlockingQueue<Data<InternalService.PDataRow>> pendingQueue;
private Data<InternalService.PDataRow> batchBuffer;
private InsertStreamTxnExecutor txnExecutor;
public CanalSyncChannel(SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
super(syncJob, db, table, columns, srcDataBase, srcTable);
public CanalSyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
super(id, syncJob, db, table, columns, srcDataBase, srcTable);
this.index = SyncTaskPool.getNextIndex();
this.batchBuffer = new Data<>();
this.pendingQueue = Queues.newLinkedBlockingQueue(128);
this.lastBatchId = -1L;
this.timeoutSecond = -1L;
}
public void process() {
while (running) {
if (!isTxnInit()) {
continue;
}
// if txn has begun, send all data in queue
if (isTxnBegin()) {
while (!pendingQueue.isEmpty()) {
try {
Data<InternalService.PDataRow> rows = pendingQueue.poll(CanalConfigs.channelWaitingTimeoutMs, TimeUnit.MILLISECONDS);
if (rows != null) {
sendData(rows);
}
} catch (Exception e) {
String errMsg = "encounter exception in channel, channel " + id + ", " +
"msg: " + e.getMessage() + ", table: " + targetTable;
LOG.error(errMsg);
callback.onFailed(errMsg);
}
}
}
if (callback.state()) {
callback.onFinished(id);
}
private final static class SendTask extends SyncTask {
private final InsertStreamTxnExecutor executor;
private final Data<InternalService.PDataRow> rows;
public SendTask(long signature, int index, SyncChannelCallback callback, Data<InternalService.PDataRow> rows, InsertStreamTxnExecutor executor) {
super(signature, index, callback);
this.executor = executor;
this.rows = rows;
}
public void exec() throws Exception {
TransactionEntry txnEntry = executor.getTxnEntry();
txnEntry.setDataToSend(rows.getDatas());
executor.sendData();
}
}
private final static class EOFTask extends SyncTask {
public EOFTask(long signature, int index, SyncChannelCallback callback) {
super(signature, index, callback);
}
public void exec() throws Exception {
callback.onFinished(signature);
}
}
@ -189,10 +191,10 @@ public class CanalSyncChannel extends SyncChannel {
throw e;
} finally {
this.batchBuffer = new Data<>();
this.pendingQueue.clear();
updateBatchId(-1L);
}
}
@Override
public void commitTxn() throws TException, TimeoutException, InterruptedException, ExecutionException {
if (!isTxnBegin()) {
@ -213,10 +215,10 @@ public class CanalSyncChannel extends SyncChannel {
throw e;
} finally {
this.batchBuffer = new Data<>();
this.pendingQueue.clear();
updateBatchId(-1L);
}
}
@Override
public void initTxn(long timeoutSecond) {
if (!isTxnInit()) {
@ -254,7 +256,12 @@ public class CanalSyncChannel extends SyncChannel {
}
}
private void execute(long batchId, CanalEntry.EventType eventType, List<CanalEntry.Column> columns) {
public void submitEOF() {
EOFTask task = new EOFTask(id, index, callback);
SyncTaskPool.submit(task);
}
public void execute(long batchId, CanalEntry.EventType eventType, List<CanalEntry.Column> columns) {
InternalService.PDataRow row = parseRow(eventType, columns);
try {
Preconditions.checkState(isTxnInit());
@ -262,7 +269,8 @@ public class CanalSyncChannel extends SyncChannel {
if (!isTxnBegin()) {
beginTxn(batchId);
} else {
this.pendingQueue.put(this.batchBuffer);
SendTask task = new SendTask(id, index, callback, batchBuffer, txnExecutor);
SyncTaskPool.submit(task);
this.batchBuffer = new Data<>();
}
updateBatchId(batchId);
@ -294,19 +302,13 @@ public class CanalSyncChannel extends SyncChannel {
return row.build();
}
private void sendData(Data<InternalService.PDataRow> rows) throws TException, TimeoutException,
InterruptedException, ExecutionException {
Preconditions.checkState(isTxnBegin());
TransactionEntry txnEntry = txnExecutor.getTxnEntry();
txnEntry.setDataToSend(rows.getDatas());
this.txnExecutor.sendData();
}
public void flushData() throws TException, TimeoutException,
InterruptedException, ExecutionException {
if (batchBuffer.isNotEmpty()) {
sendData(batchBuffer);
batchBuffer = new Data<>();
if (this.batchBuffer.isNotEmpty()) {
TransactionEntry txnEntry = txnExecutor.getTxnEntry();
txnEntry.setDataToSend(batchBuffer.getDatas());
this.txnExecutor.sendData();
this.batchBuffer = new Data<>();
}
}

View File

@ -102,7 +102,6 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
@Override
public void beginForTxn() {
handle.set(false);
handle.reset(idToChannels.size());
for (CanalSyncChannel channel : idToChannels.values()) {
channel.initTxn(Config.max_stream_load_timeout_second);
@ -161,15 +160,16 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
}
public Status waitForTxn() {
for (CanalSyncChannel channel : idToChannels.values()) {
channel.submitEOF();
}
Status st = Status.CANCELLED;
handle.set(true);
try {
handle.join();
st = handle.getStatus();
} catch (InterruptedException e) {
logger.warn("InterruptedException: ", e);
} finally {
handle.set(false);
}
return st;
}
@ -190,7 +190,7 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
long totalMemSize = 0L;
long startTime = System.currentTimeMillis();
beginForTxn();
while (true) {
while (running) {
Events<CanalEntry.Entry, EntryPosition> dataEvents = null;
try {
dataEvents = dataBlockingQueue.poll(CanalConfigs.pollWaitingTimeoutMs, TimeUnit.MILLISECONDS);
@ -227,7 +227,12 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
break;
}
}
Status st = waitForTxn();
if (!running) {
abortForTxn("stopping client");
continue;
}
if (st.ok()) {
commitForTxn();
} else {
@ -260,7 +265,7 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
}
int startIndex = 0;
// if last ack position is null, it is the first time to consume batch (startOffset = 0)
// if last ack position is null, it is the first time to consume batch
EntryPosition lastAckPosition = positionMeta.getAckPosition();
if (lastAckPosition != null) {
EntryPosition firstPosition = EntryPosition.createPosition(entries.get(0));
@ -303,14 +308,18 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
EntryPosition startPosition = dataEvents.getPositionRange().getStart();
EntryPosition endPosition = dataEvents.getPositionRange().getEnd();
for (CanalSyncChannel channel : idToChannels.values()) {
String key = CanalUtils.getFullName(channel.getSrcDataBase(), channel.getSrcTable());
// if last commit position is null, it is the first time to execute batch
EntryPosition commitPosition = positionMeta.getCommitPosition(channel.getId());
String key = channel.getSrcDataBase() + "." + channel.getSrcTable();
if (commitPosition.compareTo(startPosition) < 0) {
if (commitPosition != null) {
if (commitPosition.compareTo(startPosition) < 0) {
preferChannels.put(key, channel);
} else if (commitPosition.compareTo(endPosition) < 0) {
secondaryChannels.put(key, channel);
}
} else {
preferChannels.put(key, channel);
}
else if (commitPosition.compareTo(endPosition) < 0) {
secondaryChannels.put(key, channel);
}
}
// distribute data to channels
@ -405,13 +414,16 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
private void rollback() {
holdGetLock();
try {
connector.rollback();
// Wait for the receiver to put the last message into the queue before clearing queue
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
// ignore
}
if (!ackBatches.isEmpty()) {
connector.rollback();
}
} finally {
releaseGetLock();
}

View File

@ -109,8 +109,8 @@ public class CanalSyncJob extends SyncJob {
colNames.add(column.getName());
}
}
CanalSyncChannel syncChannel = new CanalSyncChannel(this, db, olapTable, colNames,
channelDescription.getSrcDatabase(), channelDescription.getSrcTableName());
CanalSyncChannel syncChannel = new CanalSyncChannel(channelDescription.getChannelId(), this, db,
olapTable, colNames, channelDescription.getSrcDatabase(), channelDescription.getSrcTableName());
if (channelDescription.getPartitionNames() != null) {
syncChannel.setPartitions(channelDescription.getPartitionNames());
}
@ -183,7 +183,9 @@ public class CanalSyncJob extends SyncJob {
public void execute() throws UserException {
LOG.info("try to start canal client. Remote ip: {}, remote port: {}, debug: {}", ip, port, debug);
// init
init();
if (!isInit()) {
init();
}
// start client
unprotectedStartClient();
}
@ -193,10 +195,12 @@ public class CanalSyncJob extends SyncJob {
LOG.info("Cancel canal sync job {}. MsgType: {}, errMsg: {}", id, msgType.name(), errMsg);
failMsg = new SyncFailMsg(msgType, errMsg);
switch (msgType) {
case USER_CANCEL:
case SUBMIT_FAIL:
case RUN_FAIL:
unprotectedStopClient(JobState.PAUSED);
break;
case UNKNOWN:
case USER_CANCEL:
unprotectedStopClient(JobState.CANCELLED);
break;
default:
@ -228,11 +232,7 @@ public class CanalSyncJob extends SyncJob {
return;
}
if (client != null) {
if (jobState == JobState.CANCELLED) {
client.shutdown(true);
} else {
client.shutdown(false);
}
client.shutdown(true);
}
updateState(jobState, false);
LOG.info("client has been stopped. id: {}, jobName: {}" , id, jobName);
@ -251,15 +251,12 @@ public class CanalSyncJob extends SyncJob {
JobState jobState = info.getJobState();
switch (jobState) {
case RUNNING:
client.startup();
updateState(JobState.RUNNING, true);
updateState(JobState.PENDING, true);
break;
case PAUSED:
client.shutdown(false);
updateState(JobState.PAUSED, true);
break;
case CANCELLED:
client.shutdown(true);
updateState(JobState.CANCELLED, true);
break;
}
@ -300,4 +297,4 @@ public class CanalSyncJob extends SyncJob {
+ ", finishTimeMs=" + TimeUtils.longToTimeString(finishTimeMs)
+ "]";
}
}
}

View File

@ -53,8 +53,6 @@ public class SyncCanalClient {
lock.unlock();
}
private ShutDownWorker shutDownWorker;
public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug) {
this(syncJob, destination, connector, batchSize, debug, ".*\\..*");
}
@ -71,13 +69,9 @@ public class SyncCanalClient {
Preconditions.checkState(!idToChannels.isEmpty(), "no channel is registered");
lock();
try {
// 1.start all threads in channel
for (CanalSyncChannel channel : idToChannels.values()) {
channel.start();
}
// 2. start executor
// 1. start executor
consumer.start();
// 3. start receiver
// 2. start receiver
receiver.start();
} finally {
unlock();
@ -85,43 +79,17 @@ public class SyncCanalClient {
logger.info("canal client has been started.");
}
// Stop client asynchronously
public void shutdown(boolean needCleanUp) {
this.shutDownWorker = new ShutDownWorker(needCleanUp);
shutDownWorker.shutdown();
logger.info("canal client shutdown worker has been started.");
}
public class ShutDownWorker implements Runnable {
public Thread thread;
public boolean needCleanUp;
public ShutDownWorker(boolean needCleanUp) {
this.thread = new Thread(this, "ShutDownWorker");
this.needCleanUp = needCleanUp;
}
public void shutdown() {
thread.start();
}
@Override
public void run() {
lock();
try {
// 1. stop receiver
receiver.stop();
// 2. stop executor
consumer.stop(needCleanUp);
// 3. stop channels
for (CanalSyncChannel channel : idToChannels.values()) {
channel.stop();
}
} finally {
unlock();
}
logger.info("canal client has been stopped.");
lock();
try {
// 1. stop receiver
receiver.stop();
// 2. stop executor
consumer.stop(needCleanUp);
} finally {
unlock();
}
logger.info("canal client has been stopped.");
}
public void registerChannels(List<SyncChannel> channels) {

View File

@ -38,6 +38,7 @@ public class PositionMeta<T> {
this.batches = Maps.newHashMap();
this.commitPositions = Maps.newHashMap();
}
public void addBatch(long batchId, PositionRange<T> range) {
updateMaxBatchId(batchId);
batches.put(batchId, range);
@ -76,7 +77,7 @@ public class PositionMeta<T> {
}
public T getLatestPosition() {
if (batches.isEmpty()) {
if (!batches.containsKey(maxBatchId)) {
return null;
} else {
return batches.get(maxBatchId).getEnd();

View File

@ -159,9 +159,10 @@ public class InsertStreamTxnExecutor {
if (code != TStatusCode.OK) {
throw new TException("failed to insert data: " + result.getStatus().getErrorMsgsList());
}
txnEntry.clearDataToSend();
} catch (RpcException e) {
throw new TException(e);
} finally {
txnEntry.clearDataToSend();
}
}

View File

@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class SerialExecutor extends AbstractExecutorService {
private final ExecutorService taskPool;
private final ReentrantLock lock = new ReentrantLock();
private final Condition terminating = lock.newCondition();
private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
private Runnable active;
private boolean shutdown;
public SerialExecutor(final ExecutorService executor) {
Preconditions.checkNotNull(executor);
this.taskPool = executor;
}
public void execute(final Runnable r) {
lock.lock();
try {
checkPoolIsRunning();
tasks.add(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
} finally {
lock.unlock();
}
}
private void checkPoolIsRunning() {
Preconditions.checkState(lock.isHeldByCurrentThread());
if (shutdown) {
throw new RejectedExecutionException("SerialExecutor is already shutdown");
}
}
public void shutdown() {
lock.lock();
try {
shutdown = true;
} finally {
lock.unlock();
}
}
public List<Runnable> shutdownNow() {
lock.lock();
try {
shutdown = true;
List<Runnable> result = new ArrayList<>();
tasks.drainTo(result);
return result;
} finally {
lock.unlock();
}
}
public boolean isShutdown() {
lock.lock();
try {
return shutdown;
} finally {
lock.unlock();
}
}
public boolean isTerminated() {
lock.lock();
try {
return shutdown && active == null;
} finally {
lock.unlock();
}
}
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
long waitUntil = System.nanoTime() + unit.toNanos(timeout);
long remainingTime;
while ((remainingTime = waitUntil - System.nanoTime()) > 0) {
if (shutdown && active == null) {
break;
}
terminating.awaitNanos(remainingTime);
}
return remainingTime > 0;
} finally {
lock.unlock();
}
}
private void scheduleNext() {
lock.lock();
try {
if ((active = tasks.poll()) != null) {
taskPool.execute(active);
} else if (shutdown) {
terminating.signalAll();
}
} finally {
lock.unlock();
}
}
}
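
For context, a minimal standalone sketch (illustrative names, not part of this commit) of the ordering guarantee SerialExecutor provides on top of a multi-threaded pool: tasks handed to one SerialExecutor run strictly in submission order, because it only passes the next task to the backing pool after the previous one finishes.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.doris.task.SerialExecutor;

public class SerialExecutorDemo {                                // hypothetical demo class
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);  // shared worker threads
        SerialExecutor serial = new SerialExecutor(pool);

        for (int i = 0; i < 5; i++) {
            final int n = i;
            // Despite 4 worker threads, this prints task 0..4 in order.
            serial.execute(() -> System.out.println("task " + n));
        }

        serial.shutdown();
        serial.awaitTermination(5, TimeUnit.SECONDS);
        pool.shutdown();
    }
}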

View File

@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.common.ThreadPoolManager;
import java.util.concurrent.ExecutorService;
/**
* This executor service ensures that all tasks submitted to
* the same slot are executed in the order of submission.
*/
public class SerialExecutorService {
public interface SerialRunnable extends Runnable {
int getIndex();
}
private final int numOfSlots;
private final ExecutorService taskPool;
private final SerialExecutor[] slots;
private SerialExecutorService(int numOfSlots, ExecutorService taskPool) {
this.numOfSlots = numOfSlots;
this.slots = new SerialExecutor[numOfSlots];
this.taskPool = taskPool;
for (int i = 0; i < numOfSlots; i++) {
slots[i] = new SerialExecutor(taskPool);
}
}
public SerialExecutorService(int numOfSlots) {
this(numOfSlots, ThreadPoolManager.newDaemonFixedThreadPool(
numOfSlots, 256, "sync-task-pool", true));
}
public void submit(Runnable command) {
int index = getIndex(command);
if (isSlotIndex(index)) {
SerialExecutor serialEx = slots[index];
serialEx.execute(command);
} else {
taskPool.execute(command);
}
}
private int getIndex(Runnable command) {
int index = -1;
if (command instanceof SerialRunnable) {
index = (((SerialRunnable) command).getIndex());
}
return index;
}
private boolean isSlotIndex(int index) {
return index >= 0 && index < numOfSlots;
}
public void close() {
for (int i = 0; i < numOfSlots; i++) {
final SerialExecutor serialEx = slots[i];
serialEx.shutdown();
}
}
}

View File

@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.sync;
package org.apache.doris.task;
import org.apache.doris.common.UserException;
import org.apache.doris.load.sync.SyncFailMsg.MsgType;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.SyncJob.JobState;
import org.apache.doris.task.MasterTask;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

View File

@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.load.sync.SyncChannelCallback;
import org.apache.doris.task.SerialExecutorService.SerialRunnable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* SyncTask is a runnable to submit to SerialExecutorService. Each
* SyncTask will have an index to submit to the corresponding slot
* in the SerialExecutorService. And SerialExecutorService ensures
* that all SyncTasks submitted with the same index are always
* executed in the order of submission.
*/
public abstract class SyncTask implements SerialRunnable {
private static final Logger LOG = LogManager.getLogger(SyncTask.class);
protected long signature;
/**
* Each index corresponds to a slot in the SerialExecutorService.
* It should only be assigned by the getNextIndex() method in the
* SyncTaskPool. SyncTasks with the same index are always executed
* in the order of submission.
*/
protected int index;
protected SyncChannelCallback callback;
public SyncTask(long signature, int index, SyncChannelCallback callback) {
this.signature = signature;
this.index = index;
this.callback = callback;
}
@Override
public void run() {
try {
exec();
} catch (Exception e) {
String errMsg = "channel " + signature + ", " + "msg: " + e.getMessage();
LOG.error("sync task exec error: {}", errMsg);
callback.onFailed(errMsg);
}
}
public int getIndex() {
return this.index;
}
/**
* implement in child
*/
protected abstract void exec() throws Exception;
}

View File

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.common.Config;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;
public class SyncTaskPool {
private static final int NUM_OF_SLOTS = Config.max_sync_task_threads_num;
private static final SerialExecutorService EXECUTOR = new SerialExecutorService(NUM_OF_SLOTS);
private static final AtomicInteger nextIndex = new AtomicInteger();
public static void submit(Runnable task) {
if (task == null) {
return;
}
EXECUTOR.submit(task);
}
/**
* Gets the next index loop from 0 to @NUM_OF_SLOTS - 1
*/
public static int getNextIndex() {
return nextIndex.updateAndGet(new IntUnaryOperator() {
@Override
public int applyAsInt(int operand) {
if (++operand >= NUM_OF_SLOTS) {
operand = 0;
}
return operand;
}
});
}
}

View File

@ -73,6 +73,7 @@ public class CanalSyncDataTest {
private long offset = 0;
private long nextId = 1000L;
private int batchSize = 8192;
private long channelId = 100001L;
ReentrantLock getLock;
@ -220,13 +221,12 @@ public class CanalSyncDataTest {
CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
CanalSyncChannel channel = new CanalSyncChannel(
syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
idToChannels.put(channel.getId(), channel);
consumer.setChannels(idToChannels);
channel.start();
consumer.start();
receiver.start();
@ -235,7 +235,6 @@ public class CanalSyncDataTest {
} finally {
receiver.stop();
consumer.stop();
channel.stop();
}
Assert.assertEquals("position:N/A", consumer.getPositionInfo());
@ -295,13 +294,12 @@ public class CanalSyncDataTest {
CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
CanalSyncChannel channel = new CanalSyncChannel(
syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
idToChannels.put(channel.getId(), channel);
consumer.setChannels(idToChannels);
channel.start();
consumer.start();
receiver.start();
@ -310,7 +308,6 @@ public class CanalSyncDataTest {
} finally {
receiver.stop();
consumer.stop();
channel.stop();
}
LOG.info(consumer.getPositionInfo());
@ -360,13 +357,12 @@ public class CanalSyncDataTest {
CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
CanalSyncChannel channel = new CanalSyncChannel(
syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
idToChannels.put(channel.getId(), channel);
consumer.setChannels(idToChannels);
channel.start();
consumer.start();
receiver.start();
@ -375,7 +371,6 @@ public class CanalSyncDataTest {
} finally {
receiver.stop();
consumer.stop();
channel.stop();
}
Assert.assertEquals("position:N/A", consumer.getPositionInfo());
@ -444,13 +439,12 @@ public class CanalSyncDataTest {
CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
CanalSyncChannel channel = new CanalSyncChannel(
syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
idToChannels.put(channel.getId(), channel);
consumer.setChannels(idToChannels);
channel.start();
consumer.start();
receiver.start();
@ -459,7 +453,6 @@ public class CanalSyncDataTest {
} finally {
receiver.stop();
consumer.stop();
channel.stop();
}
Assert.assertEquals("position:N/A", consumer.getPositionInfo());

View File

@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.load.sync.SyncChannelCallback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SerialExecutorServiceTest {
private static final Logger LOG = LoggerFactory.getLogger(MasterTaskExecutorTest.class);
private static final int NUM_OF_SLOTS = 10;
private static final int THREAD_NUM = 10;
private static SerialExecutorService taskPool;
// thread signature -> tasks submit serial
private static Map<Long, List<Integer>> submitSerial;
// thread signature -> tasks execute serial
private static Map<Long, List<Integer>> execSerial;
@Before
public void setUp() {
taskPool = new SerialExecutorService(NUM_OF_SLOTS);
submitSerial = new ConcurrentHashMap<>();
execSerial = new ConcurrentHashMap<>();
}
@After
public void tearDown() {
if (taskPool != null) {
taskPool.close();
}
}
@Test
public void testSubmit() {
for (long i = 0; i < THREAD_NUM; i++) {
if (!submitSerial.containsKey(i)) {
submitSerial.put(i, new ArrayList<>());
}
SubmitThread thread = new SubmitThread("Thread-" + i, i, submitSerial.get(i));
thread.start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
// The submission order of the same signature should be equal to the execution order
Assert.assertEquals(submitSerial.size(), THREAD_NUM);
Assert.assertEquals(submitSerial.size(), execSerial.size());
for (long i = 0; i < THREAD_NUM; i++) {
Assert.assertTrue(submitSerial.containsKey(i));
Assert.assertTrue(execSerial.containsKey(i));
List<Integer> submitSerialList = submitSerial.get(i);
List<Integer> execSerialList = execSerial.get(i);
Assert.assertEquals(submitSerialList.size(), execSerialList.size());
for (int j = 0; j < submitSerialList.size(); j++) {
Assert.assertEquals(submitSerialList.get(j), execSerialList.get(j));
}
}
}
private static class TestSyncTask extends SyncTask {
public int serial;
public TestSyncTask(long signature, int index, int serial, SyncChannelCallback callback) {
super(signature, index, callback);
this.serial = serial;
}
@Override
protected void exec() {
LOG.info("run exec. signature: {}, index: {}, serial: {}", signature, index, serial);
if (!execSerial.containsKey(signature)) {
execSerial.put(signature, new ArrayList<>());
}
execSerial.get(signature).add(serial);
}
}
private static class SubmitThread extends Thread {
private int index = SyncTaskPool.getNextIndex();
private long signature;
private List<Integer> submitSerialList;
public SubmitThread(String name, long signature, List<Integer> submitSerialList) {
super(name);
this.signature = signature;
this.submitSerialList = submitSerialList;
}
public void run() {
for (int i = 0; i < 100; i++) {
TestSyncTask task = new TestSyncTask(signature, index, i, new SyncChannelCallback() {
@Override
public void onFinished(long channelId) {
}
@Override
public void onFailed(String errMsg) {
}
});
submitSerialList.add(i);
taskPool.submit(task);
}
}
}
}