branch-2.1: [improve](routine load) add more metrics to observe the routine load job #48209 (#48765)

Cherry-picked from #48209

Co-authored-by: hui lai <laihui@selectdb.com>
This commit is contained in:
github-actions[bot]
2025-05-05 10:25:37 +08:00
committed by GitHub
parent 6dee1fe623
commit 65151b46aa
9 changed files with 357 additions and 30 deletions

View File

@ -22,6 +22,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
@ -221,37 +222,46 @@ public class KafkaUtil {
private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
throws LoadException {
long startTime = System.currentTimeMillis();
int retryTimes = 0;
TNetworkAddress address = null;
Future<InternalService.PProxyResult> future = null;
InternalService.PProxyResult result = null;
while (retryTimes < 3) {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get info. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
try {
while (retryTimes < 3) {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
throw new LoadException("Failed to get info. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
retryTimes++;
continue;
}
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
LOG.warn("failed to get info request to "
+ address + " err " + result.getStatus().getErrorMsgsList());
retryTimes++;
} else {
return result;
try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
retryTimes++;
continue;
}
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
LOG.warn("failed to get info request to "
+ address + " err " + result.getStatus().getErrorMsgsList());
retryTimes++;
} else {
return result;
}
}
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
throw new LoadException("Failed to get info");
} finally {
long endTime = System.currentTimeMillis();
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LANTENCY.increase(endTime - startTime);
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
}
throw new LoadException("Failed to get info");
}
}

View File

@ -809,11 +809,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// if rate of error data is more than max_filter_ratio, pause job
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(),
attachment.getReceivedBytes(), false /* not replay */);
attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), false /* not replay */);
}
private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,
boolean isReplay) throws UserException {
long taskExecutionTime, boolean isReplay) throws UserException {
this.jobStatistic.totalRows += numOfTotalRows;
this.jobStatistic.errorRows += numOfErrorRows;
this.jobStatistic.unselectedRows += unselectedRows;
@ -824,6 +824,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows);
MetricRepo.COUNTER_ROUTINE_LOAD_ERROR_ROWS.increase(numOfErrorRows);
MetricRepo.COUNTER_ROUTINE_LOAD_RECEIVED_BYTES.increase(receivedBytes);
MetricRepo.COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME.increase(taskExecutionTime);
MetricRepo.COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME.increase(1L);
}
// check error rate
@ -893,7 +895,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
try {
updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(),
attachment.getReceivedBytes(), true /* is replay */);
attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), true /* is replay */);
} catch (UserException e) {
LOG.error("should not happen", e);
}

View File

@ -125,6 +125,11 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_LANTENCY;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_COUNT;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT;
public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;
public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL;
@ -500,6 +505,21 @@ public final class MetricRepo {
COUNTER_ROUTINE_LOAD_ERROR_ROWS = new LongCounterMetric("routine_load_error_rows", MetricUnit.ROWS,
"total error rows of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS);
COUNTER_ROUTINE_LOAD_GET_META_LANTENCY = new LongCounterMetric("routine_load_get_meta_latency",
MetricUnit.MILLISECONDS, "get meta lantency of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_LANTENCY);
COUNTER_ROUTINE_LOAD_GET_META_COUNT = new LongCounterMetric("routine_load_get_meta_count", MetricUnit.NOUNIT,
"get meta count of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_COUNT);
COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT = new LongCounterMetric("routine_load_get_meta_fail_count",
MetricUnit.NOUNIT, "get meta fail count of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT);
COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME = new LongCounterMetric("routine_load_task_execute_time",
MetricUnit.MILLISECONDS, "task execute time of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME);
COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT = new LongCounterMetric("routine_load_task_execute_count",
MetricUnit.MILLISECONDS, "task execute count of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT);
COUNTER_HIT_SQL_BLOCK_RULE = new LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS,
"total hit sql block rule query");

View File

@ -301,7 +301,7 @@ public class RoutineLoadJobTest {
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "maxErrorNum", 0);
Deencapsulation.setField(routineLoadJob, "maxBatchRows", 0);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 1L, 1L, 0L, 1L, false);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 1L, 1L, 0L, 1L, 1L, false);
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, Deencapsulation.getField(routineLoadJob, "state"));
@ -316,7 +316,7 @@ public class RoutineLoadJobTest {
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Deencapsulation.setField(jobStatistic, "currentErrorRows", 1);
Deencapsulation.setField(jobStatistic, "currentTotalRows", 99);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, false);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, 1L, false);
Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, Deencapsulation.getField(routineLoadJob, "state"));
Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentErrorRows"));