[fix](group commit) Pick: add debug log showing why group commit does not work; delete WAL when replay succeeds (#38611) (#38659)

Pick https://github.com/apache/doris/pull/38611
This commit is contained in:
meiyi
2024-08-01 16:59:54 +08:00
committed by GitHub
parent cafcf7acc1
commit e8690b62ee
5 changed files with 108 additions and 21 deletions

View File

@ -86,7 +86,6 @@ void WalTable::_pick_relay_wals() {
Status WalTable::_relay_wal_one_by_one() {
std::vector<std::shared_ptr<WalInfo>> need_retry_wals;
std::vector<std::shared_ptr<WalInfo>> need_delete_wals;
for (auto wal_info : _replaying_queue) {
wal_info->add_retry_num();
auto st = _replay_wal_internal(wal_info->get_wal_path());
@ -96,7 +95,12 @@ Status WalTable::_relay_wal_one_by_one() {
msg.find("LabelAlreadyUsedException") != msg.npos) {
LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
<< ", st=" << st.to_string();
need_delete_wals.push_back(wal_info);
// delete wal
WARN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(_table_id, wal_info->get_wal_id()),
"failed to delete wal=" + wal_info->get_wal_path());
if (config::group_commit_wait_replay_wal_finish) {
RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id()));
}
} else {
doris::wal_fail << 1;
LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path()
@ -111,13 +115,6 @@ Status WalTable::_relay_wal_one_by_one() {
_replay_wal_map.emplace(retry_wal_info->get_wal_path(), retry_wal_info);
}
}
for (auto delete_wal_info : need_delete_wals) {
[[maybe_unused]] auto st =
_exec_env->wal_mgr()->delete_wal(_table_id, delete_wal_info->get_wal_id());
if (config::group_commit_wait_replay_wal_finish) {
RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(delete_wal_info->get_wal_id()));
}
}
return Status::OK();
}

View File

@ -398,6 +398,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
{ status = Status::InternalError(""); });
if (status.ok()) {
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error",
{ status = Status::InternalError(""); });
// commit txn
TLoadTxnCommitRequest request;
request.__set_auth_code(0); // this is a fake, fe not check it now

View File

