From a3b7cf484b9b58d483eb468b76bb4fed5c76f480 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 6 Dec 2019 21:51:00 +0800 Subject: [PATCH] Set the load channel's timeout to be the same as the load job's timeout (#2405) [Load] When performing a long-time load job, the following errors may occur. Causes the load to fail. load channel manager add batch with unknown load id: xxx There is a case of this error because Doris opened an unrelated channel during the load process. This channel will not receive any data during the entire load process. Therefore, after a fixed timeout, the channel will be released. And after the entire load job is completed, it will try to close all open channels. When it try to close this channel, it will find that the channel no longer exists and an error is reported. This CL will pass the timeout of load job to the load channel, so that the timeout of load channels will be same as load job's. --- be/src/exec/tablet_sink.cpp | 7 +++++++ be/src/exec/tablet_sink.h | 3 +++ be/src/runtime/load_channel.cpp | 4 ++-- be/src/runtime/load_channel.h | 8 +++++++- be/src/runtime/load_channel_mgr.cpp | 17 +++++++++++++---- be/src/runtime/load_channel_mgr.h | 2 ++ be/src/runtime/tablets_channel.cpp | 4 +++- .../org/apache/doris/analysis/InsertStmt.java | 4 ++-- .../apache/doris/load/loadv2/BrokerLoadJob.java | 2 +- .../load/loadv2/BrokerLoadPendingTask.java | 17 +++++++++++------ .../org/apache/doris/load/loadv2/LoadJob.java | 1 - .../doris/load/loadv2/LoadLoadingTask.java | 8 ++++++-- .../doris/load/loadv2/LoadingTaskPlanner.java | 6 ++++-- .../org/apache/doris/planner/OlapTableSink.java | 3 ++- .../apache/doris/planner/StreamLoadPlanner.java | 2 +- .../apache/doris/planner/OlapTableSinkTest.java | 8 ++++---- gensrc/proto/internal_service.proto | 1 + gensrc/thrift/DataSinks.thrift | 1 + 18 files changed, 70 insertions(+), 28 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index cb203f0689..597c7d49bc 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -97,6 +97,7 @@ void NodeChannel::open() { request.set_num_senders(_parent->_num_senders); request.set_need_gen_rollup(_parent->_need_gen_rollup); request.set_load_mem_limit(_parent->_load_mem_limit); + request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s); _open_closure = new RefCountClosure(); _open_closure->ref(); @@ -423,6 +424,12 @@ Status OlapTableSink::init(const TDataSink& t_sink) { _location = _pool->add(new OlapTableLocationParam(table_sink.location)); _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); + if (table_sink.__isset.load_channel_timeout_s) { + _load_channel_timeout_s = table_sink.load_channel_timeout_s; + } else { + _load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; + } + return Status::OK(); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 624caad9b2..77fb89f1cb 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -286,6 +286,9 @@ private: // load mem limit is for remote load channel int64_t _load_mem_limit = -1; + + // the timeout of load channels opened by this tablet sink. in second + int64_t _load_channel_timeout_s = 0; }; } diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 7f47069c7c..584443da42 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -23,8 +23,8 @@ namespace doris { -LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* mem_tracker) - : _load_id(load_id) { +LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* mem_tracker, int64_t timeout_s) + : _load_id(load_id), _timeout_s(timeout_s) { _mem_tracker.reset(new MemTracker(mem_limit, _load_id.to_string(), mem_tracker)); // _last_updated_time should be set before being inserted to // _load_channels in load_channel_mgr, or it may be erased diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index d6ddc1be94..414a57680f 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -36,7 +36,7 @@ class TabletsChannel; // corresponding to a certain load job class LoadChannel { public: - LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* mem_tracker); + LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* mem_tracker, int64_t timeout_s); ~LoadChannel(); // open a new load channel if not exist @@ -65,6 +65,8 @@ public: int64_t mem_consumption() const { return _mem_tracker->consumption(); } + int64_t timeout() const { return _timeout_s; } + private: // when mem consumption exceeds limit, should call this to find the max mem consumption channel // and try to reduce its mem usage. @@ -85,6 +87,10 @@ private: bool _opened = false; std::atomic _last_updated_time; + + // the timeout of this load channel. + // if channel is timeout, it will be removed by load channel manager. + int64_t _timeout_s; }; } diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index bb76ab1e84..021ab34077 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -64,7 +64,8 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { } else { // create a new load channel int64_t load_mem_limit = _calc_load_mem_limit(params.has_load_mem_limit() ? params.load_mem_limit() : -1); - channel.reset(new LoadChannel(load_id, load_mem_limit, _mem_tracker.get())); + int64_t load_channel_timeout_s = _get_load_channel_timeout(params.has_load_channel_timeout_s() ? params.load_channel_timeout_s() : -1); + channel.reset(new LoadChannel(load_id, load_mem_limit, _mem_tracker.get(), load_channel_timeout_s)); _load_channels.insert({load_id, channel}); } } @@ -86,6 +87,14 @@ int64_t LoadChannelMgr::_calc_load_mem_limit(int64_t mem_limit) { return load_mem_limit; } +int64_t LoadChannelMgr::_get_load_channel_timeout(int64_t timeout_s) { + int64_t load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; + if (timeout_s > 0) { + load_channel_timeout_s = std::max(load_channel_timeout_s, timeout_s); + } + return load_channel_timeout_s; +} + static void dummy_deleter(const CacheKey& key, void* value) { } @@ -204,14 +213,13 @@ Status LoadChannelMgr::_start_bg_worker() { Status LoadChannelMgr::_start_load_channels_clean() { std::vector> need_delete_channels; - const int32_t max_alive_time = config::streaming_load_rpc_max_alive_time_sec; time_t now = time(nullptr); { std::vector need_delete_channel_ids; std::lock_guard l(_lock); for (auto& kv : _load_channels) { time_t last_updated_time = kv.second->last_updated_time(); - if (difftime(now, last_updated_time) >= max_alive_time) { + if (difftime(now, last_updated_time) >= kv.second->timeout()) { need_delete_channel_ids.emplace_back(kv.first); need_delete_channels.emplace_back(kv.second); } @@ -228,7 +236,8 @@ Status LoadChannelMgr::_start_load_channels_clean() { // eg: MemTracker in load channel for (auto& channel : need_delete_channels) { channel->cancel(); - LOG(INFO) << "load channel has been safely deleted: " << channel->load_id(); + LOG(INFO) << "load channel has been safely deleted: " << channel->load_id() + << ", timeout(s): " << channel->timeout(); } // this log print every 1 min, so that we could observe the mem consumption of load process diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 64e7a37d07..559e54c5f5 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -60,6 +60,8 @@ private: int64_t _calc_total_mem_limit(int64_t process_mem_limit); // calculate the memory limit for a single load process. int64_t _calc_load_mem_limit(int64_t mem_limit); + // get the timeout of load channels + int64_t _get_load_channel_timeout(int64_t timeout_s); // check if the total load mem consumption exceeds limit. // If yes, it will pick a load channel to try to reduce memory consumption. diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 731e86187e..2fd1a6b787 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -46,7 +46,9 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { // Normal case, already open by other sender return Status::OK(); } - LOG(INFO) << "open tablets channel: " << _key; + LOG(INFO) << "open tablets channel: " << _key + << ", tablets num: " << params.tablets().size() + << ", timeout(s): " << params.load_channel_timeout_s(); _txn_id = params.txn_id(); _index_id = params.index_id(); _schema = new OlapTableSchemaParam(); diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java index b456310e5f..9e0f79d8b3 100644 --- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -281,6 +281,7 @@ public class InsertStmt extends DdlStmt { uuid = UUID.randomUUID(); // create label and begin transaction + long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS(); if (!isExplain() && !isTransactionBegin) { if (Strings.isNullOrEmpty(label)) { label = "insert_" + uuid.toString(); @@ -288,7 +289,6 @@ public class InsertStmt extends DdlStmt { if (targetTable instanceof OlapTable) { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; - long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS(); transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), label, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond); } @@ -299,7 +299,7 @@ public class InsertStmt extends DdlStmt { if (!isExplain() && targetTable instanceof OlapTable) { OlapTableSink sink = (OlapTableSink) dataSink; TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - sink.init(loadId, transactionId, db.getId()); + sink.init(loadId, transactionId, db.getId(), timeoutSecond); } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index f0097e6bf4..b1bf6851b4 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -374,7 +374,7 @@ public class BrokerLoadJob extends LoadJob { // Generate loading task and init the plan of task LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc, entry.getValue(), getDeadlineMs(), execMemLimit, - strictMode, transactionId, this, timezone); + strictMode, transactionId, this, timezone, timeoutSecond); UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); task.init(loadId, attachment.getFileStatusByTable(tableId), attachment.getFileNumByTable(tableId)); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index 66bc62c182..d72c84dd1b 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -61,13 +61,15 @@ public class BrokerLoadPendingTask extends LoadTask { private void getAllFileStatus() throws UserException { long start = System.currentTimeMillis(); + long totalFileSize = 0; + int totalFileNum = 0; for (Map.Entry> entry : tableToBrokerFileList.entrySet()) { long tableId = entry.getKey(); List> fileStatusList = Lists.newArrayList(); List fileGroups = entry.getValue(); - long totalFileSize = 0; - int totalFileNum = 0; + long tableTotalFileSize = 0; + int tabletotalFileNum = 0; int groupNum = 0; for (BrokerFileGroup fileGroup : fileGroups) { long groupFileSize = 0; @@ -83,17 +85,20 @@ public class BrokerLoadPendingTask extends LoadTask { .add("file_status", fstatus).build()); } } - totalFileSize += groupFileSize; - totalFileNum += fileStatuses.size(); + tableTotalFileSize += groupFileSize; + tabletotalFileNum += fileStatuses.size(); LOG.info("get {} files in file group {} for table {}. size: {}. job: {}", fileStatuses.size(), groupNum, entry.getKey(), groupFileSize, callback.getCallbackId()); groupNum++; } - ((BrokerLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize); + totalFileSize += tableTotalFileSize; + totalFileNum += tabletotalFileNum; ((BrokerPendingTaskAttachment) attachment).addFileStatus(tableId, fileStatusList); LOG.info("get {} files to be loaded. total size: {}. cost: {} ms, job: {}", - totalFileNum, totalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId()); + tabletotalFileNum, tableTotalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId()); } + + ((BrokerLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize); } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 5b773af5a5..8f6830958e 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -580,7 +580,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements } } idToTasks.clear(); - loadStatistic.clearAllLoads(); // set failMsg and state this.failMsg = failMsg; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 64d09f9a72..464744de7d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -58,13 +58,16 @@ public class LoadLoadingTask extends LoadTask { private final boolean strictMode; private final long txnId; private final String timezone; + // timeout of load job, in seconds + private final long timeoutS; private LoadingTaskPlanner planner; public LoadLoadingTask(Database db, OlapTable table, BrokerDesc brokerDesc, List fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, - long txnId, LoadTaskCallback callback, String timezone) { + long txnId, LoadTaskCallback callback, String timezone, + long timeoutS) { super(callback); this.db = db; this.table = table; @@ -77,11 +80,12 @@ public class LoadLoadingTask extends LoadTask { this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL); this.retryTime = 2; // 2 times is enough this.timezone = timezone; + this.timeoutS = timeoutS; } public void init(TUniqueId loadId, List> fileStatusList, int fileNum) throws UserException { this.loadId = loadId; - planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode, timezone); + planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode, timezone, this.timeoutS); planner.plan(loadId, fileStatusList, fileNum); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index e0019e68a7..f994438ab0 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -67,6 +67,7 @@ public class LoadingTaskPlanner { private final BrokerDesc brokerDesc; private final List fileGroups; private final boolean strictMode; + private final long timeoutS; // timeout of load job, in second // Something useful private Analyzer analyzer = new Analyzer(Catalog.getInstance(), null); @@ -80,7 +81,7 @@ public class LoadingTaskPlanner { public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List brokerFileGroups, - boolean strictMode, String timezone) { + boolean strictMode, String timezone, long timeoutS) { this.loadJobId = loadJobId; this.txnId = txnId; this.dbId = dbId; @@ -89,6 +90,7 @@ public class LoadingTaskPlanner { this.fileGroups = brokerFileGroups; this.strictMode = strictMode; this.analyzer.setTimezone(timezone); + this.timeoutS = timeoutS; } public void plan(TUniqueId loadId, List> fileStatusesList, int filesAdded) @@ -122,7 +124,7 @@ public class LoadingTaskPlanner { // 2. Olap table sink String partitionNames = convertBrokerDescPartitionInfo(); OlapTableSink olapTableSink = new OlapTableSink(table, tupleDesc, partitionNames); - olapTableSink.init(loadId, txnId, dbId); + olapTableSink.init(loadId, txnId, dbId, timeoutS); olapTableSink.finalize(); // 3. Plan fragment diff --git a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java index b32e511951..6378c8b415 100644 --- a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -88,11 +88,12 @@ public class OlapTableSink extends DataSink { this.partitions = Strings.emptyToNull(partitions); } - public void init(TUniqueId loadId, long txnId, long dbId) throws AnalysisException { + public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS) throws AnalysisException { TOlapTableSink tSink = new TOlapTableSink(); tSink.setLoad_id(loadId); tSink.setTxn_id(txnId); tSink.setDb_id(dbId); + tSink.setLoad_channel_timeout_s(loadChannelTimeoutS); tDataSink = new TDataSink(TDataSinkType.DATA_SPLIT_SINK); tDataSink.setType(TDataSinkType.OLAP_TABLE_SINK); tDataSink.setOlap_table_sink(tSink); diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index b2491f9b82..c39763833e 100644 --- a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -108,7 +108,7 @@ public class StreamLoadPlanner { // create dest sink OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, streamLoadTask.getPartitions()); - olapTableSink.init(loadId, streamLoadTask.getTxnId(), db.getId()); + olapTableSink.init(loadId, streamLoadTask.getTxnId(), db.getId(), streamLoadTask.getTimeout()); olapTableSink.finalize(); // for stream load, we only need one fragment, ScanNode -> DataSink. diff --git a/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java index 8d422bf015..8c70e4c9e8 100644 --- a/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java +++ b/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java @@ -100,7 +100,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, ""); - sink.init(new TUniqueId(1, 2), 3, 4); + sink.init(new TUniqueId(1, 2), 3, 4, 1000); sink.finalize(); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -136,7 +136,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p1"); - sink.init(new TUniqueId(1, 2), 3, 4); + sink.init(new TUniqueId(1, 2), 3, 4, 1000); try { sink.finalize(); } catch (UserException e) { @@ -158,7 +158,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p3"); - sink.init(new TUniqueId(1, 2), 3, 4); + sink.init(new TUniqueId(1, 2), 3, 4, 1000); sink.finalize(); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -175,7 +175,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p1"); - sink.init(new TUniqueId(1, 2), 3, 4); + sink.init(new TUniqueId(1, 2), 3, 4, 1000); sink.finalize(); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index ae559015f0..d88cc3176c 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -69,6 +69,7 @@ message PTabletWriterOpenRequest { required int32 num_senders = 6; required bool need_gen_rollup = 7; optional int64 load_mem_limit = 8; + optional int64 load_channel_timeout_s = 9; }; message PTabletWriterOpenResult { diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 74822b6d48..2792a07276 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -104,6 +104,7 @@ struct TOlapTableSink { 11: required Descriptors.TOlapTablePartitionParam partition 12: required Descriptors.TOlapTableLocationParam location 13: required Descriptors.TPaloNodesInfo nodes_info + 14: optional i64 load_channel_timeout_s // the timeout of load channels in second } struct TDataSink {