[Bug] Fix bug that routine load task throw exception when calling afterVisible() (#3979)
This commit is contained in:
@ -21,7 +21,6 @@
|
||||
#include <boost/scoped_ptr.hpp>
|
||||
|
||||
#include "testutil/function_utils.h"
|
||||
#include "exprs/timezone_db.h"
|
||||
#include "udf/udf.h"
|
||||
#include "udf/udf_internal.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
@ -37,8 +36,6 @@ public:
|
||||
TimestampFunctionsTest() { }
|
||||
|
||||
void SetUp() {
|
||||
TimezoneDatabase::init();
|
||||
|
||||
TQueryGlobals globals;
|
||||
globals.__set_now_string("2019-08-06 01:38:57");
|
||||
globals.__set_timestamp_ms(1565080737805);
|
||||
|
||||
@ -23,13 +23,13 @@
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "util/logging.h"
|
||||
#include "util/timezone_utils.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class DateTimeValueTest : public testing::Test {
|
||||
public:
|
||||
DateTimeValueTest() {
|
||||
TimezoneDatabase::init();
|
||||
}
|
||||
|
||||
protected:
|
||||
@ -295,15 +295,15 @@ TEST_F(DateTimeValueTest, from_unixtime) {
|
||||
char str[MAX_DTVALUE_STR_LEN];
|
||||
DateTimeValue value;
|
||||
|
||||
value.from_unixtime(570672000, TimezoneDatabase::default_time_zone);
|
||||
value.from_unixtime(570672000, TimezoneUtils::default_time_zone);
|
||||
value.to_string(str);
|
||||
ASSERT_STREQ("1988-02-01 08:00:00", str);
|
||||
|
||||
value.from_unixtime(253402271999, TimezoneDatabase::default_time_zone);
|
||||
value.from_unixtime(253402271999, TimezoneUtils::default_time_zone);
|
||||
value.to_string(str);
|
||||
ASSERT_STREQ("9999-12-31 23:59:59", str);
|
||||
|
||||
value.from_unixtime(0, TimezoneDatabase::default_time_zone);
|
||||
value.from_unixtime(0, TimezoneUtils::default_time_zone);
|
||||
value.to_string(str);
|
||||
ASSERT_STREQ("1970-01-01 08:00:00", str);
|
||||
|
||||
@ -316,26 +316,26 @@ TEST_F(DateTimeValueTest, unix_timestamp) {
|
||||
DateTimeValue value;
|
||||
int64_t timestamp;
|
||||
value.from_date_int64(19691231);
|
||||
value.unix_timestamp(×tamp, TimezoneDatabase::default_time_zone);
|
||||
value.unix_timestamp(×tamp, TimezoneUtils::default_time_zone);
|
||||
ASSERT_EQ(-115200, timestamp);
|
||||
value.from_date_int64(19700101);
|
||||
value.unix_timestamp(×tamp, TimezoneDatabase::default_time_zone);
|
||||
value.unix_timestamp(×tamp, TimezoneUtils::default_time_zone);
|
||||
ASSERT_EQ(0 - 28800, timestamp);
|
||||
value.from_date_int64(19700102);
|
||||
value.unix_timestamp(×tamp, TimezoneDatabase::default_time_zone);
|
||||
value.unix_timestamp(×tamp, TimezoneUtils::default_time_zone);
|
||||
ASSERT_EQ(86400 - 28800, timestamp);
|
||||
value.from_date_int64(19880201000000);
|
||||
value.unix_timestamp(×tamp, TimezoneDatabase::default_time_zone);
|
||||
value.unix_timestamp(×tamp, TimezoneUtils::default_time_zone);
|
||||
ASSERT_EQ(570672000 - 28800, timestamp);
|
||||
value.from_date_int64(20380119);
|
||||
value.unix_timestamp(×tamp, TimezoneDatabase::default_time_zone);
|
||||
value.unix_timestamp(×tamp, TimezoneUtils::default_time_zone);
|
||||
ASSERT_EQ(2147472000 - 28800, timestamp);
|
||||
value.from_date_int64(20380120);
|
||||
value.unix_timestamp(×tamp, TimezoneDatabase::default_time_zone);
|
||||
value.unix_timestamp(×tamp, TimezoneUtils::default_time_zone);
|
||||
ASSERT_EQ(2147529600, timestamp);
|
||||
|
||||
value.from_date_int64(10000101);
|
||||
value.unix_timestamp(×tamp, TimezoneDatabase::default_time_zone);
|
||||
value.unix_timestamp(×tamp, TimezoneUtils::default_time_zone);
|
||||
ASSERT_EQ(-30610252800, timestamp);
|
||||
}
|
||||
|
||||
|
||||
@ -17,14 +17,6 @@
|
||||
|
||||
package org.apache.doris.load.routineload;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.EvictingQueue;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import org.apache.doris.analysis.ColumnSeparator;
|
||||
import org.apache.doris.analysis.CreateRoutineLoadStmt;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
@ -69,6 +61,16 @@ import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
|
||||
import org.apache.doris.transaction.TransactionException;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionStatus;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.EvictingQueue;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -803,6 +805,16 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
|
||||
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
|
||||
entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
|
||||
if (!routineLoadTaskInfoOptional.isPresent()) {
|
||||
// not find task in routineLoadTaskInfoList. this may happen in following case:
|
||||
// After the txn of the task is COMMITTED, but before it becomes VISIBLE,
|
||||
// the routine load job has been paused and then start again.
|
||||
// The routineLoadTaskInfoList will be cleared when job being paused.
|
||||
// So the task can not be found here.
|
||||
// This is a normal case, we just print a log here to observe.
|
||||
LOG.info("Can not find task with transaction {} after visible, job: {}", txnState.getTransactionId(), id);
|
||||
return;
|
||||
}
|
||||
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
|
||||
if (routineLoadTaskInfo.getTxnStatus() != TransactionStatus.COMMITTED) {
|
||||
// TODO(cmy): Normally, this should not happen. But for safe reason, just pause the job
|
||||
|
||||
Reference in New Issue
Block a user