@ -83,6 +83,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
@ -1200,28 +1202,53 @@ public class NativeInsertStmt extends InsertStmt {
LOG.warn("analyze group commit failed", e);
return;
}
boolean partialUpdate = ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate();
if (!isExplain() && !partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& targetTable instanceof OlapTable
&& ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()
&& !targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME)
&& !ConnectContext.get().isTxnModel()
&& getQueryStmt() instanceof SelectStmt
&& ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && targetPartitionNames == null
&& (label == null || Strings.isNullOrEmpty(label.getLabelName()))
&& (analyzer == null || analyzer != null && !analyzer.isReAnalyze())) {
ConnectContext ctx = ConnectContext.get();
List<Pair<BooleanSupplier, Supplier<String>>> conditions = new ArrayList<>();
conditions.add(Pair.of(() -> ctx.getSessionVariable().isEnableInsertGroupCommit(),
() -> "group_commit session variable: " + ctx.getSessionVariable().groupCommit));
conditions.add(Pair.of(() -> !isExplain(), () -> "isExplain"));
conditions.add(Pair.of(() -> !ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate(),
() -> "enableUniqueKeyPartialUpdate"));
conditions.add(Pair.of(() -> !ctx.isTxnModel(), () -> "isTxnModel"));
conditions.add(Pair.of(() -> targetTable instanceof OlapTable,
() -> "not olapTable, class: " + targetTable.getClass().getName()));
conditions.add(Pair.of(() -> ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange(),
() -> "notUseSchemaLightChange"));
conditions.add(Pair.of(() -> !targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME),
() -> "db is internal"));
conditions.add(
Pair.of(() -> targetPartitionNames == null, () -> "targetPartitionNames: " + targetPartitionNames));
conditions.add(Pair.of(() -> ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES,
() -> "sqlMode: " + ctx.getSessionVariable().getSqlMode()));
conditions.add(Pair.of(() -> queryStmt instanceof SelectStmt,
() -> "queryStmt is not SelectStmt, class: " + queryStmt.getClass().getName()));
conditions.add(Pair.of(() -> ((SelectStmt) queryStmt).getTableRefs().isEmpty(),
() -> "tableRefs is not empty: " + ((SelectStmt) queryStmt).getTableRefs()));
conditions.add(
Pair.of(() -> (label == null || Strings.isNullOrEmpty(label.getLabelName())), () -> "label: " + label));
conditions.add(
Pair.of(() -> (analyzer == null || analyzer != null && !analyzer.isReAnalyze()), () -> "analyzer"));
boolean match = conditions.stream().allMatch(p -> p.first.getAsBoolean());
if (match) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (selectStmt.getValueList() != null) {
for (List<Expr> row : selectStmt.getValueList().getRows()) {
for (Expr expr : row) {
if (!(expr instanceof LiteralExpr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("group commit is off for table: {}, because not literal expr, "
+ "expr: {}, row: {}", targetTable.getName(), expr, row);
}
return;
}
}
}
// Does not support: insert into tbl values();
if (selectStmt.getValueList().getFirstRow().isEmpty() && CollectionUtils.isEmpty(targetColumnNames)) {
if (LOG.isDebugEnabled()) {
LOG.debug("group commit is off for table: {}, because first row: {}, target columns: {}",
targetTable.getName(), selectStmt.getValueList().getFirstRow(), targetColumnNames);
}
return;
}
} else {
@ -1231,6 +1258,10 @@ public class NativeInsertStmt extends InsertStmt {
if (items != null) {
for (SelectListItem item : items) {
if (item.getExpr() != null && !(item.getExpr() instanceof LiteralExpr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("group commit is off for table: {}, because not literal expr, "
+ "expr: {}, row: {}", targetTable.getName(), item.getExpr(), item);
}
return;
}
}
@ -1238,6 +1269,16 @@ public class NativeInsertStmt extends InsertStmt {
}
}
isGroupCommit = true;
} else {
if (LOG.isDebugEnabled()) {
for (Pair<BooleanSupplier, Supplier<String>> pair : conditions) {
if (pair.first.getAsBoolean() == false) {
LOG.debug("group commit is off for table: {}, because: {}", targetTable.getName(),
pair.second.get());
break;
}
}
}
}
}

View File

@ -15,11 +15,24 @@
// specific language governing permissions and limitations
// under the License.
import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
def tableName = "wal_test"
// Polls `select count(*)` on ${tableName} once per second, for at most 30 seconds,
// until the count equals expectedRowCount. Every polled result is logged.
// NOTE(review): presumably Awaitility fails the test if the condition is not met
// within the time limit — confirm against the Awaitility docs.
def getRowCount = { expectedRowCount ->
Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until(
{
def result = sql "select count(*) from ${tableName}"
logger.info("table: ${tableName}, rowCount: ${result}")
// result[0][0] is the first column of the first returned row, i.e. the count
return result[0][0] == expectedRowCount
}
)
}
// test successful group commit async load
sql """ DROP TABLE IF EXISTS ${tableName} """
@ -132,4 +145,30 @@ suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") {
assertTrue(exception)
}
// Test: replaying the WAL should succeed after the group-commit txn commit is forced to fail
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`k` int ,
`v` int ,
) engine=olap
DISTRIBUTED BY HASH(`k`)
BUCKETS 5
properties("replication_num" = "1", "group_commit_interval_ms" = "4000")
"""
// Clear all debug points on every BE before enabling the one this case needs
GetDebugPoint().clearDebugPointsForAllBEs()
try {
// This debug point makes the BE overwrite the load status with an InternalError
// just before the commit-txn step of _finish_group_commit_load
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_error")
streamLoad {
table "${tableName}"
set 'column_separator', ','
set 'group_commit', 'async_mode'
unset 'label'
file 'group_commit_wal_msg.csv'
time 10000
}
// Wait until the table holds 5 rows.
// NOTE(review): presumably the csv has 5 rows and they become visible via WAL replay — confirm
getRowCount(5)
} finally {
// Disable the debug point even if the load or the wait above fails
GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_error")
}
}

File diff suppressed because one or more lines are too long