From 13b26ee920dd539d9cbf06bb09ed675c8642c742 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Mon, 27 Nov 2023 15:10:26 +0800 Subject: [PATCH] [Fix](core) Fix wal space back pressure core and add regression test (#27311) --- be/src/olap/wal_manager.cpp | 14 +- be/src/olap/wal_manager.h | 2 + be/src/olap/wal_writer.cpp | 34 +++-- be/src/olap/wal_writer.h | 6 +- be/src/runtime/group_commit_mgr.cpp | 20 +-- be/src/vec/sink/group_commit_block_sink.cpp | 5 +- be/test/olap/wal_manager_test.cpp | 3 +- be/test/olap/wal_reader_writer_test.cpp | 3 +- ..._group_commit_and_wal_back_pressure.csv.gz | Bin 0 -> 372017 bytes ...est_group_commit_and_wal_back_pressure.out | 10 ++ ..._group_commit_and_wal_back_pressure.groovy | 134 ++++++++++++++++++ 11 files changed, 200 insertions(+), 31 deletions(-) create mode 100644 regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.csv.gz create mode 100644 regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out create mode 100644 regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index 10af149445..abb7d8a324 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -41,6 +41,7 @@ WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) : _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) { doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); _all_wal_disk_bytes = std::make_shared(0); + _cv = std::make_shared(); } WalManager::~WalManager() { @@ -199,7 +200,7 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path)); } LOG(INFO) << "create wal " << wal_path; - wal_writer = std::make_shared(wal_path, _all_wal_disk_bytes); + wal_writer = std::make_shared(wal_path, _all_wal_disk_bytes, _cv); RETURN_IF_ERROR(wal_writer->init()); { std::lock_guard wrlock(_wal_lock); @@ -207,6 +208,7 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& } return Status::OK(); } + Status WalManager::scan_wals(const std::string& wal_path) { size_t count = 0; bool exists = true; @@ -336,13 +338,11 @@ Status WalManager::delete_wal(int64_t wal_id) { { std::lock_guard wrlock(_wal_lock); if (_wal_id_to_writer_map.find(wal_id) != _wal_id_to_writer_map.end()) { - _all_wal_disk_bytes->store( - _all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(), - std::memory_order_relaxed), - std::memory_order_relaxed); - _wal_id_to_writer_map[wal_id]->cv.notify_one(); + _all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(), + std::memory_order_relaxed); + _cv->notify_one(); std::string wal_path = _wal_path_map[wal_id]; - LOG(INFO) << "wal delete file=" << wal_path << ", this file disk usage is" + LOG(INFO) << "wal delete file=" << wal_path << ", this file disk usage is " << _wal_id_to_writer_map[wal_id]->disk_bytes() << " ,after deleting it, all wals disk usage is " << _all_wal_disk_bytes->load(std::memory_order_relaxed); diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index cf4589fbf0..2cfaaa4ec5 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -17,6 +17,7 @@ #include +#include #include #include "common/config.h" @@ -85,5 +86,6 @@ private: std::unordered_map> _wal_status_queues; std::atomic _stop; std::unordered_map&> _wal_column_id_map; + std::shared_ptr _cv; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp index 52202b35c3..9d3da90d88 100644 --- a/be/src/olap/wal_writer.cpp +++ b/be/src/olap/wal_writer.cpp @@ -18,6 +18,8 @@ #include "olap/wal_writer.h" #include +#include +#include #include "common/config.h" #include "io/fs/file_writer.h" @@ -32,8 +34,13 @@ const char* k_wal_magic = "WAL1"; const uint32_t k_wal_magic_length = 4; WalWriter::WalWriter(const std::string& file_name, - const std::shared_ptr& all_wal_disk_bytes) - : _file_name(file_name), _disk_bytes(0), _all_wal_disk_bytes(all_wal_disk_bytes) {} + const std::shared_ptr& all_wal_disk_bytes, + const std::shared_ptr& cv) + : cv(cv), + _file_name(file_name), + _disk_bytes(0), + _all_wal_disk_bytes(all_wal_disk_bytes), + _is_first_append_blocks(true) {} WalWriter::~WalWriter() {} @@ -52,9 +59,19 @@ Status WalWriter::finalize() { Status WalWriter::append_blocks(const PBlockArray& blocks) { { - std::unique_lock l(_mutex); - while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > config::wal_max_disk_size) { - cv.wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME)); + if (_is_first_append_blocks) { + _is_first_append_blocks = false; + std::unique_lock l(_mutex); + while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > + config::wal_max_disk_size) { + LOG(INFO) << "First time to append blocks to wal file " << _file_name + << ". Currently, all wal disk space usage is " + << _all_wal_disk_bytes->load(std::memory_order_relaxed) + << ", larger than the maximum limit " << config::wal_max_disk_size + << ", so we need to wait. When any other load finished, that wal will be " + "removed, the space used by that wal will be free."; + cv->wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME)); + } } } size_t total_size = 0; @@ -82,11 +99,8 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) { "failed to write block to wal expected= " + std::to_string(total_size) + ",actually=" + std::to_string(offset)); } - _disk_bytes.store(_disk_bytes.fetch_add(total_size, std::memory_order_relaxed), - std::memory_order_relaxed); - _all_wal_disk_bytes->store( - _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed), - std::memory_order_relaxed); + _disk_bytes.fetch_add(total_size, std::memory_order_relaxed); + _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed); return Status::OK(); } diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal_writer.h index 058902783e..88ff465976 100644 --- a/be/src/olap/wal_writer.h +++ b/be/src/olap/wal_writer.h @@ -35,7 +35,8 @@ extern const uint32_t k_wal_magic_length; class WalWriter { public: explicit WalWriter(const std::string& file_name, - const std::shared_ptr& all_wal_disk_bytes); + const std::shared_ptr& all_wal_disk_bytes, + const std::shared_ptr& cv); ~WalWriter(); Status init(); @@ -50,7 +51,7 @@ public: public: static const int64_t LENGTH_SIZE = 8; static const int64_t CHECKSUM_SIZE = 4; - std::condition_variable cv; + std::shared_ptr cv; static const int64_t VERSION_SIZE = 4; private: @@ -60,6 +61,7 @@ private: std::atomic_size_t _disk_bytes; std::shared_ptr _all_wal_disk_bytes; std::mutex _mutex; + bool _is_first_append_blocks; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 375af4db6c..c044ccca3b 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -18,6 +18,9 @@ #include "runtime/group_commit_mgr.h" #include +#include + +#include #include "client_cache.h" #include "common/config.h" @@ -33,14 +36,15 @@ Status LoadBlockQueue::add_block(std::shared_ptr block) DCHECK(block->get_schema_version() == schema_version); std::unique_lock l(mutex); RETURN_IF_ERROR(_status); - while (*_all_block_queues_bytes > config::group_commit_max_queue_size) { + while (_all_block_queues_bytes->load(std::memory_order_relaxed) > + config::group_commit_max_queue_size) { _put_cond.wait_for( l, std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME)); } if (block->rows() > 0) { _block_queue.push_back(block); - *_all_block_queues_bytes += block->bytes(); - *_single_block_queue_bytes += block->bytes(); + _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); + _single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); } _get_cond.notify_all(); return Status::OK(); @@ -81,11 +85,11 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo fblock->swap_future_block(future_block); *find_block = true; _block_queue.pop_front(); - *_all_block_queues_bytes -= fblock->bytes(); - *_single_block_queue_bytes -= block->bytes(); + _all_block_queues_bytes->fetch_sub(fblock->bytes(), std::memory_order_relaxed); + _single_block_queue_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed); } if (_block_queue.empty() && need_commit && _load_ids.empty()) { - CHECK(*_single_block_queue_bytes == 0); + CHECK_EQ(_single_block_queue_bytes->load(), 0); *eos = true; } else { *eos = false; @@ -121,8 +125,8 @@ void LoadBlockQueue::cancel(const Status& st) { auto& future_block = _block_queue.front(); std::unique_lock l0(*(future_block->lock)); future_block->set_result(st, future_block->rows(), 0); - *_all_block_queues_bytes -= future_block->bytes(); - *_single_block_queue_bytes -= future_block->bytes(); + _all_block_queues_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed); + _single_block_queue_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed); future_block->cv->notify_all(); } _block_queue.pop_front(); diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 0730cb2a53..266fdf72f0 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -47,6 +47,7 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) { _db_id = table_sink.db_id; _table_id = table_sink.table_id; _base_schema_version = table_sink.base_schema_version; + _load_id = table_sink.load_id; return Status::OK(); } @@ -155,8 +156,8 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, std::make_shared(); future_block->swap(*(output_block.get())); TUniqueId load_id; - load_id.__set_hi(load_id.hi); - load_id.__set_lo(load_id.lo); + load_id.__set_hi(_load_id.hi); + load_id.__set_lo(_load_id.lo); future_block->set_info(_base_schema_version, load_id); if (_load_block_queue == nullptr) { RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue( diff --git a/be/test/olap/wal_manager_test.cpp b/be/test/olap/wal_manager_test.cpp index ec387680a6..64588dc5d1 100644 --- a/be/test/olap/wal_manager_test.cpp +++ b/be/test/olap/wal_manager_test.cpp @@ -80,7 +80,8 @@ public: void createWal(const std::string& wal_path) { std::shared_ptr _all_wal_disk_bytes = std::make_shared(0); - auto wal_writer = WalWriter(wal_path, _all_wal_disk_bytes); + std::shared_ptr cv = std::make_shared(); + auto wal_writer = WalWriter(wal_path, _all_wal_disk_bytes, cv); static_cast(wal_writer.init()); static_cast(wal_writer.finalize()); } diff --git a/be/test/olap/wal_reader_writer_test.cpp b/be/test/olap/wal_reader_writer_test.cpp index 09460477e3..1d1102f350 100644 --- a/be/test/olap/wal_reader_writer_test.cpp +++ b/be/test/olap/wal_reader_writer_test.cpp @@ -92,7 +92,8 @@ TEST_F(WalReaderWriterTest, TestWriteAndRead1) { std::string file_name = _s_test_data_path + "/abcd123.txt"; std::shared_ptr _all_wal_disk_bytes = std::make_shared(0); - auto wal_writer = WalWriter(file_name, _all_wal_disk_bytes); + std::shared_ptr cv = std::make_shared(); + auto wal_writer = WalWriter(file_name, _all_wal_disk_bytes, cv); static_cast(wal_writer.init()); size_t file_len = 0; int64_t file_size = -1; diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.csv.gz b/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..539fb01b418a20d2b92a9428b5a41fedcd51f07e GIT binary patch literal 372017 zcmb2|=HNKc5|hfnT#{N`qL*A;#_;C6ci$9N2Dgiq`j6$W9|*eg<$|~elT>n4vvI%? z3Eq7d9=y8SG}&v_`|_}YceeBUrkAgetNZlqrv3T-d-vAVNx= zCKf#M5WRX%E!XiH!@E4G*1)6rVl-cj=8MsMG1`uT z^m9hbLwc2mkDt|8y(_fG%zCH|Hv!#8Tta9AiYp$h*)JN4|=5|DAjfM|;ia`yn z(eN1!A4F~$O&hR!b2ML!=8MsMf!X>P%@?EjVl-cj<_qK=)o7VOuQH);_UD&7j{n6> zey9yMhb4AsG;Ltkz@zzMG+&J7i_v^B+Kz&ZG>n#qbSe*z6c*dbe%~RFnf0J8HYn90R>;Z; zW#gt|S~%7#;_WE4yrVY%d!)U%3 z%@?EjVl-bM4>63E38Q5K?aKs<oXyb#bu@g? zLj*OnM#E<`d~g*pqiJI_ZH%T3%x2SQz8K9HqxoVqUm!QfN6UoKGJ*DG!lNCQ^ZTmr z^y9J#*0y))9=-e$e;=H_m&%M@Ac|RAz|t!8g4K@yH|8fRaV%u$YP2<~tL6Q7b}m<% zLcVBD6aK_LdUEw>_~1%hqiJI_ZH%T3hVeqxoVqUySApt!895^ zqv3;lFlaP?;i^(WIcYTSjOLxuyaUP!sJUb`UySC9(R?wQFOcUxM$3fJGJ*DG!sz7X z=n=P=;8=SK6zXucTD7o+(Cxj!{pCXAK|Bd<&toxB{Kyu_>rka`iLgKo%` z1X932%ZkzP84aJ|F(}z#|GU_SMN!1TaD)4?_3IDr-x++6=gHb8UvNs7P(PH7zk$Hi zII0G-d;kX?G>eRe&uI7{PiT#%4M@{tG+&J7i_v_6S-TG3d~vSy|GD0)|5~x zLr~uX%EV1Q5>(pLhFcn(wK@t%)!+;4(eQ!PM7WaCX!wkV&uH4f?B|Z=i_v^BnlDE4 z1-Ov~ZD5R+38Q7gXqhltCJbDe@F?c{`4AsG;NHQnWK~6kYR$+ z@^Gk?hoh61qm!waOqYT7$H}XNC5*aCr86)G<-%U7)JBO=mZ0#={#B< zj+Td`<>6>~I9eW#mWLARqm!4Tlb4ufA++n{pdc_h;0P%t5FvsZTBG4J8a|_G19O0C zG+&J7i_v^BnlF(1+M{K{XqhltCXA#qVRZ6xbn+52y(9IYMhByjD@de(ftHh_;WHXO zql5LM`C@di9@2CkEe}V_!_o3^v^*Rw4@b+x(aFov0X0Z2fOeKJsL}8l4Ihj|0j)eo z)5d7p7)=|PgL9+#Vl-cj=8MsMfjrJIS|*H^2_vse7@fQvoxH@X2cX>t5H&g&ja*59 zLIkyt7!9A%@EIMfAI%q|gY}T+V-pV`);XB_Tn-C#kf zQR!+)b?!f2T=S|$v1nNWOp|L2Ew$oUer=7#d3j&KV>nYgKnmeIjG z%=kov*XZC2w(*A1@BxkOjHZpzw1LP8sHMng+89k6qiF+FN{;3WSW9iROc*T_M$3fJ zG6A_*9DHTM=;USR=!7e-VhU85j}8JNXJAl>poZ3H_>6|n=pgB6zJN9RK_M|(CXAK| zqh$gpCydsWuny2@nJ`)=jJz^obn>ji_!9Mv^*Rw z4@XjYI68SbI(dni-Vv?j(Lpyz&4>sQ)X*9YpV9CcO&gd4RHOM~G+&J7i_v_6+@BgP z6GqDffe!oM#Xc;GA`XTd+>fnae{lcK;DbC*);9TKFD*I>mG-nDCnD757nnDy2D6+2 z2Mjd14ZiSs6!ZOk%?^27Lk+N`&>{A*i3NFV4kiKh7=)5jJd_P#VU-38j;t%1QS-~F z-;px|W=a_ipV9EaRmhB{jnT9*nl?tuOmHItTCI+j38Q7gXqhltCXAK|kZ$MTD-X|= z{y*3LTM#peLQ8Ncl_99_0cGN*9tn;PK;R3=!8f=t8egOFg3UXD)SVdgDF zP8%I~8mR+MqxoVqUyM#{qv!t7@(?{lP;<#>_>6`RqV+JEHee;_XucTD7b7oUj80yT zPF}*sEU>12>J4O!POe}LV5s0|IFAk@j}9WEHb+6c(R?wQFGlkPC?`NG3Me(2FGlml zXucTD7qHR{om0tVYEybd1b=rIthxam>P|*(X=s|Hn10G zqvaTKV;59Xpr(}3@EHxC(aEsUd@;Bu!$#|O)KVLjH~7l%(aFov$xF;K6_IL32i-;o z-4Lk(HKmNEjnT9*nl>F*jOL5cx)L(P zGx*9wi|c=_<=?quCQ;PJ?;_Vd3XBjxph_U~I@~)xNFfVgEBuaij;g^`B!a?jG<=W? z3s8ulhSq5KjD`=cf@m~tjHZpzv@u#{BCq`%EfYq|gwZl#v`iQ+6Ck}Y`j&^qclUpO zSce?;sI7S@FX{-l5R{3Vs%YU@uZUY3T+9u=4)&;`C>F*fE#_#Ds!|<7%dY<%Y>0sCX7yAj!vdxrgvy{0HWwS*fBb> zjjQ+_jf>H=F`7282f=6=h1`e-l@zEcWi)(7!)J5?Z!}*ZPa=($38Q5K?aPGG$;;8n zOU!Z?(K;C&a2y?QM5G4Plrow&M$^V<+Q97bj^>Ned@-6YM)L)7|7EmH7%dYBlVz02f~mmNTh&)mXo95Ga5dFW1w(!f)ko0 zu~MV?bTpri=F`#oYjlDW(#0Ju4@b*G+Lwo;lb55DmzZ@9qJcI#;0P&=5g~#aTBG4Z zUikdjY=7KEz==_WxjN;~FR4GXtwql`+|#h*f z66&LBa3yMlEqrd86ge zX!$c*{*0Erqh&9o`$}ASI68SbI^l|0W*}>=a70(#0^r64t(RPEI;b?g{S{{y;heN+S9G$!zoxH@%$B5Sd=%5m$ z#z%w*YG{py&uI9JrVY#is?mHgnlDE4#b~}j?kA3x38Q7gXqhmQ%7oF$%hAb8%=C`b zgBl%3MXn%`0tQ-6j)u=@_>2xTkLHV^KhQk700mm4W2HvR`O$KIw45I;=SRzV$Pfu} z<>Bb$<>=%kW|@JgU`GcWA-M?=BB-G?8a|`pGnzIq2dGB#1=aIKhyCwj9~MOs2g42S z$JVbuxPNExL7pdTn|zU5Jn*zQdX+jnC`Q8vEiI0wjnTA$S!6>RHlt}{G;Pp6Z4}?# z|M_7ZuHhPZ3`8B_7Q$M3BMVlvaI9BE7JzF<2##J^56Rt#5J4R*91Wk*@EJ`TXw4)6 z4wlh;F`6$%^TlYsKyFfumI+L`xF-Q@^W4AsG;JUime6+NXucTD7o+)N zFyxCLo9&Og2skl{FjuGi`6cydwzcRPhkIH#Sa3xTI0qAVjr^f({2c9!>Iam7sT>Ec9LK!GSP~QWZR&Y~~1eNx*;g$vm@#xjucsqFlqv10eKDe^b zXxbP}8>49hv%xr;FGlmlXucTD7vKgav?V)QCXAK|qh-QCmkEzzzQ3>8A&=<{XfB0P z9bzAwSdcfJAdgMJV^L0Vbf5Q9XhgdrD# zpb$X~tBb$<>=%k zW}Sm*nT-xAK}usph@ghnX!wkV&uH2h9f(J6tBjTjqh-QqnJ`)=jFt(I{>x~2I8w{Q z(aFov$xF<70I3HxI>4AsG;LrlI!6cVA*~BUErJ?Mqv10e zKBH*^b5MUYUySC9;gv5&Coe}QFEPtGq}IUbfb8giEUsR}XxbP}8>49hsj!5$4My|D zXucTD7o+)NbP@~FT^ubBhfaA||Lyq<4kwL$ zhzA9gIQi8!kxH*gV8V?4MXhtdNln&Y9d6ALQN^7;WHXO zqiF+k#CbGdjOL5cd@-6YkozyAWx{BgfV)f>oxJQEop8mKJV1@^(ZL|(!W0xDsG&6) zKBM6?I(aafFOUavM$3fJGGVk#7%dY<%LGW*YqUHZspaA52 zYINWUxspH%7-;)pG<-(G2U3ZSrj60TdPp;3v`iQ+6GqE~(eiM#JRB_#M|NE>I(a!d zd5KvMK)X%?94w;)PmmH45hAFeH5xvn;WL^xFbAkc^TlYs7|j=>`2u;IVYEybEfYq| zgppJxj80yTPF`ZBccdQF$R2zd-Ia^0Ishe+(X@fwcmjn8YG{py&uI9JPGXJbi_u9e zNVjUVJRB_#N6W*JR345_UXD&)Vy1UQD|vLl5mGZELIgFmM#E<`d`8m-<^a`bz8K9H zqxoVqUm*9VM$3fJGGVk#7)fQq=;Y<-=LCG7VX=5~PAWuXgXOGdeF`70;(*|buX*6Gq=8MsM zF`6$>>&kPb|IfAm7Q_q|yhnmcd)lywBj>x0!cjH&0(&%kP#XFe zscAHPM#BdqQ9v`)XxbP}8>49hb5Yu8z8K9HqxoVqUyQcHA^qRc@{nHT;iH)E?`w9* zV`e>Qiw#P3h<$8gfy50|1REu%cqkj2IBJJ>R1L189u#(?gMi3|ASgsoLu)jAM#BeJ zK{T2+M$^V<+88Y}N81~a=HqC2I9eW#mWQL|;lP!Lqm!4Tlc|{b57DC=9q577)QAv4 z4Xx4e84aJ&w1GJ~Ihrp<^TlYs7|j>R{kzdJVYEybEfYpknJ_wesXjWHiYv>5I#Qzp zpvV;@C`3?0YczaD!)J7Wcr;%O+yL=t{e@a0qw+?}=h5=+%0$5l0hqGU8KkSi@vh@ghnX!wkV&*((OXucSo zsDLzQN6W*}@^G{~94!x#gMhwe!sz7X=wLKv{zJ6dM+crDH8mncP(y1pd`81(G;LrG zP>tq`(R?wQFGlkPa{q3$Oc*T_M$3eeR3?m0UXD&)Vy1Vb9u$2CJ4PojaTULyC>f0l zKRK0h|wA9oROViaMnPWkgo>d$Oz(K8PBv~IBADJ@2?QpZe_$SHI* zd`80uc_LvnZ9oc>(R?wQFGlkP=Fr3N%@-Ee|60qxbI0^DY6E7G>mCJ0NEv}Dfz0c0 z@Ax2vEPyRD9qSxbgDZ=H!frHtkSk+Qh@ghnX!wkV53Yh}G;NHgjnT9*T4o}z6C5oQ zM$3fJGGVk#7%dYZy)pWhhsAgIe|}hp9QLTKc_=UH2)7WFiJPiu;aIPTTN+%<4d0IX zsNXTmFhpV)jdw`yMuZ4zXpM%?X!wk#4a`x_(R?wQFGlmlXube99-&p{XqhltCXAK| zL!nFKusy5;WHXO zqZ4?e`2u+oX|zljEfZ*8CX7yAj!rsamb-}7$>>1g=s+PNHK3-H(X=s|Hb&D1W{-C? zUySC9(R?wQFOd5$qh-QqnK1Iogwe^%(aB59dH|^xF*?YFTuC4W4798m4WH5Qq5UA> z=)^5FyJ4k9^WbP69L9S{~BAJRF_8 z9G$$xtaG46kN^kE=zt@nG)9C7YG{py&uI9JrVY#is?mHgnlDCHzWD#``3w#xjfSLz z{+{^w=KA90O_C>~S!XKX~W-`9aS?LKDZkDqiJI_Z49%t@hIl|`49h zv+5nq7o+)NG+&J73*<)oXqhltCeXf2I9K}rT>Ec9%&dpnsLv48_dpfF!h0k*I(UaK zAV&u`F(*He^Z01^jD`=EVg;N;M$^V<+89k6n4`d>`C>F*jOL5cd;zW`p^eYcGGVk# T7#wB7qnP^3nKmqKN$P9>^~RI! literal 0 HcmV?d00001 diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out b/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out new file mode 100644 index 0000000000..4b064bec44 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_group_commit_and_wal_back_pressure.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +1 + +-- !2 -- +1 + +-- !3 -- +1 + diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy new file mode 100644 index 0000000000..910589df11 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_group_commit_and_wal_back_pressure", "p2") { + + def tableName = "test_group_commit_and_wal_back_pressure" + sql """ DROP TABLE IF EXISTS ${tableName}1 """ + sql """ + CREATE TABLE ${tableName}1 ( + k bigint, + v string + ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (k) BUCKETS 32 + PROPERTIES( + "replication_num" = "1" + ); + """ + + sql """ DROP TABLE IF EXISTS ${tableName}2 """ + sql """ + CREATE TABLE ${tableName}2 ( + k bigint, + v string + ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (k) BUCKETS 32 + PROPERTIES( + "replication_num" = "1" + ); + """ + + sql """ DROP TABLE IF EXISTS ${tableName}3 """ + sql """ + CREATE TABLE ${tableName}3 ( + k bigint, + v string + ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (k) BUCKETS 32 + PROPERTIES( + "replication_num" = "1" + ); + """ + + def t1 = [] + for (int i = 0; i < 20; i++) { + t1.add(Thread.startDaemon { + streamLoad { + table "${tableName}1" + + set 'column_separator', ',' + set 'compress_type', 'GZ' + set 'format', 'csv' + set 'group_commit', 'true' + unset 'label' + + file 'test_group_commit_and_wal_back_pressure.csv.gz' + time 100000 + } + }) + } + + def t2 = [] + for (int i = 0; i < 20; i++) { + t2.add(Thread.startDaemon { + streamLoad { + table "${tableName}2" + + set 'column_separator', ',' + set 'compress_type', 'GZ' + set 'format', 'csv' + set 'group_commit', 'true' + unset 'label' + + file 'test_group_commit_and_wal_back_pressure.csv.gz' + time 100000 + } + }) + } + + def t3 = [] + for (int i = 0; i < 20; i++) { + t3.add(Thread.startDaemon { + streamLoad { + table "${tableName}3" + + set 'column_separator', ',' + set 'compress_type', 'GZ' + set 'format', 'csv' + set 'group_commit', 'true' + unset 'label' + + file 'test_group_commit_and_wal_back_pressure.csv.gz' + time 100000 + } + }) + } + + for (Thread th in t1) { + th.join() + } + + for (Thread th in t2) { + th.join() + } + + for (Thread th in t3) { + th.join() + } + + sql "sync" + + qt_1 """ select count(*) from ${tableName}1;""" + + qt_2 """ select count(*) from ${tableName}2;""" + + qt_3 """ select count(*) from ${tableName}3;""" + +} \ No newline at end of file