[fix](group_commit) Need to wait wal to be deleted when creating MaterializedView (#30956)

This commit is contained in:
huanghaibin
2024-02-21 08:58:31 +08:00
committed by yiguolei
parent ccac4208e1
commit 1e3968fe7e
6 changed files with 186 additions and 31 deletions

View File

@@ -289,6 +289,7 @@ Status WalManager::_load_wals() {
continue;
}
}
_exec_env->wal_mgr()->add_wal_queue(wal.tb_id, wal.wal_id);
WARN_IF_ERROR(add_recover_wal(wal.db_id, wal.tb_id, wal.wal_id, wal.wal_path),
fmt::format("Failed to add recover wal={}", wal.wal_path));
}
@@ -406,7 +407,6 @@ Status WalManager::_replay_background() {
Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id,
std::string wal) {
add_wal_queue(table_id, wal_id);
std::lock_guard<std::shared_mutex> wrlock(_table_lock);
std::shared_ptr<WalTable> table_ptr;
auto it = _table_map.find(table_id);

View File

@@ -218,6 +218,10 @@ public class MaterializedViewHandler extends AlterHandler {
olapTable.setState(OlapTableState.ROLLUP);
// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId());
} finally {
@@ -300,6 +304,11 @@ public class MaterializedViewHandler extends AlterHandler {
// ATTN: This order is not mandatory, because database lock will protect us,
// but this order is more reasonable
olapTable.setState(OlapTableState.ROLLUP);
// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
// 2 batch submit rollup job
List<AlterJobV2> rollupJobV2List = new ArrayList<>(rollupNameJobMap.values());
batchAddAlterJobV2(rollupJobV2List);
@@ -1169,6 +1178,7 @@ public class MaterializedViewHandler extends AlterHandler {
private void onJobDone(AlterJobV2 alterJob) {
removeJobFromRunningQueue(alterJob);
if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) {
Env.getCurrentEnv().getGroupCommitManager().unblockTable(alterJob.getTableId());
changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJob.getTableId(),
alterJob.getJobId());

View File

@@ -40,7 +40,6 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
@@ -537,7 +536,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
return;
}
waitWalFinished();
Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId);
Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
/*
* all tasks are finished. check the integrity.
* we just check whether all new replicas are healthy.
@@ -600,34 +601,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId);
}
private void waitWalFinished() {
// wait wal done here
Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
LOG.info("block group commit for table={} when schema change", tableId);
List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
while (true) {
LOG.info("wait for wal queue size to be empty");
boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
.isPreviousWalFinished(tableId, aliveBeIds);
if (walFinished) {
LOG.info("all wal is finished for table={}", tableId);
break;
} else if (System.currentTimeMillis() > expireTime) {
LOG.warn("waitWalFinished time out for table={}", tableId);
break;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.warn("failed to wait for wal for table={} when schema change", tableId, ie);
}
}
}
Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
LOG.info("unblock group commit for table={} when schema change", tableId);
}
private void onFinished(OlapTable tbl) {
// replace the origin index with shadow index, set index state as NORMAL
for (Partition partition : tbl.getPartitions()) {

View File

@@ -45,11 +45,39 @@ public class GroupCommitManager {
}
public void blockTable(long tableId) {
LOG.info("block group commit for table={} when schema change", tableId);
blockedTableIds.add(tableId);
}
public void unblockTable(long tableId) {
blockedTableIds.remove(tableId);
LOG.info("unblock group commit for table={} when schema change", tableId);
}
/**
* Waiting All WAL files to be deleted.
*/
public void waitWalFinished(long tableId) {
List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
while (true) {
LOG.info("wait for wal queue size to be empty");
boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
.isPreviousWalFinished(tableId, aliveBeIds);
if (walFinished) {
LOG.info("all wal is finished for table={}", tableId);
break;
} else if (System.currentTimeMillis() > expireTime) {
LOG.warn("waitWalFinished time out for table={}", tableId);
break;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.warn("failed to wait for wal for table={} when schema change", tableId, ie);
}
}
}
}
/**

File diff suppressed because one or more lines are too long

View File

@ -318,5 +318,131 @@ suite("insert_group_commit_into") {
} finally {
// try_sql("DROP TABLE ${table}")
}
// table with MaterializedView
tableName = "insert_group_commit_into_mv"
table = dbName + "." + tableName
def table_tmp = dbName + ".test_table_tmp"
try {
// create table
sql """ drop table if exists ${table}; """
sql """CREATE table ${table} (
`ordernum` varchar(65533) NOT NULL ,
`dnt` datetime NOT NULL ,
`data` json NULL
) ENGINE=OLAP
DUPLICATE KEY(`ordernum`, `dnt`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`ordernum`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);"""
sql """drop table if exists ${table_tmp};"""
sql """CREATE TABLE ${table_tmp} (
`dnt` varchar(200) NULL,
`ordernum` varchar(200) NULL,
`type` varchar(20) NULL,
`powers` double SUM NULL,
`p0` double REPLACE NULL,
`heatj` double SUM NULL,
`j0` double REPLACE NULL,
`heatg` double SUM NULL,
`g0` double REPLACE NULL,
`solar` double SUM NULL
) ENGINE=OLAP
AGGREGATE KEY(`dnt`, `ordernum`, `type`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`ordernum`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
); """
sql """DROP MATERIALIZED VIEW IF EXISTS ods_zn_dnt_max1 ON ${table};"""
sql """create materialized view ods_zn_dnt_max1 as
select ordernum,max(dnt) as dnt from ${table}
group by ordernum
ORDER BY ordernum;"""
connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
sql """ set group_commit = async_mode; """
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
//sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
// 1. insert into
int count = 0;
while (count < 30) {
try {
group_commit_insert """
insert into ${table} values('cib2205045_1_1s','2023/6/10 3:55:33','{"DB1":168939,"DNT":"2023-06-10 03:55:33"}');""", 1
break
} catch (Exception e) {
logger.info("got exception:" + e)
if (e.getMessage().contains("is blocked on schema change")) {
Thread.sleep(1000)
}
count++
}
}
group_commit_insert """insert into ${table} values('cib2205045_1_1s','2023/6/10 3:56:33','{"DB1":168939,"DNT":"2023-06-10 03:56:33"}');""", 1
group_commit_insert """insert into ${table} values('cib2205045_1_1s','2023/6/10 3:57:33','{"DB1":168939,"DNT":"2023-06-10 03:57:33"}');""", 1
group_commit_insert """insert into ${table} values('cib2205045_1_1s','2023/6/10 3:58:33','{"DB1":168939,"DNT":"2023-06-10 03:58:33"}');""", 1
getRowCount(4)
qt_order """select
'2023-06-10',
tmp.ordernum,
cast(nvl(if(tmp.p0-tmp1.p0>0,tmp.p0-tmp1.p0,tmp.p0-tmp.p1),0) as decimal(10,4)),
nvl(tmp.p0,0),
cast(nvl(if(tmp.j0-tmp1.j0>0,tmp.j0-tmp1.j0,tmp.j0-tmp.j1)*277.78,0) as decimal(10,4)),
nvl(tmp.j0,0),
cast(nvl(if(tmp.g0-tmp1.g0>0,tmp.g0-tmp1.g0,tmp.g0-tmp.g1)*277.78,0) as decimal(10,4)),
nvl(tmp.g0,0),
cast(nvl(tmp.solar,0) as decimal(20,4)),
'day'
from
(
select
ordernum,
max(ljrl1) g0,min(ljrl1) g1,
max(ljrl2) j0,min(ljrl2) j1,
max(db1) p0,min(db1) p1,
max(fzl)*1600*0.278 solar
from(
select ordernum,dnt,
cast(if(json_extract(data,'\$.LJRL1')=0 or json_extract(data,'\$.LJRL1') like '%E%',null,json_extract(data,'\$.LJRL1')) as double) ljrl1,
cast(if(json_extract(data,'\$.LJRL2')=0 or json_extract(data,'\$.LJRL2') like '%E%',null,json_extract(data,'\$.LJRL2')) as double) ljrl2,
first_value(cast(if(json_extract(data,'\$.FZL')=0 or json_extract(data,'\$.FZL') like '%E%',null,
json_extract(data,'\$.FZL')) as double)) over (partition by ordernum order by dnt desc) fzl,
cast(if(json_extract(data,'\$.DB1')=0 or json_extract(data,'\$.DB1') like '%E%',null,json_extract(data,'\$.DB1')) as double) db1
from ${table}
)a1
group by ordernum
)tmp left join (
select
ordernum,MAX(p0) p0,MAX(j0) j0,MAX(g0) g0
from ${table_tmp}
group by ordernum
)tmp1
on tmp.ordernum=tmp1.ordernum;"""
qt_order2 """
SELECT
row_number() over(partition by add_date order by pc_num desc)
,row_number() over(partition by add_date order by vc_num desc)
,row_number() over(partition by add_date order by vt_num desc)
FROM (
SELECT
cast(dnt as datev2) add_date
,row_number() over(order by dnt) pc_num
,row_number() over(order by dnt) vc_num
,row_number() over(order by dnt) vt_num
FROM ${table}
) t;"""
}
} finally {
}
}
}