From 2b4d02b2fa0bfd5d9fb32fd4d53ad5b5adc768c6 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 16 Apr 2019 21:12:21 +0800 Subject: [PATCH] Add error load log url for routine load job (#938) --- be/src/exec/broker_scanner.cpp | 1 + be/src/runtime/fragment_mgr.cpp | 3 + be/src/runtime/routine_load/data_consumer.cpp | 6 +- be/src/runtime/runtime_state.cpp | 2 + .../runtime/stream_load/stream_load_context.h | 2 +- .../stream_load/stream_load_executor.cpp | 10 +- .../doris/analysis/ShowRoutineLoadStmt.java | 1 + .../RLTaskTxnCommitAttachment.java | 9 ++ .../load/routineload/RoutineLoadJob.java | 117 +++++++++++------- .../doris/service/FrontendServiceImpl.java | 14 ++- .../transaction/GlobalTransactionMgr.java | 9 +- .../doris/transaction/TransactionState.java | 11 +- .../load/routineload/RoutineLoadJobTest.java | 12 +- gensrc/thrift/FrontendService.thrift | 1 + 14 files changed, 126 insertions(+), 72 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 7c7d4f5bee..1b5fdd26a1 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -583,6 +583,7 @@ bool BrokerScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPoo if (!slot_desc->is_materialized()) { continue; } + ExprContext* ctx = _dest_expr_ctx[ctx_idx++]; void* value = ctx->get_value(_src_tuple_row); if (value == nullptr) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 78358ea3c8..d443cabc6e 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -44,6 +44,9 @@ namespace doris { std::string to_load_error_http_path(const std::string& file_name) { + if (file_name.empty()) { + return ""; + } std::stringstream url; url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port << "/api/_load_error_log?" diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index ce0d08e6c4..50260ecb7e 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -136,6 +136,7 @@ Status KafkaDataConsumer::group_consume( << ", max running time(ms): " << left_time; int64_t received_rows = 0; + int64_t put_rows = 0; Status st = Status::OK; MonotonicStopWatch consumer_watch; MonotonicStopWatch watch; @@ -158,6 +159,8 @@ Status KafkaDataConsumer::group_consume( if (!queue->blocking_put(msg)) { // queue is shutdown done = true; + } else { + ++put_rows; } ++received_rows; break; @@ -183,7 +186,8 @@ Status KafkaDataConsumer::group_consume( << ", left time(ms): " << left_time << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000 << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000 - << ", received rows: " << received_rows; + << ", received rows: " << received_rows + << ", put rows: " << put_rows; return st; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 9528d68209..6533404c64 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -65,6 +65,7 @@ RuntimeState::RuntimeState( _num_print_error_rows(0), _normal_row_number(0), _error_row_number(0), + _error_log_file_path(""), _error_log_file(nullptr), _instance_buffer_reservation(new ReservationTracker) { Status status = init(fragment_instance_id, query_options, now, exec_env); @@ -91,6 +92,7 @@ RuntimeState::RuntimeState( _num_print_error_rows(0), _normal_row_number(0), _error_row_number(0), + _error_log_file_path(""), _error_log_file(nullptr), _instance_buffer_reservation(new ReservationTracker) { Status status = init(fragment_params.params.fragment_instance_id, query_options, now, exec_env); diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index b4e278bf08..0afb271034 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -154,7 +154,7 @@ public: int64_t loaded_bytes = 0; int64_t start_nanos = 0; int64_t load_cost_nanos = 0; - std::string error_url; + std::string error_url = ""; KafkaLoadInfo* kafka_info = nullptr; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index ffa2680a84..194709588b 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -66,10 +66,10 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { if (ctx->number_filtered_rows > 0 && !executor->runtime_state()->get_error_log_file_path().empty()) { - if (ctx->load_type == TLoadType::MANUL_LOAD) { + // if (ctx->load_type == TLoadType::MANUL_LOAD) { ctx->error_url = to_load_error_http_path( executor->runtime_state()->get_error_log_file_path()); - } + // } } } else { LOG(WARNING) << "fragment execute failed" @@ -180,7 +180,6 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { request.tbl = ctx->table; request.txnId = ctx->txn_id; request.__set_reason(ctx->status.get_error_msg()); - TLoadTxnRollbackResult result; // set attachment if has TTxnCommitAttachment attachment; @@ -189,6 +188,7 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { request.__isset.txnCommitAttachment = true; } + TLoadTxnRollbackResult result; #ifndef BE_TEST auto rpc_st = FrontendHelper::rpc( master_addr.hostname, master_addr.port, @@ -230,6 +230,10 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt rl_attach.kafkaRLTaskProgress = std::move(kafka_progress); rl_attach.__isset.kafkaRLTaskProgress = true; + if (!ctx->error_url.empty()) { + rl_attach.__set_errorLogUrl(ctx->error_url); + } + attach->rlTaskTxnCommitAttachment = std::move(rl_attach); attach->__isset.rlTaskTxnCommitAttachment = true; diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java index 6e81ac850a..8b54b6730b 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java @@ -78,6 +78,7 @@ public class ShowRoutineLoadStmt extends ShowStmt { .add("Statistic") .add("Progress") .add("ReasonOfStateChanged") + .add("ErrorLogUrls") .build(); private final LabelName labelName; diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java index b0577f04b8..ef6a7f38d3 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java @@ -38,6 +38,7 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment { private long receivedBytes; private long taskExecutionTimeMs; private RoutineLoadProgress progress; + private String errorLogUrl; public RLTaskTxnCommitAttachment() { super(TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK); @@ -60,6 +61,10 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment { default: break; } + + if (rlTaskTxnCommitAttachment.isSetErrorLogUrl()) { + this.errorLogUrl = rlTaskTxnCommitAttachment.getErrorLogUrl(); + } } public TUniqueId getTaskId() { @@ -94,6 +99,10 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment { return progress; } + public String getErrorLogUrl() { + return errorLogUrl; + } + @Override public String toString() { return "RLTaskTxnCommitAttachment [filteredRows=" + filteredRows diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 89a18f710c..76660703ce 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -53,6 +53,8 @@ import org.apache.doris.transaction.TxnStateChangeListener; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.EvictingQueue; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; @@ -69,6 +71,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -196,6 +199,9 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // TODO(ml): error sample + // save the latest 3 error log urls + private Queue errorLogUrls = EvictingQueue.create(3); + protected boolean isTypeRead = false; public void setTypeRead(boolean isTypeRead) { @@ -280,8 +286,13 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable lock.writeLock().unlock(); } - public void tryWriteLock() throws InterruptedException { - lock.writeLock().tryLock(5, TimeUnit.SECONDS); + public boolean tryWriteLock(long timeout, TimeUnit unit) { + try { + return this.lock.writeLock().tryLock(timeout, unit); + } catch (InterruptedException e) { + LOG.warn("failed to try write lock at db[" + id + "]", e); + return false; + } } public String getName() { @@ -431,7 +442,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable public boolean containsTask(UUID taskId) { readLock(); try { - return routineLoadTaskInfoList.parallelStream() + return routineLoadTaskInfoList.stream() .anyMatch(entity -> entity.getId().equals(taskId)); } finally { readUnlock(); @@ -549,7 +560,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable } // if task not exists, before aborted will throw exception - // if task pass the checker, lock job will be locked + // if task pass the checker, job lock will be locked // if tryLock timeout, txn will be aborted by timeout progress. // *** Please do not call before individually. It must be combined use with after *** @Override @@ -574,32 +585,38 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable executeBeforeCheck(txnState, TransactionStatus.COMMITTED); } + /* + * try lock the write lock. + * Make sure lock is released if any exception being thrown + */ private void executeBeforeCheck(TransactionState txnState, TransactionStatus transactionStatus) throws TransactionException { + if (!tryWriteLock(2000, TimeUnit.MILLISECONDS)) { + // The lock of job has been locked by another thread more then timeout seconds. + // The commit txn by thread2 will be failed after waiting for timeout seconds. + // Maybe thread1 hang on somewhere + LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("job_id", id).add("txn_status", + transactionStatus).add("error_msg", + "txn could not be transformed while waiting for timeout of routine load job")); + throw new TransactionException("txn " + txnState.getTransactionId() + "could not be " + transactionStatus + + "while waiting for timeout of routine load job."); + } + // task already pass the checker try { - tryWriteLock(); // check if task has been aborted Optional routineLoadTaskInfoOptional = - routineLoadTaskInfoList.parallelStream() + routineLoadTaskInfoList.stream() .filter(entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); if (!routineLoadTaskInfoOptional.isPresent()) { - writeUnlock(); + throw new TransactionException("txn " + txnState.getTransactionId() + " could not be " + transactionStatus + " while task " + txnState.getLabel() + " has been aborted."); } - } catch (InterruptedException e) { - // The lock of job has been locked by thread1 more then timeout seconds. - // The commit txn by thread2 will be failed after waiting for timeout seconds. - // Maybe thread1 hang on somewhere - LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) - .add("job_id", id) - .add("txn_status", transactionStatus) - .add("error_msg", "txn could not be transformed " - + "while waiting for timeout of routine load job")); - throw new TransactionException("txn " + txnState.getTransactionId() + "could not be " + transactionStatus - + "while waiting for timeout of routine load job.", e); + } catch (TransactionException e) { + writeUnlock(); + throw e; } } @@ -614,7 +631,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable try { if (txnOperated) { // find task in job - Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream().filter( + Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter( entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); taskBeId = routineLoadTaskInfo.getBeId(); @@ -622,7 +639,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable ++committedTaskNum; } } catch (Throwable e) { - LOG.warn(e.getMessage(), e); + LOG.warn("after committed failed", e); updateState(JobState.PAUSED, "be " + taskBeId + " commit task failed " + txnState.getLabel() + " with error " + e.getMessage() + " while transaction " + txnState.getTransactionId() + " has been committed", false /* not replay */); @@ -676,7 +693,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable break; } } - // todo(ml): use previous be id depend on change reason + // TODO(ml): use previous be id depend on change reason } // step2: commit task , update progress, maybe create a new task executeCommitTask(routineLoadTaskInfo, txnState); @@ -722,6 +739,10 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable updateProgress(rlTaskTxnCommitAttachment); } + if (!Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) { + errorLogUrls.add(rlTaskTxnCommitAttachment.getErrorLogUrl()); + } + if (state == JobState.RUNNING) { // step2: create a new task for partitions RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); @@ -925,31 +946,37 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable } } - List row = Lists.newArrayList(); - row.add(String.valueOf(id)); - row.add(name); - row.add(TimeUtils.longToTimeString(createTimestamp)); - row.add(TimeUtils.longToTimeString(endTimestamp)); - row.add(db == null ? String.valueOf(dbId) : db.getFullName()); - row.add(tbl == null ? String.valueOf(tableId) : tbl.getName()); - row.add(getState().name()); - row.add(dataSourceType.name()); - row.add(String.valueOf(getSizeOfRoutineLoadTaskInfoList())); - row.add(jobPropertiesToJsonString()); - row.add(dataSourcePropertiesJsonToString()); - row.add(getStatistic()); - row.add(getProgress().toJsonString()); - switch (state) { - case PAUSED: - row.add(pausedReason); - break; - case CANCELLED: - row.add(cancelReason); - break; - default: - row.add(""); + readLock(); + try { + List row = Lists.newArrayList(); + row.add(String.valueOf(id)); + row.add(name); + row.add(TimeUtils.longToTimeString(createTimestamp)); + row.add(TimeUtils.longToTimeString(endTimestamp)); + row.add(db == null ? String.valueOf(dbId) : db.getFullName()); + row.add(tbl == null ? String.valueOf(tableId) : tbl.getName()); + row.add(getState().name()); + row.add(dataSourceType.name()); + row.add(String.valueOf(getSizeOfRoutineLoadTaskInfoList())); + row.add(jobPropertiesToJsonString()); + row.add(dataSourcePropertiesJsonToString()); + row.add(getStatistic()); + row.add(getProgress().toJsonString()); + switch (state) { + case PAUSED: + row.add(pausedReason); + break; + case CANCELLED: + row.add(cancelReason); + break; + default: + row.add(""); + } + row.add(Joiner.on(", ").join(errorLogUrls)); + return row; + } finally { + readUnlock(); } - return row; } public List> getTasksShowInfo() { diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 0ef9ee47dc..f4202a7a29 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -539,11 +539,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException { TNetworkAddress clientAddr = getClientAddr(); - LOG.info("receive txn begin request, db: {}, tbl: {}, label: {}, backend: {}", request.getDb(), request.getTbl(), request.getLabel(), clientAddr == null ? "unknown" : clientAddr.getHostname()); - LOG.debug("txn begin request: {}", request); TLoadTxnBeginResult result = new TLoadTxnBeginResult(); @@ -600,8 +598,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws TException { - LOG.info("receive txn commit request. db: {}, tbl: {}, txn id: {}", - request.getDb(), request.getTbl(), request.getTxnId()); + TNetworkAddress clientAddr = getClientAddr(); + LOG.info("receive txn commit request. db: {}, tbl: {}, txn id: {}, backend: {}", + request.getDb(), request.getTbl(), request.getTxnId(), + clientAddr == null ? "unknown" : clientAddr.getHostname()); LOG.debug("txn commit request: {}", request); TLoadTxnCommitResult result = new TLoadTxnCommitResult(); @@ -659,8 +659,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) throws TException { - LOG.info("receive txn rollback request. db: {}, tbl: {}, txn id: {}, reason: {}", - request.getDb(), request.getTbl(), request.getTxnId(), request.getReason()); + TNetworkAddress clientAddr = getClientAddr(); + LOG.info("receive txn rollback request. db: {}, tbl: {}, txn id: {}, reason: {}, backend: {}", + request.getDb(), request.getTbl(), request.getTxnId(), request.getReason(), + clientAddr == null ? "unknown" : clientAddr.getHostname()); LOG.debug("txn rollback request: {}", request); TLoadTxnRollbackResult result = new TLoadTxnRollbackResult(); diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 3965e7cfd4..a59b6a9a24 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -370,6 +370,7 @@ public class GlobalTransactionMgr { unprotectedCommitTransaction(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, db); } catch (Throwable e) { + LOG.warn("unexpected exception", e); txnOperated = false; throw e; } finally { @@ -866,7 +867,7 @@ public class GlobalTransactionMgr { if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) { return; } - // 4. update transaction state version + // update transaction state version transactionState.setCommitTime(System.currentTimeMillis()); transactionState.setTransactionStatus(TransactionStatus.COMMITTED); transactionState.setErrorReplicas(errorReplicaIds); @@ -882,7 +883,7 @@ public class GlobalTransactionMgr { } transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); } - // 5. persistent transactionState + // persist transactionState unprotectUpsertTransactionState(transactionState); // add publish version tasks. set task to null as a placeholder. @@ -891,10 +892,6 @@ public class GlobalTransactionMgr { transactionState.addPublishVersionTask(backendId, null); } } - - private void unprotectAbortTransaction(long transactionId, String reason) throws UserException { - unprotectAbortTransaction(transactionId, reason, null); - } private void unprotectAbortTransaction(long transactionId, String reason, TxnCommitAttachment txnCommitAttachment) throws UserException { diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index 1591da803b..b7aa0d0b94 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -25,7 +25,6 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.metric.MetricRepo; import org.apache.doris.task.PublishVersionTask; -import org.apache.doris.transaction.TxnStateChangeListener.ListenResult; import com.google.common.base.Joiner; import com.google.common.collect.Maps; @@ -141,6 +140,8 @@ public class TransactionState implements Writable { // optional private TxnCommitAttachment txnCommitAttachment; + private String errorLogUrl = null; + public TransactionState() { this.dbId = -1; this.transactionId = -1; @@ -272,6 +273,14 @@ public class TransactionState implements Writable { listenerId = -1; } + public void setErrorLogUrl(String errorLogUrl) { + this.errorLogUrl = errorLogUrl; + } + + public String getErrorLogUrl() { + return errorLogUrl; + } + public void setTransactionStatus(TransactionStatus transactionStatus) { // status changed this.preStatus = this.transactionStatus; diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java index 23371f699f..7c7b277cae 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java @@ -27,7 +27,6 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; import org.apache.doris.common.UserException; -import org.apache.doris.common.io.Text; import org.apache.doris.persist.EditLog; import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; @@ -40,20 +39,15 @@ import org.apache.kafka.common.PartitionInfo; import org.junit.Assert; import org.junit.Test; -import java.io.DataInput; import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java_cup.runtime.Symbol; - import mockit.Deencapsulation; import mockit.Expectations; import mockit.Injectable; -import mockit.Mock; -import mockit.MockUp; import mockit.Mocked; public class RoutineLoadJobTest { @@ -94,7 +88,7 @@ public class RoutineLoadJobTest { RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList); Deencapsulation.setField(routineLoadJob, "lock", lock); - routineLoadJob.afterAborted(transactionState, txnStatusChangeReasonString); + routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString); Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); } @@ -127,7 +121,7 @@ public class RoutineLoadJobTest { Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList); Deencapsulation.setField(routineLoadJob, "progress", progress); Deencapsulation.setField(routineLoadJob, "lock", lock); - routineLoadJob.afterAborted(transactionState, txnStatusChangeReasonString); + routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString); Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState()); Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum")); @@ -154,7 +148,7 @@ public class RoutineLoadJobTest { Deencapsulation.setField(routineLoadJob, "progress", progress); Deencapsulation.setField(routineLoadJob, "lock", lock); try { - routineLoadJob.afterCommitted(transactionState); + routineLoadJob.afterCommitted(transactionState, true); Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); } catch (TransactionException e) { Assert.fail(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 64057b95a0..6f707b4409 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -518,6 +518,7 @@ struct TRLTaskTxnCommitAttachment { 8: optional i64 loadedBytes 9: optional i64 loadCostMs 10: optional TKafkaRLTaskProgress kafkaRLTaskProgress + 11: optional string errorLogUrl } struct TTxnCommitAttachment {