diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index cb203f0689..597c7d49bc 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -97,6 +97,7 @@ void NodeChannel::open() { request.set_num_senders(_parent->_num_senders); request.set_need_gen_rollup(_parent->_need_gen_rollup); request.set_load_mem_limit(_parent->_load_mem_limit); + request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s); _open_closure = new RefCountClosure(); _open_closure->ref(); @@ -423,6 +424,12 @@ Status OlapTableSink::init(const TDataSink& t_sink) { _location = _pool->add(new OlapTableLocationParam(table_sink.location)); _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); + if (table_sink.__isset.load_channel_timeout_s) { + _load_channel_timeout_s = table_sink.load_channel_timeout_s; + } else { + _load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; + } + return Status::OK(); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 624caad9b2..77fb89f1cb 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -286,6 +286,9 @@ private: // load mem limit is for remote load channel int64_t _load_mem_limit = -1; + + // the timeout of load channels opened by this tablet sink. 
in seconds + int64_t _load_channel_timeout_s = 0; }; } diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 7f47069c7c..584443da42 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -23,8 +23,8 @@ namespace doris { -LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* mem_tracker) - : _load_id(load_id) { +LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* mem_tracker, int64_t timeout_s) + : _load_id(load_id), _timeout_s(timeout_s) { _mem_tracker.reset(new MemTracker(mem_limit, _load_id.to_string(), mem_tracker)); // _last_updated_time should be set before being inserted to // _load_channels in load_channel_mgr, or it may be erased diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index d6ddc1be94..414a57680f 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -36,7 +36,7 @@ class TabletsChannel; // corresponding to a certain load job class LoadChannel { public: - LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* mem_tracker); + LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* mem_tracker, int64_t timeout_s); ~LoadChannel(); // open a new load channel if not exist @@ -65,6 +65,8 @@ public: int64_t mem_consumption() const { return _mem_tracker->consumption(); } + int64_t timeout() const { return _timeout_s; } + private: // when mem consumption exceeds limit, should call this to find the max mem consumption channel // and try to reduce its mem usage. @@ -85,6 +87,10 @@ private: bool _opened = false; std::atomic _last_updated_time; + + // the timeout of this load channel, in seconds. + // if the channel times out, it will be removed by the load channel manager. 
+ int64_t _timeout_s; }; } diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index bb76ab1e84..021ab34077 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -64,7 +64,8 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { } else { // create a new load channel int64_t load_mem_limit = _calc_load_mem_limit(params.has_load_mem_limit() ? params.load_mem_limit() : -1); - channel.reset(new LoadChannel(load_id, load_mem_limit, _mem_tracker.get())); + int64_t load_channel_timeout_s = _get_load_channel_timeout(params.has_load_channel_timeout_s() ? params.load_channel_timeout_s() : -1); + channel.reset(new LoadChannel(load_id, load_mem_limit, _mem_tracker.get(), load_channel_timeout_s)); _load_channels.insert({load_id, channel}); } } @@ -86,6 +87,14 @@ int64_t LoadChannelMgr::_calc_load_mem_limit(int64_t mem_limit) { return load_mem_limit; } +int64_t LoadChannelMgr::_get_load_channel_timeout(int64_t timeout_s) { + int64_t load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; + if (timeout_s > 0) { + load_channel_timeout_s = std::max(load_channel_timeout_s, timeout_s); + } + return load_channel_timeout_s; +} + static void dummy_deleter(const CacheKey& key, void* value) { } @@ -204,14 +213,13 @@ Status LoadChannelMgr::_start_bg_worker() { Status LoadChannelMgr::_start_load_channels_clean() { std::vector> need_delete_channels; - const int32_t max_alive_time = config::streaming_load_rpc_max_alive_time_sec; time_t now = time(nullptr); { std::vector need_delete_channel_ids; std::lock_guard l(_lock); for (auto& kv : _load_channels) { time_t last_updated_time = kv.second->last_updated_time(); - if (difftime(now, last_updated_time) >= max_alive_time) { + if (difftime(now, last_updated_time) >= kv.second->timeout()) { need_delete_channel_ids.emplace_back(kv.first); need_delete_channels.emplace_back(kv.second); } @@ -228,7 +236,8 @@ Status 
LoadChannelMgr::_start_load_channels_clean() { // eg: MemTracker in load channel for (auto& channel : need_delete_channels) { channel->cancel(); - LOG(INFO) << "load channel has been safely deleted: " << channel->load_id(); + LOG(INFO) << "load channel has been safely deleted: " << channel->load_id() + << ", timeout(s): " << channel->timeout(); } // this log print every 1 min, so that we could observe the mem consumption of load process diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 64e7a37d07..559e54c5f5 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -60,6 +60,8 @@ private: int64_t _calc_total_mem_limit(int64_t process_mem_limit); // calculate the memory limit for a single load process. int64_t _calc_load_mem_limit(int64_t mem_limit); + // get the timeout of load channels + int64_t _get_load_channel_timeout(int64_t timeout_s); // check if the total load mem consumption exceeds limit. // If yes, it will pick a load channel to try to reduce memory consumption. 
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 731e86187e..2fd1a6b787 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -46,7 +46,9 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { // Normal case, already open by other sender return Status::OK(); } - LOG(INFO) << "open tablets channel: " << _key; + LOG(INFO) << "open tablets channel: " << _key + << ", tablets num: " << params.tablets().size() + << ", timeout(s): " << params.load_channel_timeout_s(); _txn_id = params.txn_id(); _index_id = params.index_id(); _schema = new OlapTableSchemaParam(); diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java index b456310e5f..9e0f79d8b3 100644 --- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -281,6 +281,7 @@ public class InsertStmt extends DdlStmt { uuid = UUID.randomUUID(); // create label and begin transaction + long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS(); if (!isExplain() && !isTransactionBegin) { if (Strings.isNullOrEmpty(label)) { label = "insert_" + uuid.toString(); @@ -288,7 +289,6 @@ public class InsertStmt extends DdlStmt { if (targetTable instanceof OlapTable) { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; - long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS(); transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), label, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond); } @@ -299,7 +299,7 @@ public class InsertStmt extends DdlStmt { if (!isExplain() && targetTable instanceof OlapTable) { OlapTableSink sink = (OlapTableSink) dataSink; TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - sink.init(loadId, 
transactionId, db.getId()); + sink.init(loadId, transactionId, db.getId(), timeoutSecond); } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index f0097e6bf4..b1bf6851b4 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -374,7 +374,7 @@ public class BrokerLoadJob extends LoadJob { // Generate loading task and init the plan of task LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc, entry.getValue(), getDeadlineMs(), execMemLimit, - strictMode, transactionId, this, timezone); + strictMode, transactionId, this, timezone, timeoutSecond); UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); task.init(loadId, attachment.getFileStatusByTable(tableId), attachment.getFileNumByTable(tableId)); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index 66bc62c182..d72c84dd1b 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -61,13 +61,15 @@ public class BrokerLoadPendingTask extends LoadTask { private void getAllFileStatus() throws UserException { long start = System.currentTimeMillis(); + long totalFileSize = 0; + int totalFileNum = 0; for (Map.Entry> entry : tableToBrokerFileList.entrySet()) { long tableId = entry.getKey(); List> fileStatusList = Lists.newArrayList(); List fileGroups = entry.getValue(); - long totalFileSize = 0; - int totalFileNum = 0; + long tableTotalFileSize = 0; + int tabletotalFileNum = 0; int groupNum = 0; for (BrokerFileGroup fileGroup : fileGroups) { long groupFileSize = 0; @@ -83,17 +85,20 @@ public class BrokerLoadPendingTask extends 
LoadTask { .add("file_status", fstatus).build()); } } - totalFileSize += groupFileSize; - totalFileNum += fileStatuses.size(); + tableTotalFileSize += groupFileSize; + tabletotalFileNum += fileStatuses.size(); LOG.info("get {} files in file group {} for table {}. size: {}. job: {}", fileStatuses.size(), groupNum, entry.getKey(), groupFileSize, callback.getCallbackId()); groupNum++; } - ((BrokerLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize); + totalFileSize += tableTotalFileSize; + totalFileNum += tabletotalFileNum; ((BrokerPendingTaskAttachment) attachment).addFileStatus(tableId, fileStatusList); LOG.info("get {} files to be loaded. total size: {}. cost: {} ms, job: {}", - totalFileNum, totalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId()); + tabletotalFileNum, tableTotalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId()); } + + ((BrokerLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize); } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 5b773af5a5..8f6830958e 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -580,7 +580,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements } } idToTasks.clear(); - loadStatistic.clearAllLoads(); // set failMsg and state this.failMsg = failMsg; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 64d09f9a72..464744de7d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -58,13 +58,16 @@ public class LoadLoadingTask extends LoadTask { private final boolean strictMode; private final long txnId; private final String timezone; + // timeout of load 
job, in seconds + private final long timeoutS; private LoadingTaskPlanner planner; public LoadLoadingTask(Database db, OlapTable table, BrokerDesc brokerDesc, List fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, - long txnId, LoadTaskCallback callback, String timezone) { + long txnId, LoadTaskCallback callback, String timezone, + long timeoutS) { super(callback); this.db = db; this.table = table; @@ -77,11 +80,12 @@ public class LoadLoadingTask extends LoadTask { this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL); this.retryTime = 2; // 2 times is enough this.timezone = timezone; + this.timeoutS = timeoutS; } public void init(TUniqueId loadId, List> fileStatusList, int fileNum) throws UserException { this.loadId = loadId; - planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode, timezone); + planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode, timezone, this.timeoutS); planner.plan(loadId, fileStatusList, fileNum); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index e0019e68a7..f994438ab0 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -67,6 +67,7 @@ public class LoadingTaskPlanner { private final BrokerDesc brokerDesc; private final List fileGroups; private final boolean strictMode; + private final long timeoutS; // timeout of load job, in seconds // Something useful private Analyzer analyzer = new Analyzer(Catalog.getInstance(), null); @@ -80,7 +81,7 @@ public class LoadingTaskPlanner { public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List brokerFileGroups, - boolean strictMode, String 
timezone, long timeoutS) { this.loadJobId = loadJobId; this.txnId = txnId; this.dbId = dbId; @@ -89,6 +90,7 @@ public class LoadingTaskPlanner { this.fileGroups = brokerFileGroups; this.strictMode = strictMode; this.analyzer.setTimezone(timezone); + this.timeoutS = timeoutS; } public void plan(TUniqueId loadId, List> fileStatusesList, int filesAdded) @@ -122,7 +124,7 @@ public class LoadingTaskPlanner { // 2. Olap table sink String partitionNames = convertBrokerDescPartitionInfo(); OlapTableSink olapTableSink = new OlapTableSink(table, tupleDesc, partitionNames); - olapTableSink.init(loadId, txnId, dbId); + olapTableSink.init(loadId, txnId, dbId, timeoutS); olapTableSink.finalize(); // 3. Plan fragment diff --git a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java index b32e511951..6378c8b415 100644 --- a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -88,11 +88,12 @@ public class OlapTableSink extends DataSink { this.partitions = Strings.emptyToNull(partitions); } - public void init(TUniqueId loadId, long txnId, long dbId) throws AnalysisException { + public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS) throws AnalysisException { TOlapTableSink tSink = new TOlapTableSink(); tSink.setLoad_id(loadId); tSink.setTxn_id(txnId); tSink.setDb_id(dbId); + tSink.setLoad_channel_timeout_s(loadChannelTimeoutS); tDataSink = new TDataSink(TDataSinkType.DATA_SPLIT_SINK); tDataSink.setType(TDataSinkType.OLAP_TABLE_SINK); tDataSink.setOlap_table_sink(tSink); diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index b2491f9b82..c39763833e 100644 --- a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -108,7 +108,7 @@ 
public class StreamLoadPlanner { // create dest sink OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, streamLoadTask.getPartitions()); - olapTableSink.init(loadId, streamLoadTask.getTxnId(), db.getId()); + olapTableSink.init(loadId, streamLoadTask.getTxnId(), db.getId(), streamLoadTask.getTimeout()); olapTableSink.finalize(); // for stream load, we only need one fragment, ScanNode -> DataSink. diff --git a/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java index 8d422bf015..8c70e4c9e8 100644 --- a/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java +++ b/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java @@ -100,7 +100,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, ""); - sink.init(new TUniqueId(1, 2), 3, 4); + sink.init(new TUniqueId(1, 2), 3, 4, 1000); sink.finalize(); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -136,7 +136,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p1"); - sink.init(new TUniqueId(1, 2), 3, 4); + sink.init(new TUniqueId(1, 2), 3, 4, 1000); try { sink.finalize(); } catch (UserException e) { @@ -158,7 +158,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p3"); - sink.init(new TUniqueId(1, 2), 3, 4); + sink.init(new TUniqueId(1, 2), 3, 4, 1000); sink.finalize(); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -175,7 +175,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p1"); - sink.init(new TUniqueId(1, 2), 3, 4); + sink.init(new TUniqueId(1, 2), 3, 4, 1000); sink.finalize(); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); diff --git 
a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index ae559015f0..d88cc3176c 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -69,6 +69,7 @@ message PTabletWriterOpenRequest { required int32 num_senders = 6; required bool need_gen_rollup = 7; optional int64 load_mem_limit = 8; + optional int64 load_channel_timeout_s = 9; }; message PTabletWriterOpenResult { diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 74822b6d48..2792a07276 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -104,6 +104,7 @@ struct TOlapTableSink { 11: required Descriptors.TOlapTablePartitionParam partition 12: required Descriptors.TOlapTableLocationParam location 13: required Descriptors.TPaloNodesInfo nodes_info + 14: optional i64 load_channel_timeout_s // the timeout of load channels in seconds } struct TDataSink {