Add error load log url for routine load job (#938)

This commit is contained in:
Mingyu Chen
2019-04-16 21:12:21 +08:00
committed by ZHAO Chun
parent 8e0512e88d
commit 2b4d02b2fa
14 changed files with 126 additions and 72 deletions

View File

@ -583,6 +583,7 @@ bool BrokerScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPoo
if (!slot_desc->is_materialized()) {
continue;
}
ExprContext* ctx = _dest_expr_ctx[ctx_idx++];
void* value = ctx->get_value(_src_tuple_row);
if (value == nullptr) {

View File

@ -44,6 +44,9 @@
namespace doris {
std::string to_load_error_http_path(const std::string& file_name) {
if (file_name.empty()) {
return "";
}
std::stringstream url;
url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
<< "/api/_load_error_log?"

View File

@ -136,6 +136,7 @@ Status KafkaDataConsumer::group_consume(
<< ", max running time(ms): " << left_time;
int64_t received_rows = 0;
int64_t put_rows = 0;
Status st = Status::OK;
MonotonicStopWatch consumer_watch;
MonotonicStopWatch watch;
@ -158,6 +159,8 @@ Status KafkaDataConsumer::group_consume(
if (!queue->blocking_put(msg)) {
// queue is shutdown
done = true;
} else {
++put_rows;
}
++received_rows;
break;
@ -183,7 +186,8 @@ Status KafkaDataConsumer::group_consume(
<< ", left time(ms): " << left_time
<< ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
<< ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000
<< ", received rows: " << received_rows;
<< ", received rows: " << received_rows
<< ", put rows: " << put_rows;
return st;
}

View File

@ -65,6 +65,7 @@ RuntimeState::RuntimeState(
_num_print_error_rows(0),
_normal_row_number(0),
_error_row_number(0),
_error_log_file_path(""),
_error_log_file(nullptr),
_instance_buffer_reservation(new ReservationTracker) {
Status status = init(fragment_instance_id, query_options, now, exec_env);
@ -91,6 +92,7 @@ RuntimeState::RuntimeState(
_num_print_error_rows(0),
_normal_row_number(0),
_error_row_number(0),
_error_log_file_path(""),
_error_log_file(nullptr),
_instance_buffer_reservation(new ReservationTracker) {
Status status = init(fragment_params.params.fragment_instance_id, query_options, now, exec_env);

View File

@ -154,7 +154,7 @@ public:
int64_t loaded_bytes = 0;
int64_t start_nanos = 0;
int64_t load_cost_nanos = 0;
std::string error_url;
std::string error_url = "";
KafkaLoadInfo* kafka_info = nullptr;

View File

@ -66,10 +66,10 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
if (ctx->number_filtered_rows > 0 &&
!executor->runtime_state()->get_error_log_file_path().empty()) {
if (ctx->load_type == TLoadType::MANUL_LOAD) {
// if (ctx->load_type == TLoadType::MANUL_LOAD) {
ctx->error_url = to_load_error_http_path(
executor->runtime_state()->get_error_log_file_path());
}
// }
}
} else {
LOG(WARNING) << "fragment execute failed"
@ -180,7 +180,6 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
request.tbl = ctx->table;
request.txnId = ctx->txn_id;
request.__set_reason(ctx->status.get_error_msg());
TLoadTxnRollbackResult result;
// set the attachment if present
TTxnCommitAttachment attachment;
@ -189,6 +188,7 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
request.__isset.txnCommitAttachment = true;
}
TLoadTxnRollbackResult result;
#ifndef BE_TEST
auto rpc_st = FrontendHelper::rpc(
master_addr.hostname, master_addr.port,
@ -230,6 +230,10 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt
rl_attach.kafkaRLTaskProgress = std::move(kafka_progress);
rl_attach.__isset.kafkaRLTaskProgress = true;
if (!ctx->error_url.empty()) {
rl_attach.__set_errorLogUrl(ctx->error_url);
}
attach->rlTaskTxnCommitAttachment = std::move(rl_attach);
attach->__isset.rlTaskTxnCommitAttachment = true;

View File

@ -78,6 +78,7 @@ public class ShowRoutineLoadStmt extends ShowStmt {
.add("Statistic")
.add("Progress")
.add("ReasonOfStateChanged")
.add("ErrorLogUrls")
.build();
private final LabelName labelName;

View File

@ -38,6 +38,7 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
private long receivedBytes;
private long taskExecutionTimeMs;
private RoutineLoadProgress progress;
private String errorLogUrl;
public RLTaskTxnCommitAttachment() {
super(TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK);
@ -60,6 +61,10 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
default:
break;
}
if (rlTaskTxnCommitAttachment.isSetErrorLogUrl()) {
this.errorLogUrl = rlTaskTxnCommitAttachment.getErrorLogUrl();
}
}
public TUniqueId getTaskId() {
@ -94,6 +99,10 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
return progress;
}
// Returns the error log URL attached by the BE in the txn commit attachment,
// or null if the attachment did not carry one (field is only set when
// isSetErrorLogUrl() was true on the thrift struct).
public String getErrorLogUrl() {
return errorLogUrl;
}
@Override
public String toString() {
return "RLTaskTxnCommitAttachment [filteredRows=" + filteredRows

View File

@ -53,6 +53,8 @@ import org.apache.doris.transaction.TxnStateChangeListener;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
@ -69,6 +71,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -196,6 +199,9 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
// TODO(ml): error sample
// save the latest 3 error log urls
private Queue<String> errorLogUrls = EvictingQueue.create(3);
protected boolean isTypeRead = false;
public void setTypeRead(boolean isTypeRead) {
@ -280,8 +286,13 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
lock.writeLock().unlock();
}
public void tryWriteLock() throws InterruptedException {
lock.writeLock().tryLock(5, TimeUnit.SECONDS);
/*
 * Try to acquire this job's write lock, waiting at most the given timeout.
 * Returns true if the lock was acquired, false on timeout or interruption.
 */
public boolean tryWriteLock(long timeout, TimeUnit unit) {
    try {
        return this.lock.writeLock().tryLock(timeout, unit);
    } catch (InterruptedException e) {
        // 'id' here is the routine load job id, not a db id
        LOG.warn("failed to try write lock of routine load job[" + id + "]", e);
        // restore the interrupt status so callers further up the stack
        // can still observe that this thread was interrupted
        Thread.currentThread().interrupt();
        return false;
    }
}
public String getName() {
@ -431,7 +442,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
public boolean containsTask(UUID taskId) {
readLock();
try {
return routineLoadTaskInfoList.parallelStream()
return routineLoadTaskInfoList.stream()
.anyMatch(entity -> entity.getId().equals(taskId));
} finally {
readUnlock();
@ -549,7 +560,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
}
// if the task does not exist, the 'before' check will throw an exception
// if task pass the checker, lock job will be locked
// if task pass the checker, job lock will be locked
// if tryLock times out, the txn will be aborted by the timeout process.
// *** Please do not call before individually. It must be combined use with after ***
@Override
@ -574,32 +585,38 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
executeBeforeCheck(txnState, TransactionStatus.COMMITTED);
}
/*
* Try to acquire the write lock.
* Make sure the lock is released if any exception is thrown.
*/
private void executeBeforeCheck(TransactionState txnState, TransactionStatus transactionStatus)
throws TransactionException {
if (!tryWriteLock(2000, TimeUnit.MILLISECONDS)) {
// The job lock has been held by another thread for more than the timeout.
// The commit txn issued by thread2 will fail after waiting for the timeout.
// Maybe thread1 is hanging somewhere.
LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("job_id", id).add("txn_status",
transactionStatus).add("error_msg",
"txn could not be transformed while waiting for timeout of routine load job"));
throw new TransactionException("txn " + txnState.getTransactionId() + "could not be " + transactionStatus
+ "while waiting for timeout of routine load job.");
}
// the task has already passed the checker
try {
tryWriteLock();
// check if task has been aborted
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional =
routineLoadTaskInfoList.parallelStream()
routineLoadTaskInfoList.stream()
.filter(entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
if (!routineLoadTaskInfoOptional.isPresent()) {
writeUnlock();
throw new TransactionException("txn " + txnState.getTransactionId()
+ " could not be " + transactionStatus
+ " while task " + txnState.getLabel() + " has been aborted.");
}
} catch (InterruptedException e) {
// The lock of job has been locked by thread1 more then timeout seconds.
// The commit txn by thread2 will be failed after waiting for timeout seconds.
// Maybe thread1 hang on somewhere
LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
.add("job_id", id)
.add("txn_status", transactionStatus)
.add("error_msg", "txn could not be transformed "
+ "while waiting for timeout of routine load job"));
throw new TransactionException("txn " + txnState.getTransactionId() + "could not be " + transactionStatus
+ "while waiting for timeout of routine load job.", e);
} catch (TransactionException e) {
writeUnlock();
throw e;
}
}
@ -614,7 +631,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
try {
if (txnOperated) {
// find task in job
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream().filter(
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
taskBeId = routineLoadTaskInfo.getBeId();
@ -622,7 +639,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
++committedTaskNum;
}
} catch (Throwable e) {
LOG.warn(e.getMessage(), e);
LOG.warn("after committed failed", e);
updateState(JobState.PAUSED, "be " + taskBeId + " commit task failed " + txnState.getLabel()
+ " with error " + e.getMessage()
+ " while transaction " + txnState.getTransactionId() + " has been committed", false /* not replay */);
@ -676,7 +693,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
break;
}
}
// todo(ml): use previous be id depend on change reason
// TODO(ml): use the previous be id depending on the change reason
}
// step2: commit task , update progress, maybe create a new task
executeCommitTask(routineLoadTaskInfo, txnState);
@ -722,6 +739,10 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
updateProgress(rlTaskTxnCommitAttachment);
}
if (!Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) {
errorLogUrls.add(rlTaskTxnCommitAttachment.getErrorLogUrl());
}
if (state == JobState.RUNNING) {
// step2: create a new task for partitions
RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo);
@ -925,31 +946,37 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
}
}
List<String> row = Lists.newArrayList();
row.add(String.valueOf(id));
row.add(name);
row.add(TimeUtils.longToTimeString(createTimestamp));
row.add(TimeUtils.longToTimeString(endTimestamp));
row.add(db == null ? String.valueOf(dbId) : db.getFullName());
row.add(tbl == null ? String.valueOf(tableId) : tbl.getName());
row.add(getState().name());
row.add(dataSourceType.name());
row.add(String.valueOf(getSizeOfRoutineLoadTaskInfoList()));
row.add(jobPropertiesToJsonString());
row.add(dataSourcePropertiesJsonToString());
row.add(getStatistic());
row.add(getProgress().toJsonString());
switch (state) {
case PAUSED:
row.add(pausedReason);
break;
case CANCELLED:
row.add(cancelReason);
break;
default:
row.add("");
readLock();
try {
List<String> row = Lists.newArrayList();
row.add(String.valueOf(id));
row.add(name);
row.add(TimeUtils.longToTimeString(createTimestamp));
row.add(TimeUtils.longToTimeString(endTimestamp));
row.add(db == null ? String.valueOf(dbId) : db.getFullName());
row.add(tbl == null ? String.valueOf(tableId) : tbl.getName());
row.add(getState().name());
row.add(dataSourceType.name());
row.add(String.valueOf(getSizeOfRoutineLoadTaskInfoList()));
row.add(jobPropertiesToJsonString());
row.add(dataSourcePropertiesJsonToString());
row.add(getStatistic());
row.add(getProgress().toJsonString());
switch (state) {
case PAUSED:
row.add(pausedReason);
break;
case CANCELLED:
row.add(cancelReason);
break;
default:
row.add("");
}
row.add(Joiner.on(", ").join(errorLogUrls));
return row;
} finally {
readUnlock();
}
return row;
}
public List<List<String>> getTasksShowInfo() {

View File

@ -539,11 +539,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
@Override
public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException {
TNetworkAddress clientAddr = getClientAddr();
LOG.info("receive txn begin request, db: {}, tbl: {}, label: {}, backend: {}",
request.getDb(), request.getTbl(), request.getLabel(),
clientAddr == null ? "unknown" : clientAddr.getHostname());
LOG.debug("txn begin request: {}", request);
TLoadTxnBeginResult result = new TLoadTxnBeginResult();
@ -600,8 +598,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
@Override
public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws TException {
LOG.info("receive txn commit request. db: {}, tbl: {}, txn id: {}",
request.getDb(), request.getTbl(), request.getTxnId());
TNetworkAddress clientAddr = getClientAddr();
LOG.info("receive txn commit request. db: {}, tbl: {}, txn id: {}, backend: {}",
request.getDb(), request.getTbl(), request.getTxnId(),
clientAddr == null ? "unknown" : clientAddr.getHostname());
LOG.debug("txn commit request: {}", request);
TLoadTxnCommitResult result = new TLoadTxnCommitResult();
@ -659,8 +659,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
@Override
public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) throws TException {
LOG.info("receive txn rollback request. db: {}, tbl: {}, txn id: {}, reason: {}",
request.getDb(), request.getTbl(), request.getTxnId(), request.getReason());
TNetworkAddress clientAddr = getClientAddr();
LOG.info("receive txn rollback request. db: {}, tbl: {}, txn id: {}, reason: {}, backend: {}",
request.getDb(), request.getTbl(), request.getTxnId(), request.getReason(),
clientAddr == null ? "unknown" : clientAddr.getHostname());
LOG.debug("txn rollback request: {}", request);
TLoadTxnRollbackResult result = new TLoadTxnRollbackResult();

View File

@ -370,6 +370,7 @@ public class GlobalTransactionMgr {
unprotectedCommitTransaction(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends,
db);
} catch (Throwable e) {
LOG.warn("unexpected exception", e);
txnOperated = false;
throw e;
} finally {
@ -866,7 +867,7 @@ public class GlobalTransactionMgr {
if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
return;
}
// 4. update transaction state version
// update transaction state version
transactionState.setCommitTime(System.currentTimeMillis());
transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
transactionState.setErrorReplicas(errorReplicaIds);
@ -882,7 +883,7 @@ public class GlobalTransactionMgr {
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
}
// 5. persistent transactionState
// persist transactionState
unprotectUpsertTransactionState(transactionState);
// add publish version tasks. set task to null as a placeholder.
@ -891,10 +892,6 @@ public class GlobalTransactionMgr {
transactionState.addPublishVersionTask(backendId, null);
}
}
private void unprotectAbortTransaction(long transactionId, String reason) throws UserException {
unprotectAbortTransaction(transactionId, reason, null);
}
private void unprotectAbortTransaction(long transactionId, String reason, TxnCommitAttachment txnCommitAttachment)
throws UserException {

View File

@ -25,7 +25,6 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.transaction.TxnStateChangeListener.ListenResult;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
@ -141,6 +140,8 @@ public class TransactionState implements Writable {
// optional
private TxnCommitAttachment txnCommitAttachment;
private String errorLogUrl = null;
public TransactionState() {
this.dbId = -1;
this.transactionId = -1;
@ -272,6 +273,14 @@ public class TransactionState implements Writable {
listenerId = -1;
}
// Record the error log URL for this transaction (propagated from the BE's
// txn commit attachment); may be left null if no error log was produced.
public void setErrorLogUrl(String errorLogUrl) {
this.errorLogUrl = errorLogUrl;
}
// Returns the error log URL for this transaction, or null if none was set.
public String getErrorLogUrl() {
return errorLogUrl;
}
public void setTransactionStatus(TransactionStatus transactionStatus) {
// status changed
this.preStatus = this.transactionStatus;

View File

@ -27,7 +27,6 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.persist.EditLog;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
@ -40,20 +39,15 @@ import org.apache.kafka.common.PartitionInfo;
import org.junit.Assert;
import org.junit.Test;
import java.io.DataInput;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java_cup.runtime.Symbol;
import mockit.Deencapsulation;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
public class RoutineLoadJobTest {
@ -94,7 +88,7 @@ public class RoutineLoadJobTest {
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList);
Deencapsulation.setField(routineLoadJob, "lock", lock);
routineLoadJob.afterAborted(transactionState, txnStatusChangeReasonString);
routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
}
@ -127,7 +121,7 @@ public class RoutineLoadJobTest {
Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList);
Deencapsulation.setField(routineLoadJob, "progress", progress);
Deencapsulation.setField(routineLoadJob, "lock", lock);
routineLoadJob.afterAborted(transactionState, txnStatusChangeReasonString);
routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);
Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
@ -154,7 +148,7 @@ public class RoutineLoadJobTest {
Deencapsulation.setField(routineLoadJob, "progress", progress);
Deencapsulation.setField(routineLoadJob, "lock", lock);
try {
routineLoadJob.afterCommitted(transactionState);
routineLoadJob.afterCommitted(transactionState, true);
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
} catch (TransactionException e) {
Assert.fail();

View File

@ -518,6 +518,7 @@ struct TRLTaskTxnCommitAttachment {
8: optional i64 loadedBytes
9: optional i64 loadCostMs
10: optional TKafkaRLTaskProgress kafkaRLTaskProgress
11: optional string errorLogUrl
}
struct TTxnCommitAttachment {