(improv)[group commit] refactor some group commit code (#29180)

Author: meiyi
Date: 2023-12-29 00:26:10 +08:00
Committed by: GitHub
Parent: 9a277a6f11
Commit: 9be0f04506
15 changed files with 192 additions and 150 deletions

View File

@ -1103,16 +1103,20 @@ DEFINE_Int32(grace_shutdown_wait_seconds, "120");
DEFINE_Int16(bitmap_serialize_version, "1");
// group commit insert config
// group commit config
DEFINE_String(group_commit_wal_path, "");
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
DEFINE_Int32(group_commit_relay_wal_threads, "10");
// the count of thread to group commit insert
// This config can be set to limit the number of threads in the group commit request fragment thread pool.
DEFINE_Int32(group_commit_insert_threads, "10");
DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
DEFINE_Bool(wait_internal_group_commit_finish, "false");
// Max size(bytes) of group commit queues, used for mem back pressure, default 64M.
DEFINE_Int32(group_commit_max_queue_size, "67108864");
// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
DEFINE_String(group_commit_wal_max_disk_limit, "10%");
DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600");
@ -1130,13 +1134,6 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
// Dir of default timezone files
DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
// Max size(bytes) of group commit queues, used for mem back pressure, default 64M.
DEFINE_Int32(group_commit_max_queue_size, "67108864");
// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
DEFINE_String(group_commit_wal_max_disk_limit, "10%");
// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
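
The comment on group_commit_wal_max_disk_limit above says the value is accepted either as an absolute byte count (e.g. 1024) or as a percentage of the available disk space (e.g. 10%). As a minimal sketch of that contract, the hypothetical helper below resolves such a string into a byte budget; the function name and parsing details are illustrative assumptions, not the BE's actual implementation.

#include <cstdint>
#include <string>

// Hypothetical helper: turn "1024" (bytes) or "10%" (share of the currently
// available disk space) into an absolute byte budget, mirroring the documented
// behaviour of group_commit_wal_max_disk_limit.
int64_t resolve_wal_disk_limit(const std::string& value, int64_t disk_available_bytes) {
    if (!value.empty() && value.back() == '%') {
        int64_t percent = std::stoll(value.substr(0, value.size() - 1));
        return disk_available_bytes * percent / 100;
    }
    return std::stoll(value);
}

// Example: resolve_wal_disk_limit("10%", 100 * (1LL << 30)) yields 10 GiB.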

View File

@ -1168,16 +1168,20 @@ DECLARE_Int32(grace_shutdown_wait_seconds);
// BitmapValue serialize version.
DECLARE_Int16(bitmap_serialize_version);
// group commit insert config
// group commit config
DECLARE_String(group_commit_wal_path);
DECLARE_Int32(group_commit_replay_wal_retry_num);
DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
DECLARE_mInt32(group_commit_relay_wal_threads);
// This config can be set to limit thread number in group commit insert thread pool.
// This config can be set to limit the number of threads in the group commit request fragment thread pool.
DECLARE_mInt32(group_commit_insert_threads);
DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
DECLARE_Bool(wait_internal_group_commit_finish);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);
// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
DECLARE_mString(group_commit_wal_max_disk_limit);
// The configuration item is used to lower the priority of the scanner thread,
// typically employed to ensure CPU scheduling for write operations.
@ -1203,13 +1207,6 @@ DECLARE_Bool(ignore_always_true_predicate_for_segment);
// Dir of default timezone files
DECLARE_String(default_tzfiles_path);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);
// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
DECLARE_mString(group_commit_wal_max_disk_limit);
// Ingest binlog work pool size
DECLARE_Int32(ingest_binlog_work_pool_size);

View File

@ -167,45 +167,8 @@ int HttpStreamAction::on_header(HttpRequest* req) {
ctx->load_type = TLoadType::MANUL_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;
Status st = Status::OK();
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
} else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
// off_mode and empty
group_commit_mode = "off_mode";
ctx->group_commit = false;
} else {
// sync_mode and async_mode
ctx->group_commit = true;
}
ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!temp_partitions && !partitions && !ctx->two_phase_commit &&
(!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) {
ctx->group_commit = true;
group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
if (iequal(group_commit_mode, "sync_mode")) {
size_t max_available_size =
ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
"size "
"for this http load("
<< (req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< max_available_size
<< " Bytes). So we set this load to \"group commit\"=sync_mode "
"automatically.";
st = Status::Error<EXCEEDED_LIMIT>("Http load size too large.");
}
}
}
Status st = _handle_group_commit(req, ctx);
LOG(INFO) << "new income streaming load request." << ctx->brief()
<< " sql : " << req->header(HTTP_SQL) << ", group_commit=" << ctx->group_commit;
@ -338,6 +301,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
} else {
// used for wait_internal_group_commit_finish
request.__set_group_commit_mode("sync_mode");
}
}
@ -366,10 +330,9 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
if (http_req->header(HTTP_GROUP_COMMIT) == "async mode") {
if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = 0;
content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
@ -403,4 +366,50 @@ void HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContex
}
}
Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
std::shared_ptr<StreamLoadContext> ctx) {
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
}
if (config::wait_internal_group_commit_finish) {
group_commit_mode = "sync_mode";
}
if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
// off_mode and empty
ctx->group_commit = false;
return Status::OK();
}
auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
return Status::InternalError("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
if (iequal(group_commit_mode, "async_mode")) {
group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
if (iequal(group_commit_mode, "sync_mode")) {
size_t max_available_size =
ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
LOG(INFO) << "When enable group commit, the data size can't be too large or "
"unknown. The data size for this stream load("
<< (req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: req->header(HttpHeaders::CONTENT_LENGTH))
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< max_available_size
<< " Bytes). So we set this load to \"group commit\"=sync_mode\" "
"automatically.";
return Status::Error<EXCEEDED_LIMIT>("Http load size too large.");
}
}
}
return Status::OK();
}
} // namespace doris
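
HttpStreamAction::on_header now delegates the mode handling to a dedicated _handle_group_commit helper (the same refactoring is applied to StreamLoadAction further below), which downgrades an async_mode request to sync_mode when load_size_smaller_than_wal_limit(req) reports that the body is too large or its size is unknown. A rough, self-contained sketch of the check that helper presumably performs, inferred from the surrounding log message, is shown here; the signature and the treatment of a missing Content-Length are assumptions.

#include <cstddef>
#include <string>

// Sketch only: compare the declared HTTP body size against the WAL manager's
// remaining capacity. An empty Content-Length header counts as "not smaller",
// which is why the log message above also mentions an unknown data size.
bool load_size_smaller_than_wal_limit_sketch(const std::string& content_length_header,
                                             size_t wal_max_available_size) {
    if (content_length_header.empty()) {
        return false; // unknown size -> fall back to sync_mode
    }
    return static_cast<size_t>(std::stol(content_length_header)) < wal_max_available_size;
}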

View File

@ -47,12 +47,9 @@ public:
private:
Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
Status _handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
Status _data_saved_path(HttpRequest* req, std::string* file_path);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
void _parse_format(const std::string& format_str, const std::string& compress_type_str,
TFileFormatType::type* format_type, TFileCompressType::type* compress_type);
bool _is_format_support_streaming(TFileFormatType::type format);
Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
private:
ExecEnv* _exec_env;

View File

@ -188,50 +188,8 @@ int StreamLoadAction::on_header(HttpRequest* req) {
url_decode(req->param(HTTP_DB_KEY), &ctx->db);
url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
ctx->label = req->header(HTTP_LABEL_KEY);
Status st = Status::OK();
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
} else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
// off_mode and empty
group_commit_mode = "off_mode";
ctx->group_commit = false;
} else {
// sync_mode and async_mode
ctx->group_commit = true;
}
auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit &&
(!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
if (!config::wait_internal_group_commit_finish && ctx->group_commit &&
!ctx->label.empty()) {
st = Status::InternalError("label and group_commit can't be set at the same time");
}
if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) {
ctx->group_commit = true;
group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
if (iequal(group_commit_mode, "sync_mode")) {
size_t max_available_size =
ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
"size "
"for this stream load("
<< (req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< max_available_size
<< " Bytes). So we set this load to \"group commit\"=sync_mode "
"automatically.";
st = Status::Error<EXCEEDED_LIMIT>("Stream load size too large.");
}
}
}
Status st = _handle_group_commit(req, ctx);
if (!ctx->group_commit && ctx->label.empty()) {
ctx->label = generate_uuid_string();
}
@ -649,7 +607,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
request.__set_memtable_on_sink_node(value);
}
if (ctx->group_commit) {
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
} else {
// used for wait_internal_group_commit_finish
request.__set_group_commit_mode("sync_mode");
}
}
#ifndef BE_TEST
@ -672,8 +635,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
}
if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = 0;
content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
@ -730,4 +692,50 @@ void StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContex
}
}
Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
std::shared_ptr<StreamLoadContext> ctx) {
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
}
if (config::wait_internal_group_commit_finish) {
group_commit_mode = "sync_mode";
}
if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
// off_mode and empty
ctx->group_commit = false;
return Status::OK();
}
auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
return Status::InternalError("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
if (iequal(group_commit_mode, "async_mode")) {
group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
if (iequal(group_commit_mode, "sync_mode")) {
size_t max_available_size =
ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
LOG(INFO) << "When enable group commit, the data size can't be too large or "
"unknown. The data size for this stream load("
<< (req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: req->header(HttpHeaders::CONTENT_LENGTH))
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< max_available_size
<< " Bytes). So we set this load to \"group commit\"=sync_mode\" "
"automatically.";
return Status::Error<EXCEEDED_LIMIT>("Stream load size too large.");
}
}
}
return Status::OK();
}
} // namespace doris

View File

@ -50,7 +50,7 @@ private:
Status _data_saved_path(HttpRequest* req, std::string* file_path);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
bool _load_size_smaller_than_wal_limit(HttpRequest* req);
Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
private:
ExecEnv* _exec_env;

View File

@ -156,7 +156,6 @@ public:
Status update_load_info(TUniqueId load_id, size_t content_length);
Status get_load_info(TUniqueId load_id, size_t* content_length);
Status remove_load_info(TUniqueId load_id);
std::condition_variable cv;
private:
ExecEnv* _exec_env = nullptr;

View File

@ -19,8 +19,6 @@
#include <gen_cpp/DataSinks_types.h>
#include <chrono>
#include <mutex>
#include <shared_mutex>
#include "common/exception.h"
@ -29,7 +27,6 @@
#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
#include "vec/exprs/vexpr.h"
#include "vec/sink/volap_table_sink.h"
#include "vec/sink/vtablet_finder.h"
namespace doris {
@ -74,10 +71,10 @@ Status GroupCommitBlockSink::prepare(RuntimeState* state) {
_state = state;
// profile must add to state's object pool
_profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink"));
_profile = state->obj_pool()->add(new RuntimeProfile("GroupCommitBlockSink"));
init_sink_common_profile();
_mem_tracker =
std::make_shared<MemTracker>("OlapTableSink:" + std::to_string(state->load_job_id()));
_mem_tracker = std::make_shared<MemTracker>("GroupCommitBlockSink:" +
std::to_string(state->load_job_id()));
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@ -117,7 +114,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
(double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) {
return Status::DataQualityError("too many filtered rows");
}
RETURN_IF_ERROR(_add_blocks(_group_commit_mode != TGroupCommitMode::SYNC_MODE, true));
RETURN_IF_ERROR(_add_blocks(true));
}
if (_load_block_queue) {
_load_block_queue->remove_load_id(_load_id);
@ -223,15 +220,15 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
_blocks.emplace_back(output_block);
} else {
if (!_is_block_appended) {
RETURN_IF_ERROR(_add_blocks(_group_commit_mode != TGroupCommitMode::SYNC_MODE, false));
RETURN_IF_ERROR(_add_blocks(false));
}
RETURN_IF_ERROR(_load_block_queue->add_block(
output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
}
return Status::OK();
}
Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool is_blocks_contain_all_load_data) {
Status GroupCommitBlockSink::_add_blocks(bool is_blocks_contain_all_load_data) {
DCHECK(_is_block_appended == false);
TUniqueId load_id;
load_id.__set_hi(_load_id.hi);
@ -241,18 +238,15 @@ Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool is_blocks_contain_
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
_db_id, _table_id, _base_schema_version, load_id, _load_block_queue,
_state->be_exec_version()));
if (write_wal) {
if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
_group_commit_mode = _load_block_queue->has_enough_wal_disk_space(
_blocks, load_id, is_blocks_contain_all_load_data)
? TGroupCommitMode::ASYNC_MODE
: TGroupCommitMode::SYNC_MODE;
if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
LOG(INFO)
<< "Load label " << _load_block_queue->label
<< " will not write wal because wal disk space usage reachs max limit.";
} else {
LOG(INFO) << "Load label " << _load_block_queue->label << " will write wal to "
<< _load_block_queue->wal_base_path << ".";
LOG(INFO) << "Load id=" << print_id(_state->query_id())
<< ", use group commit label=" << _load_block_queue->label
<< " will not write wal because wal disk space usage reach max limit";
}
}
_state->set_import_label(_load_block_queue->label);
@ -263,7 +257,7 @@ Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool is_blocks_contain_
}
for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
RETURN_IF_ERROR(_load_block_queue->add_block(
*it, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
*it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
}
_is_block_appended = true;
_blocks.clear();
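
Besides dropping the write_wal parameter from _add_blocks, the calls to add_block now pass _group_commit_mode == TGroupCommitMode::ASYNC_MODE instead of != TGroupCommitMode::SYNC_MODE as the write-WAL flag. The self-contained illustration below (with a stand-in enum, since the real TGroupCommitMode is Thrift-generated) shows that the two predicates only disagree for OFF_MODE, which no longer implies writing a WAL entry.

#include <cassert>

// Stand-in for the Thrift enum; the values are assumptions for illustration only.
enum class GroupCommitMode { OFF_MODE, SYNC_MODE, ASYNC_MODE };

// Old flag: every mode except SYNC_MODE wrote a WAL entry.
bool write_wal_old(GroupCommitMode m) { return m != GroupCommitMode::SYNC_MODE; }

// New flag: only ASYNC_MODE writes a WAL entry.
bool write_wal_new(GroupCommitMode m) { return m == GroupCommitMode::ASYNC_MODE; }

int main() {
    assert(write_wal_old(GroupCommitMode::ASYNC_MODE) && write_wal_new(GroupCommitMode::ASYNC_MODE));
    assert(!write_wal_old(GroupCommitMode::SYNC_MODE) && !write_wal_new(GroupCommitMode::SYNC_MODE));
    // OFF_MODE is the only value where the behaviour changes.
    assert(write_wal_old(GroupCommitMode::OFF_MODE) && !write_wal_new(GroupCommitMode::OFF_MODE));
    return 0;
}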

View File

@ -47,7 +47,7 @@ public:
private:
Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
Status _add_blocks(bool write_wal, bool is_blocks_contain_all_load_data);
Status _add_blocks(bool is_blocks_contain_all_load_data);
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
@ -70,6 +70,8 @@ private:
std::vector<std::shared_ptr<vectorized::Block>> _blocks;
bool _is_block_appended = false;
double _max_filter_ratio = 0.0;
// used for find_partition
VOlapTablePartitionParam* _vpartition = nullptr;
// reuse for find_tablet.
std::vector<VOlapTablePartition*> _partitions;

View File

@ -154,6 +154,7 @@ public class NativeInsertStmt extends InsertStmt {
private long tableId = -1;
public boolean isGroupCommitStreamLoadSql = false;
private GroupCommitPlanner groupCommitPlanner;
private boolean reuseGroupCommitPlan = false;
private boolean isFromDeleteOrUpdateStmt = false;
@ -1162,12 +1163,18 @@ public class NativeInsertStmt extends InsertStmt {
return isGroupCommit;
}
public boolean isReuseGroupCommitPlan() {
return reuseGroupCommitPlan;
}
public GroupCommitPlanner planForGroupCommit(TUniqueId queryId) throws UserException, TException {
OlapTable olapTable = (OlapTable) getTargetTable();
if (groupCommitPlanner != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
LOG.debug("reuse group commit plan, table={}", olapTable);
reuseGroupCommitPlan = true;
return groupCommitPlanner;
}
reuseGroupCommitPlan = false;
if (!targetColumns.isEmpty()) {
Analyzer analyzerTmp = analyzer;
reset();

View File

@ -58,6 +58,8 @@ public class GroupCommitBlockSink extends OlapTableSink {
return TGroupCommitMode.ASYNC_MODE;
} else if (groupCommit.equalsIgnoreCase("sync_mode")) {
return TGroupCommitMode.SYNC_MODE;
} else if (groupCommit.equalsIgnoreCase("off_mode")) {
return TGroupCommitMode.OFF_MODE;
} else {
return null;
}

View File

@ -33,6 +33,7 @@ import org.apache.doris.nereids.parser.ParseDialect.Dialect;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.planner.GroupCommitBlockSink;
import org.apache.doris.qe.VariableMgr.VarAttr;
import org.apache.doris.thrift.TGroupCommitMode;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TResourceLimit;
import org.apache.doris.thrift.TRuntimeFilterType;
@ -3176,7 +3177,9 @@ public class SessionVariable implements Serializable, Writable {
}
public boolean isEnableInsertGroupCommit() {
return Config.wait_internal_group_commit_finish || GroupCommitBlockSink.parseGroupCommit(groupCommit) != null;
return Config.wait_internal_group_commit_finish
|| GroupCommitBlockSink.parseGroupCommit(groupCommit) == TGroupCommitMode.ASYNC_MODE
|| GroupCommitBlockSink.parseGroupCommit(groupCommit) == TGroupCommitMode.SYNC_MODE;
}
public String getGroupCommit() {

View File

@ -1919,6 +1919,7 @@ public class StmtExecutor {
String errMsg = "";
TableType tblType = insertStmt.getTargetTable().getType();
boolean isGroupCommit = false;
boolean reuseGroupCommitPlan = false;
if (context.isTxnModel()) {
if (insertStmt.getQueryStmt() instanceof SelectStmt) {
if (((SelectStmt) insertStmt.getQueryStmt()).getTableRefs().size() > 0) {
@ -1935,6 +1936,7 @@ public class StmtExecutor {
int maxRetry = 3;
for (int i = 0; i < maxRetry; i++) {
GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId);
reuseGroupCommitPlan = nativeInsertStmt.isReuseGroupCommitPlan();
List<InternalService.PDataRow> rows = groupCommitPlanner.getRows(nativeInsertStmt);
PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(context, rows);
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
@ -2106,6 +2108,9 @@ public class StmtExecutor {
}
if (isGroupCommit) {
sb.append(", 'query_id':'").append(DebugUtil.printId(context.queryId)).append("'");
if (reuseGroupCommitPlan) {
sb.append(", 'reuse_group_commit_plan':'").append(true).append("'");
}
}
sb.append("}");

View File

@ -215,7 +215,7 @@ suite("insert_group_commit_into") {
}
// test connect to observer fe
/*try {
try {
def fes = sql_return_maparray "show frontends"
logger.info("frontends: ${fes}")
if (fes.size() > 1) {
@ -233,6 +233,7 @@ suite("insert_group_commit_into") {
sql """ set group_commit = async_mode; """
sql """ set enable_nereids_dml = false; """
sql """ set enable_profile= true; """
sql """ set enable_nereids_planner = false; """
// 1. insert into
def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
@ -261,7 +262,7 @@ suite("insert_group_commit_into") {
logger.info("only one fe, skip test connect to observer fe")
}
} finally {
}*/
}
// table with array type
tableName = "insert_group_commit_into_duplicate_array"

View File

@ -18,10 +18,15 @@
import com.mysql.cj.ServerPreparedQuery
import com.mysql.cj.jdbc.ConnectionImpl
import com.mysql.cj.jdbc.JdbcStatement
import com.mysql.cj.jdbc.ServerPreparedStatement
import com.mysql.cj.jdbc.ServerPreparedStatement;
import com.mysql.cj.jdbc.StatementImpl
import com.mysql.cj.jdbc.result.ResultSetImpl
import com.mysql.cj.jdbc.result.ResultSetInternalMethods
import java.lang.reflect.Field
import java.sql.ResultSet
import java.util.ArrayList
import java.util.List
import java.util.concurrent.CopyOnWriteArrayList
suite("insert_group_commit_with_prepare_stmt") {
@ -72,22 +77,38 @@ suite("insert_group_commit_with_prepare_stmt") {
stmt.addBatch()
}
def group_commit_insert = { stmt, expected_row_count ->
def group_commit_insert = { stmt, expected_row_count, reuse_plan = false ->
def result = stmt.executeBatch()
logger.info("insert result: " + result)
def results = ((StatementImpl) stmt).results
if (results == null) {
logger.warn("result is null")
return
}
def serverInfo = results.getServerInfo()
logger.info("result server info: " + serverInfo)
if (result != expected_row_count) {
logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count)
if (results != null) {
def serverInfo = results.getServerInfo()
logger.info("result server info: " + serverInfo)
if (result != expected_row_count) {
logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count)
}
assertTrue(serverInfo.contains("'status':'PREPARE'"))
assertTrue(serverInfo.contains("'label':'group_commit_"))
assertEquals(reuse_plan, serverInfo.contains("reuse_group_commit_plan"))
} else {
// for batch insert
ConnectionImpl connection = (ConnectionImpl) stmt.getConnection()
Field field = ConnectionImpl.class.getDeclaredField("openStatements")
field.setAccessible(true)
CopyOnWriteArrayList<JdbcStatement> openStatements = (CopyOnWriteArrayList<JdbcStatement>) field.get(connection)
for (JdbcStatement openStatement : openStatements) {
ServerPreparedStatement serverPreparedStatement = (ServerPreparedStatement) openStatement;
Field field2 = StatementImpl.class.getDeclaredField("results");
field2.setAccessible(true);
ResultSet resultSet = (ResultSetInternalMethods) field2.get(serverPreparedStatement);
if (resultSet != null) {
ResultSetImpl resultSetImpl = (ResultSetImpl) resultSet;
String serverInfo = resultSetImpl.getServerInfo();
logger.info("serverInfo = " + serverInfo);
}
}
}
// assertEquals(result, expected_row_count)
assertTrue(serverInfo.contains("'status':'PREPARE'"))
assertTrue(serverInfo.contains("'label':'group_commit_"))
}
def getStmtId = { stmt ->
@ -140,12 +161,12 @@ suite("insert_group_commit_with_prepare_stmt") {
insert_prepared insert_stmt, 2, null, 20
insert_prepared insert_stmt, 3, "c", null
insert_prepared insert_stmt, 4, "d", 40
group_commit_insert insert_stmt, 3
group_commit_insert insert_stmt, 3, true
assertEquals(stmtId, getStmtId(insert_stmt))
insert_prepared insert_stmt, 5, "e", null
insert_prepared insert_stmt, 6, "f", 40
group_commit_insert insert_stmt, 2
group_commit_insert insert_stmt, 2, true
assertEquals(stmtId, getStmtId(insert_stmt))
getRowCount(6)
@ -161,7 +182,7 @@ suite("insert_group_commit_with_prepare_stmt") {
insert_prepared_partial insert_stmt, 'e', 7, 0
insert_prepared_partial insert_stmt, null, 8, 0
group_commit_insert insert_stmt, 2
group_commit_insert insert_stmt, 2, true
assertEquals(stmtId2, getStmtId(insert_stmt))
getRowCount(7)