[Fix](executor) Fix workload thread start failure when a follower converts to master

Author: wangbo
Date: 2024-05-11 23:31:59 +08:00
Committed by: yiguolei
Parent: 11360b27a2
Commit: 20e2d2e2f8
8 changed files with 91 additions and 143 deletions
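The commit body is only the title, so a short illustration of the bug may help. A java.lang.Thread object can be started only once; if a manager thread is started each time this FE is elected master, the second promotion of a former follower fails with IllegalThreadStateException. The snippet below is a minimal, self-contained sketch of that failure mode using illustrative names only; it is not taken from the Doris code base.

// Minimal sketch of the failure in the title: restarting the same
// java.lang.Thread object throws IllegalThreadStateException.
// All names here are illustrative, not Doris classes.
public class RestartThreadSketch {
    public static void main(String[] args) throws InterruptedException {
        Thread workloadThread = new Thread(() -> { /* one-shot body for the demo */ });

        workloadThread.start();      // first promotion to master: succeeds
        workloadThread.join();

        try {
            workloadThread.start();  // follower promoted to master again: rejected
        } catch (IllegalThreadStateException e) {
            System.out.println("second start() failed: " + e);
        }
    }
}

Switching the managers to long-lived daemons that are started exactly once at startup, and that simply idle while the node is not master, avoids the restart entirely; that appears to be what the MasterDaemon conversion below does.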


@@ -1711,7 +1711,7 @@ public class Env {
dnsCache.start();
workloadGroupMgr.startUpdateThread();
workloadGroupMgr.start();
workloadSchedPolicyMgr.start();
workloadRuntimeStatusMgr.start();


@@ -34,6 +34,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.DropWorkloadGroupOperationLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -64,7 +65,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPostProcessable {
public static final String DEFAULT_GROUP_NAME = "normal";
@@ -90,22 +91,13 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
private final ResourceProcNode procNode = new ResourceProcNode();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private Thread updatePropThread;
public void startUpdateThread() {
WorkloadGroupMgr wgMgr = this;
updatePropThread = new Thread(() -> {
Thread.currentThread().setName("reset-query-queue-prop");
while (true) {
try {
wgMgr.resetQueryQueueProp();
Thread.sleep(Config.query_queue_update_interval_ms);
} catch (Throwable e) {
LOG.warn("reset query queue failed ", e);
}
}
});
updatePropThread.start();
@Override
protected void runAfterCatalogReady() {
try {
resetQueryQueueProp();
} catch (Throwable e) {
LOG.warn("reset query queue failed ", e);
}
}
public void resetQueryQueueProp() {
@@ -142,6 +134,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
}
public WorkloadGroupMgr() {
super("workload-group-thread", Config.query_queue_update_interval_ms);
// if no fe image exist, we should append internal group here.
appendInternalWorkloadGroup();
}
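
After these hunks, WorkloadGroupMgr only passes a thread name and polling interval to its superclass constructor and overrides runAfterCatalogReady(). A rough sketch of the base-class behaviour this relies on follows; it assumes MasterDaemon wakes on a fixed interval and invokes the callback only while this node is the ready master, and it is illustrative rather than the actual MasterDaemon implementation.

// Hypothetical daemon base class, modeled on how the converted managers use
// MasterDaemon in this commit: started once, sleeps on a fixed interval, and
// runs the callback only while this node is the ready master.
abstract class PeriodicMasterTask extends Thread {
    private final long intervalMs;

    PeriodicMasterTask(String name, long intervalMs) {
        super(name);
        this.intervalMs = intervalMs;
        setDaemon(true);
    }

    // Periodic work; a subclass such as a workload-group manager overrides this.
    protected abstract void runAfterCatalogReady();

    // Placeholder for the real master/catalog-ready check inside the FE.
    protected boolean isReadyMaster() {
        return true;
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            try {
                if (isReadyMaster()) {
                    runAfterCatalogReady();
                }
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                interrupt();   // restore the flag so the loop condition exits
            } catch (Throwable t) {
                // keep the daemon alive on errors, mirroring the LOG.warn pattern above
            }
        }
    }
}

With this shape, a follower that later becomes master starts nothing: the daemon that has been running since startup simply begins doing work on its next cycle.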


@@ -19,7 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.plugin.audit.AuditEvent;
import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
@@ -37,7 +37,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class WorkloadRuntimeStatusMgr {
public class WorkloadRuntimeStatusMgr extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap();
@@ -46,43 +46,33 @@ public class WorkloadRuntimeStatusMgr {
private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock();
private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
class WorkloadRuntimeStatsThread extends Daemon {
WorkloadRuntimeStatusMgr workloadStatsMgr;
public WorkloadRuntimeStatsThread(WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr, String threadName,
int interval) {
super(threadName, interval);
this.workloadStatsMgr = workloadRuntimeStatusMgr;
}
@Override
protected void runOneCycle() {
// 1 merge be query statistics
Map<String, TQueryStatistics> queryStatisticsMap = workloadStatsMgr.getQueryStatisticsMap();
// 2 log query audit
List<AuditEvent> auditEventList = workloadStatsMgr.getQueryNeedAudit();
for (AuditEvent auditEvent : auditEventList) {
TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId);
if (queryStats != null) {
auditEvent.scanRows = queryStats.scan_rows;
auditEvent.scanBytes = queryStats.scan_bytes;
auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
}
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
}
// 3 clear beToQueryStatsMap when be report timeout
workloadStatsMgr.clearReportTimeoutBeStatistics();
}
public WorkloadRuntimeStatusMgr() {
super("workload-runtime-stats-thread", Config.workload_runtime_status_thread_interval_ms);
}
private Daemon thread = null;
@Override
protected void runAfterCatalogReady() {
// 1 merge be query statistics
Map<String, TQueryStatistics> queryStatisticsMap = getQueryStatisticsMap();
// 2 log query audit
List<AuditEvent> auditEventList = getQueryNeedAudit();
for (AuditEvent auditEvent : auditEventList) {
TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId);
if (queryStats != null) {
auditEvent.scanRows = queryStats.scan_rows;
auditEvent.scanBytes = queryStats.scan_bytes;
auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
}
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
}
// 3 clear beToQueryStatsMap when be report timeout
clearReportTimeoutBeStatistics();
}
public void submitFinishQueryToAudit(AuditEvent event) {
queryAuditEventLogWriteLock();
@@ -116,12 +106,6 @@ public class WorkloadRuntimeStatusMgr {
return ret;
}
public void start() {
thread = new WorkloadRuntimeStatsThread(this, "workload-runtime-stats-thread",
Config.workload_runtime_status_thread_interval_ms);
thread.start();
}
public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) {
if (!params.isSetBackendId()) {
LOG.warn("be report workload runtime status but without beid");


@@ -29,6 +29,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -59,7 +60,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(WorkloadSchedPolicyMgr.class);
@@ -69,6 +70,10 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
private PolicyProcNode policyProcNode = new PolicyProcNode();
public WorkloadSchedPolicyMgr() {
super("workload-sched-thread", Config.workload_sched_policy_interval_ms);
}
public static final ImmutableList<String> WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES
= new ImmutableList.Builder<String>()
.add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version")
@@ -99,60 +104,43 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
}
};
private Thread policyExecThread = new Thread() {
@Override
protected void runAfterCatalogReady() {
try {
// todo(wb) add more query info source, not only comes from connectionmap
// 1 get query info map
Map<Integer, ConnectContext> connectMap = ExecuteEnv.getInstance().getScheduler()
.getConnectionMap();
List<WorkloadQueryInfo> queryInfoList = new ArrayList<>();
@Override
public void run() {
while (true) {
try {
// todo(wb) add more query info source, not only comes from connectionmap
// 1 get query info map
Map<Integer, ConnectContext> connectMap = ExecuteEnv.getInstance().getScheduler()
.getConnectionMap();
List<WorkloadQueryInfo> queryInfoList = new ArrayList<>();
// a snapshot for connect context
Set<Integer> keySet = new HashSet<>();
keySet.addAll(connectMap.keySet());
// a snapshot for connect context
Set<Integer> keySet = new HashSet<>();
keySet.addAll(connectMap.keySet());
for (Integer connectId : keySet) {
ConnectContext cctx = connectMap.get(connectId);
if (cctx == null || cctx.isKilled()) {
continue;
}
String username = cctx.getQualifiedUser();
WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo();
policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId());
policyQueryInfo.tUniqueId = cctx.queryId();
policyQueryInfo.context = cctx;
policyQueryInfo.metricMap = new HashMap<>();
policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username);
queryInfoList.add(policyQueryInfo);
}
// 2 exec policy
if (queryInfoList.size() > 0) {
execPolicy(queryInfoList);
}
} catch (Throwable t) {
LOG.error("[policy thread]error happens when exec policy");
for (Integer connectId : keySet) {
ConnectContext cctx = connectMap.get(connectId);
if (cctx == null || cctx.isKilled()) {
continue;
}
// 3 sleep
try {
Thread.sleep(Config.workload_sched_policy_interval_ms);
} catch (InterruptedException e) {
LOG.error("error happends when policy exec thread sleep");
}
String username = cctx.getQualifiedUser();
WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo();
policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId());
policyQueryInfo.tUniqueId = cctx.queryId();
policyQueryInfo.context = cctx;
policyQueryInfo.metricMap = new HashMap<>();
policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username);
queryInfoList.add(policyQueryInfo);
}
}
};
public void start() {
policyExecThread.setName("workload-auto-scheduler-thread");
policyExecThread.start();
// 2 exec policy
if (queryInfoList.size() > 0) {
execPolicy(queryInfoList);
}
} catch (Throwable t) {
LOG.error("[policy thread]error happens when exec policy");
}
}
public void createWorkloadSchedPolicy(CreateWorkloadSchedPolicyStmt createStmt) throws UserException {


@@ -19,7 +19,6 @@ package org.apache.doris.resource.workloadgroup;
import org.apache.doris.analysis.AlterWorkloadGroupStmt;
import org.apache.doris.analysis.CreateWorkloadGroupStmt;
import org.apache.doris.analysis.DropWorkloadGroupStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -190,38 +189,6 @@ public class WorkloadGroupMgrTest {
}
}
@Test
public void testDropWorkloadGroup() throws UserException {
Config.enable_workload_group = true;
ConnectContext context = new ConnectContext();
WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
Map<String, String> properties = Maps.newHashMap();
properties.put(WorkloadGroup.CPU_SHARE, "10");
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
String name = "g1";
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
workloadGroupMgr.createWorkloadGroup(createStmt);
context.getSessionVariable().setWorkloadGroup(name);
Assert.assertEquals(1, workloadGroupMgr.getWorkloadGroup(context).size());
DropWorkloadGroupStmt dropStmt = new DropWorkloadGroupStmt(false, name);
workloadGroupMgr.dropWorkloadGroup(dropStmt);
try {
context.getSessionVariable().setWorkloadGroup(name);
workloadGroupMgr.getWorkloadGroup(context);
Assert.fail();
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("does not exist"));
}
DropWorkloadGroupStmt dropDefaultStmt = new DropWorkloadGroupStmt(false, WorkloadGroupMgr.DEFAULT_GROUP_NAME);
try {
workloadGroupMgr.dropWorkloadGroup(dropDefaultStmt);
} catch (DdlException e) {
Assert.assertTrue(e.getMessage().contains("is not allowed"));
}
}
@Test
public void testAlterWorkloadGroup() throws UserException {
Config.enable_workload_group = true;


@@ -9,6 +9,15 @@
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 10% true 2147483647 0 0 -1 -1
-- !show_del_wg_1 --
normal 20 50% true 2147483647 0 0 1% 16
test_drop_wg 10 0% true 2147483647 0 0 -1 -1
test_group 10 10% true 2147483647 0 0 -1 -1
-- !show_del_wg_2 --
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 10% true 2147483647 0 0 -1 -1
-- !mem_limit_1 --
2


@@ -110,6 +110,7 @@ enable_job_schedule_second_for_test = true
enable_workload_group = true
publish_topic_info_interval_ms = 1000
workload_sched_policy_interval_ms = 1000
master_sync_policy = WRITE_NO_SYNC
replica_sync_policy = WRITE_NO_SYNC


@@ -128,6 +128,12 @@ suite("test_crud_wlg") {
qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag from information_schema.workload_groups where name in ('normal','test_group') order by name;"
// test drop workload group
sql "create workload group if not exists test_drop_wg properties ('cpu_share'='10')"
qt_show_del_wg_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag from information_schema.workload_groups where name in ('normal','test_group','test_drop_wg') order by name;"
sql "drop workload group test_drop_wg"
qt_show_del_wg_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag from information_schema.workload_groups where name in ('normal','test_group','test_drop_wg') order by name;"
// test memory_limit
test {
sql "alter workload group test_group properties ( 'memory_limit'='100%' );"