diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f286c33d54..2722b5a8a1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1109,6 +1109,7 @@ DEFINE_Int16(bitmap_serialize_version, "1"); DEFINE_String(group_commit_replay_wal_dir, "./wal"); DEFINE_Int32(group_commit_replay_wal_retry_num, "10"); DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5"); +DEFINE_Int32(group_commit_relay_wal_threads, "10"); // the count of thread to group commit insert DEFINE_Int32(group_commit_insert_threads, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 4054b315aa..37020c56be 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1182,6 +1182,7 @@ DECLARE_Int16(bitmap_serialize_version); DECLARE_String(group_commit_replay_wal_dir); DECLARE_Int32(group_commit_replay_wal_retry_num); DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds); +DECLARE_mInt32(group_commit_relay_wal_threads); // This config can be set to limit thread number in group commit insert thread pool. DECLARE_mInt32(group_commit_insert_threads); diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index abb7d8a324..f03075609e 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -42,6 +42,10 @@ WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); _all_wal_disk_bytes = std::make_shared(0); _cv = std::make_shared(); + static_cast(ThreadPoolBuilder("GroupCommitReplayWalThreadPool") + .set_min_threads(1) + .set_max_threads(config::group_commit_relay_wal_threads) + .build(&_thread_pool)); } WalManager::~WalManager() { @@ -56,6 +60,7 @@ void WalManager::stop() { if (_replay_thread) { _replay_thread->join(); } + _thread_pool->shutdown(); LOG(INFO) << "WalManager is stopped"; } } @@ -161,6 +166,10 @@ Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, << std::to_string(wal_id) << "_" << label; { std::lock_guard wrlock(_wal_lock); + auto it = _wal_path_map.find(wal_id); + if (it != _wal_path_map.end()) { + return Status::InternalError("wal_id {} already in wal_path_map", wal_id); + } _wal_path_map.emplace(wal_id, ss.str()); } return Status::OK(); @@ -299,10 +308,12 @@ Status WalManager::replay() { } } for (const auto& table_id : replay_tables) { - auto st = _table_map[table_id]->replay_wals(); - if (!st.ok()) { - LOG(WARNING) << "Failed add replay wal on table " << table_id; - } + RETURN_IF_ERROR(_thread_pool->submit_func([table_id, this] { + auto st = this->_table_map[table_id]->replay_wals(); + if (!st.ok()) { + LOG(WARNING) << "Failed add replay wal on table " << table_id; + } + })); } } while (!_stop_background_threads_latch.wait_for( std::chrono::seconds(config::group_commit_replay_wal_retry_interval_seconds))); @@ -351,10 +362,13 @@ Status WalManager::delete_wal(int64_t wal_id) { if (_wal_id_to_writer_map.empty()) { CHECK_EQ(_all_wal_disk_bytes->load(std::memory_order_relaxed), 0); } - std::string wal_path = _wal_path_map[wal_id]; - RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path)); - LOG(INFO) << "delete file=" << wal_path; - _wal_path_map.erase(wal_id); + auto it = _wal_path_map.find(wal_id); + if (it != _wal_path_map.end()) { + std::string wal_path = it->second; + RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path)); + LOG(INFO) << "delete file=" << wal_path; + _wal_path_map.erase(wal_id); + } } return Status::OK(); } @@ -371,10 +385,13 @@ void WalManager::stop_relay_wal() { } void WalManager::add_wal_column_index(int64_t wal_id, std::vector& column_index) { + std::lock_guard wrlock(_wal_column_id_map_lock); _wal_column_id_map.emplace(wal_id, column_index); + LOG(INFO) << "add " << wal_id << " to wal_column_id_map"; } void WalManager::erase_wal_column_index(int64_t wal_id) { + std::lock_guard wrlock(_wal_column_id_map_lock); if (_wal_column_id_map.erase(wal_id)) { LOG(INFO) << "erase " << wal_id << " from wal_column_id_map"; } else { @@ -383,6 +400,7 @@ void WalManager::erase_wal_column_index(int64_t wal_id) { } Status WalManager::get_wal_column_index(int64_t wal_id, std::vector& column_index) { + std::lock_guard wrlock(_wal_column_id_map_lock); auto it = _wal_column_id_map.find(wal_id); if (it != _wal_column_id_map.end()) { column_index = it->second; diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index d0a547a8d6..fdeadbee1d 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -31,6 +31,7 @@ #include "runtime/exec_env.h" #include "runtime/stream_load/stream_load_context.h" #include "util/thread.h" +#include "util/threadpool.h" namespace doris { class WalManager { @@ -86,7 +87,9 @@ private: std::shared_ptr _all_wal_disk_bytes; std::unordered_map> _wal_status_queues; std::atomic _stop; + std::shared_mutex _wal_column_id_map_lock; std::unordered_map&> _wal_column_id_map; std::shared_ptr _cv; + std::unique_ptr _thread_pool; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp index 4bfcec502a..54d158e7a9 100644 --- a/be/src/olap/wal_table.cpp +++ b/be/src/olap/wal_table.cpp @@ -48,6 +48,8 @@ WalTable::~WalTable() {} std::string k_request_line; #endif +bool retry = false; + void WalTable::add_wals(std::vector wals) { std::lock_guard lock(_replay_wal_lock); for (const auto& wal : wals) { @@ -57,6 +59,7 @@ void WalTable::add_wals(std::vector wals) { } Status WalTable::replay_wals() { std::vector need_replay_wals; + std::vector need_erase_wals; { std::lock_guard lock(_replay_wal_lock); if (_replay_wal_map.empty()) { @@ -76,7 +79,7 @@ Status WalTable::replay_wals() { std::string rename_path = _get_tmp_path(wal); LOG(INFO) << "rename wal from " << wal << " to " << rename_path; std::rename(wal.c_str(), rename_path.c_str()); - _replay_wal_map.erase(wal); + need_erase_wals.push_back(wal); continue; } if (_need_replay(info)) { @@ -84,6 +87,13 @@ Status WalTable::replay_wals() { } } std::sort(need_replay_wals.begin(), need_replay_wals.end()); + for (const auto& wal : need_erase_wals) { + if (_replay_wal_map.erase(wal)) { + LOG(INFO) << "erase wal " << wal << " from _replay_wal_map"; + } else { + LOG(WARNING) << "fail to erase wal " << wal << " from _replay_wal_map"; + } + } } for (const auto& wal : need_replay_wals) { { @@ -216,7 +226,49 @@ Status WalTable::_get_wal_info(const std::string& wal, } void http_request_done(struct evhttp_request* req, void* arg) { - event_base_loopbreak((struct event_base*)arg); + std::stringstream out; + std::string status; + std::string msg; + std::string wal_id; + size_t len = 0; + if (req != nullptr) { + auto input = evhttp_request_get_input_buffer(req); + char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); + while (request_line != nullptr) { + std::string s(request_line); + out << request_line; + request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); + } + auto out_str = out.str(); + LOG(INFO) << "replay wal out_str:" << out_str; + rapidjson::Document doc; + if (!out_str.empty()) { + doc.Parse(out.str().c_str()); + status = std::string(doc["Status"].GetString()); + msg = std::string(doc["Message"].GetString()); + LOG(INFO) << "replay wal status:" << status << ",msg:" << msg; + if (status.find("Fail") != status.npos) { + if (msg.find("Label") != msg.npos && + msg.find("has already been used") != msg.npos) { + retry = false; + } else { + retry = true; + } + } else { + retry = false; + } + } else { + retry = true; + } + } else { + LOG(WARNING) << "req is null"; + } + + if (arg != nullptr) { + event_base_loopbreak((struct event_base*)arg); + } else { + LOG(WARNING) << "arg is null"; + } } Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) { @@ -224,6 +276,7 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std struct event_base* base = nullptr; struct evhttp_connection* conn = nullptr; struct evhttp_request* req = nullptr; + retry = false; event_init(); base = event_base_new(); conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port); @@ -239,18 +292,20 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std std::vector index_vector; std::stringstream ss_name; std::stringstream ss_id; - int index = 0; + int index_raw = 0; for (auto column_id_str : column_id_element) { try { int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10); auto it = _column_id_name_map.find(column_id); - if (it != _column_id_name_map.end()) { - ss_name << it->second << ","; + auto it2 = _column_id_index_map.find(column_id); + if (it != _column_id_name_map.end() && it2 != _column_id_index_map.end()) { + ss_name << "`" << it->second << "`,"; ss_id << "c" << std::to_string(_column_id_index_map[column_id]) << ","; - index_vector.emplace_back(index); + index_vector.emplace_back(index_raw); _column_id_name_map.erase(column_id); + _column_id_index_map.erase(column_id); } - index++; + index_raw++; } catch (const std::invalid_argument& e) { return Status::InvalidArgument("Invalid format, {}", e.what()); } @@ -273,44 +328,21 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std evhttp_connection_free(conn); event_base_free(base); -#endif - bool retry = false; - std::string status; - std::string msg; - std::stringstream out; - rapidjson::Document doc; -#ifndef BE_TEST - size_t len = 0; - auto input = evhttp_request_get_input_buffer(req); - char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); - while (request_line != nullptr) { - std::string s(request_line); - out << request_line; - request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); - } #else + std::stringstream out; out << k_request_line; -#endif auto out_str = out.str(); - if (!out_str.empty()) { - doc.Parse(out.str().c_str()); - status = std::string(doc["Status"].GetString()); - msg = std::string(doc["Message"].GetString()); - LOG(INFO) << "replay wal " << wal_id << " status:" << status << ",msg:" << msg; - if (status.find("Fail") != status.npos) { - if (msg.find("Label") != msg.npos && msg.find("has already been used") != msg.npos) { - retry = false; - } else { - retry = true; - } - } else { - retry = false; - } - } else { + rapidjson::Document doc; + doc.Parse(out_str.c_str()); + auto status = std::string(doc["Status"].GetString()); + if (status.find("Fail") != status.npos) { retry = true; + } else { + retry = false; } +#endif if (retry) { - LOG(INFO) << "fail to replay wal =" << wal << ",status:" << status << ",msg:" << msg; + LOG(INFO) << "fail to replay wal =" << wal; std::lock_guard lock(_replay_wal_lock); auto it = _replay_wal_map.find(wal); if (it != _replay_wal_map.end()) { @@ -320,7 +352,7 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false}); } } else { - LOG(INFO) << "success to replay wal =" << wal << ",status:" << status << ",msg:" << msg; + LOG(INFO) << "success to replay wal =" << wal; RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id)); RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id)); std::lock_guard lock(_replay_wal_lock); @@ -384,14 +416,17 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) { std::string columns_str = result.column_info; std::vector column_element; doris::vectorized::WalReader::string_split(columns_str, ",", column_element); - int64_t index = 1; + int64_t column_index = 1; + _column_id_name_map.clear(); + _column_id_index_map.clear(); for (auto column : column_element) { auto pos = column.find(":"); try { auto column_name = column.substr(0, pos); int64_t column_id = std::strtoll(column.substr(pos + 1).c_str(), NULL, 10); _column_id_name_map.emplace(column_id, column_name); - _column_id_index_map.emplace(column_id, index++); + _column_id_index_map.emplace(column_id, column_index); + column_index++; } catch (const std::invalid_argument& e) { return Status::InvalidArgument("Invalid format, {}", e.what()); } diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index b97d5de8a5..16c841cc0f 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -341,11 +341,15 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ std::vector {wal_path})); _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id, WalManager::WAL_STATUS::REPLAY); + } else { + RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(txn_id)); + RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id)); } return st; } // TODO handle execute and commit error - if (!prepare_failed && !result_status.ok()) { + if (!prepare_failed && !result_status.ok() && + !(result_status.is())) { RETURN_IF_ERROR(_exec_env->wal_mgr()->add_wal_path(_db_id, table_id, txn_id, label)); std::string wal_path; RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path)); @@ -458,7 +462,7 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, WalManager* wal_manager, std::vector& slot_desc, int be_exe_version) { _v_wal_writer = std::make_shared( - db_id, tb_id, txn_id, label, wal_manager, slot_desc, be_exe_version); + db_id, tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version); return _v_wal_writer->init(); } diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp index 035ce2cd82..f0e5e29ca8 100644 --- a/be/src/vec/exec/format/wal/wal_reader.cpp +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -58,7 +58,7 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { for (auto column : columns) { auto pos = _column_index[index]; vectorized::ColumnPtr column_ptr = src_block.get_by_position(pos).column; - if (column.column->is_nullable()) { + if (column_ptr != nullptr && column.column->is_nullable()) { column_ptr = make_nullable(column_ptr); } dst_block.insert(index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), @@ -67,7 +67,7 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { } block->swap(dst_block); *read_rows = block->rows(); - VLOG_DEBUG << "read block rows:" << *read_rows; + LOG(INFO) << "read block rows:" << *read_rows; return Status::OK(); } diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index a35555554b..6002f4eea6 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -900,7 +900,7 @@ Status VFileScanner::_get_next_reader() { _name_to_col_type.clear(); _missing_cols.clear(); - static_cast(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols)); + RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols)); _cur_reader->set_push_down_agg_type(_get_push_down_agg_type()); RETURN_IF_ERROR(_generate_fill_columns()); if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index bb5c5c70d0..6b78ad74ee 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -30,7 +30,7 @@ namespace vectorized { GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& texprs, Status* status) - : DataSink(row_desc) { + : DataSink(row_desc), _filter_bitmap(1024) { // From the thrift expressions create the real exprs. *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs); _name = "GroupCommitBlockSink"; @@ -50,6 +50,8 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) { _group_commit_mode = table_sink.group_commit_mode; _load_id = table_sink.load_id; _max_filter_ratio = table_sink.max_filter_ratio; + _vpartition = new doris::VOlapTablePartitionParam(_schema, table_sink.partition); + RETURN_IF_ERROR(_vpartition->init()); return Status::OK(); } @@ -139,13 +141,35 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_ bool has_filtered_rows = false; RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( state, input_block, block, _output_vexpr_ctxs, rows, has_filtered_rows)); - if (_block_convertor->num_filtered_rows() > 0) { + _has_filtered_rows = false; + if (!_vpartition->is_auto_partition()) { + //reuse vars for find_partition + _partitions.assign(rows, nullptr); + _filter_bitmap.Reset(rows); + + for (int index = 0; index < rows; index++) { + _vpartition->find_partition(block.get(), index, _partitions[index]); + } + for (int row_index = 0; row_index < rows; row_index++) { + if (_partitions[row_index] == nullptr) [[unlikely]] { + _filter_bitmap.Set(row_index, true); + LOG(WARNING) << "no partition for this tuple. tuple=" + << block->dump_data(row_index, 1); + } + _has_filtered_rows = true; + } + } + + if (_block_convertor->num_filtered_rows() > 0 || _has_filtered_rows) { auto cloneBlock = block->clone_without_columns(); auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); for (int i = 0; i < rows; ++i) { if (_block_convertor->filter_map()[i]) { continue; } + if (_filter_bitmap.Get(i)) { + continue; + } res_block.add_row(block.get(), i); } block->swap(res_block.to_block()); diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h index 2ae37be368..9adb7f38bc 100644 --- a/be/src/vec/sink/group_commit_block_sink.h +++ b/be/src/vec/sink/group_commit_block_sink.h @@ -70,6 +70,11 @@ private: std::vector> _blocks; bool _is_block_appended = false; double _max_filter_ratio = 0.0; + VOlapTablePartitionParam* _vpartition = nullptr; + // reuse for find_tablet. + std::vector _partitions; + Bitmap _filter_bitmap; + bool _has_filtered_rows = false; }; } // namespace vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 4ac1abf7f1..4f88d22c83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -240,6 +240,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -3296,15 +3297,17 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } - Table table = db.getTable(tableId).get(); - if (table == null) { + Table table; + try { + table = db.getTable(tableId).get(); + } catch (NoSuchElementException e) { errorStatus.setErrorMsgs( (Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId)))); result.setStatus(errorStatus); return result; } StringBuilder sb = new StringBuilder(); - for (Column column : table.getFullSchema()) { + for (Column column : table.getBaseSchema(true)) { sb.append(column.getName() + ":" + column.getUniqueId() + ","); } String columnInfo = sb.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 72a0f061c0..5fe89a07f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -25,7 +25,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.MapType; -import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.StructField; @@ -330,13 +329,9 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio if (this.fileFormatType == TFileFormatType.FORMAT_WAL) { List fileColumns = new ArrayList<>(); Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId); - List tableColumns = table.getBaseSchema(false); + List tableColumns = table.getBaseSchema(true); for (int i = 1; i <= tableColumns.size(); i++) { - fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getDataType(), true)); - } - Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn(); - if (deleteSignColumn != null) { - fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getDataType(), true)); + fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true)); } return fileColumns; } diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy index 6909a919c6..ed3b1fc832 100644 --- a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy @@ -59,8 +59,8 @@ suite("test_group_commit_http_stream") { assertTrue(json.GroupCommit) assertTrue(json.Label.startsWith("group_commit_")) assertEquals(total_rows, json.NumberTotalRows) - //assertEquals(loaded_rows, json.NumberLoadedRows) - //assertEquals(filtered_rows, json.NumberFilteredRows) + assertEquals(loaded_rows, json.NumberLoadedRows) + assertEquals(filtered_rows, json.NumberFilteredRows) assertEquals(unselected_rows, json.NumberUnselectedRows) if (filtered_rows > 0) { assertFalse(json.ErrorURL.isEmpty()) diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy index b60b6dc555..e12a1f2f01 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy @@ -58,8 +58,8 @@ suite("test_group_commit_stream_load") { assertTrue(json.GroupCommit) assertTrue(json.Label.startsWith("group_commit_")) assertEquals(total_rows, json.NumberTotalRows) - //assertEquals(loaded_rows, json.NumberLoadedRows) - //assertEquals(filtered_rows, json.NumberFilteredRows) + assertEquals(loaded_rows, json.NumberLoadedRows) + assertEquals(filtered_rows, json.NumberFilteredRows) assertEquals(unselected_rows, json.NumberUnselectedRows) if (filtered_rows > 0) { assertFalse(json.ErrorURL.isEmpty())