Support setting timezone for stream load and routine load (#1831)

Author: Mingyu Chen
Date: 2019-09-20 07:55:05 +08:00
Committed by: ZHAO Chun
Parent: 7bf02d0ae7
Commit: e8da855cd2
11 changed files with 423 additions and 228 deletions

CreateRoutineLoadStmt.java

@@ -22,11 +22,13 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.routineload.KafkaProgress;
import org.apache.doris.load.routineload.LoadDataSourceType;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@@ -106,6 +108,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.add(MAX_BATCH_ROWS_PROPERTY)
.add(MAX_BATCH_SIZE_PROPERTY)
.add(LoadStmt.STRICT_MODE)
.add(LoadStmt.TIMEZONE)
.build();
private static final ImmutableSet<String> KAFKA_PROPERTIES_SET = new ImmutableSet.Builder<String>()
@@ -133,6 +136,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private long maxBatchRows = -1;
private long maxBatchSizeBytes = -1;
private boolean strictMode = true;
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
// kafka related properties
private String kafkaBrokerList;
@@ -140,7 +144,6 @@ public class CreateRoutineLoadStmt extends DdlStmt {
// pair<partition id, offset>
private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
//custom kafka property map<key, value>
private Map<String, String> customKafkaProperties = Maps.newHashMap();
@@ -205,6 +208,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return strictMode;
}
public String getTimezone() {
return timezone;
}
public String getKafkaBrokerList() {
return kafkaBrokerList;
}
@@ -221,7 +228,6 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return customKafkaProperties;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
@@ -288,7 +294,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
partitionNames);
}
private void checkJobProperties() throws AnalysisException {
private void checkJobProperties() throws UserException {
Optional<String> optional = jobProperties.keySet().stream().filter(
entity -> !PROPERTIES_SET.contains(entity)).findFirst();
if (optional.isPresent()) {
@@ -318,6 +324,12 @@ public class CreateRoutineLoadStmt extends DdlStmt {
strictMode = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.STRICT_MODE),
RoutineLoadJob.DEFAULT_STRICT_MODE,
LoadStmt.STRICT_MODE + " should be a boolean");
if (ConnectContext.get() != null) {
timezone = ConnectContext.get().getSessionVariable().getTimeZone();
}
timezone = jobProperties.getOrDefault(LoadStmt.TIMEZONE, timezone);
TimeUtils.checkTimeZoneValid(timezone);
}
private void checkDataSourceProperties() throws AnalysisException {
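To summarize the precedence that checkJobProperties() above implements: the session time zone from ConnectContext, if any, replaces the built-in default, an explicit timezone job property overrides both, and the resulting value is validated. A condensed, self-contained sketch of that order; the "timezone" key literal, the "Asia/Shanghai" default (standing in for TimeUtils.DEFAULT_TIME_ZONE), and the java.time-based validation (standing in for TimeUtils.checkTimeZoneValid) are assumptions:

    import java.time.DateTimeException;
    import java.time.ZoneId;
    import java.util.Map;

    class TimezoneResolutionSketch {
        // Mirrors the resolution order in the hunk above, with library stand-ins for Doris internals.
        static String resolveTimezone(String sessionTimezone, Map<String, String> jobProperties) {
            String timezone = "Asia/Shanghai";                            // assumed value of TimeUtils.DEFAULT_TIME_ZONE
            if (sessionTimezone != null) {                                // stand-in for ConnectContext.get() != null
                timezone = sessionTimezone;
            }
            timezone = jobProperties.getOrDefault("timezone", timezone); // assumed literal behind LoadStmt.TIMEZONE
            try {
                ZoneId.of(timezone);                                      // stand-in for TimeUtils.checkTimeZoneValid(timezone)
            } catch (DateTimeException e) {
                throw new IllegalArgumentException("invalid timezone: " + timezone, e);
            }
            return timezone;
        }
    }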

RoutineLoadJob.java

@@ -247,6 +247,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
this.maxBatchSizeBytes = stmt.getMaxBatchSize();
}
jobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(stmt.isStrictMode()));
jobProperties.put(LoadStmt.TIMEZONE, stmt.getTimezone());
}
private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
@@ -380,6 +381,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return Boolean.valueOf(value);
}
public String getTimezone() {
String value = jobProperties.get(LoadStmt.TIMEZONE);
if (value == null) {
return TimeUtils.DEFAULT_TIME_ZONE;
}
return value;
}
public RoutineLoadProgress getProgress() {
return progress;
}
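The null check in getTimezone() above means routine load jobs whose persisted properties predate this change simply fall back to the default time zone instead of failing, which is presumably the upgrade-compatibility concern. A tiny illustration (key literal and default value are assumptions):

    import java.util.HashMap;
    import java.util.Map;

    class TimezoneFallbackSketch {
        public static void main(String[] args) {
            Map<String, String> oldJobProperties = new HashMap<>();  // a job persisted before this commit: no timezone entry
            String tz = oldJobProperties.getOrDefault("timezone", "Asia/Shanghai"); // assumed TimeUtils.DEFAULT_TIME_ZONE
            System.out.println(tz);                                   // prints Asia/Shanghai
        }
    }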

StreamLoadPlanner.java

@@ -147,6 +147,8 @@ public class StreamLoadPlanner {
params.setQuery_options(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
queryGlobals.setNow_string(DATE_FORMAT.format(new Date()));
queryGlobals.setTimestamp_ms(new Date().getTime());
queryGlobals.setTime_zone(streamLoadTask.getTimezone());
params.setQuery_globals(queryGlobals);
// set load error hub if exist
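Carrying both timestamp_ms and time_zone in TQueryGlobals gives the backend what it needs to express the load's start time in the requested zone rather than in the backend host's zone; how the backend consumes the fields is outside this diff. A small, hedged illustration of the idea:

    import java.time.Instant;
    import java.time.ZoneId;
    import java.time.ZonedDateTime;

    class QueryGlobalsTimeSketch {
        public static void main(String[] args) {
            long timestampMs = System.currentTimeMillis();   // what setTimestamp_ms(new Date().getTime()) captures
            String timeZone = "America/Los_Angeles";          // what setTime_zone(streamLoadTask.getTimezone()) carries
            ZonedDateTime loadTime = Instant.ofEpochMilli(timestampMs).atZone(ZoneId.of(timeZone));
            System.out.println(loadTime);                     // the same instant, rendered in the task's time zone
        }
    }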

StreamLoadTask.java

@@ -27,6 +27,7 @@ import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
@@ -59,6 +60,7 @@ public class StreamLoadTask {
private String path;
private boolean negative;
private boolean strictMode = true;
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
private int timeout = Config.stream_load_default_timeout_second;
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
@@ -112,6 +114,10 @@ public class StreamLoadTask {
return strictMode;
}
public String getTimezone() {
return timezone;
}
public int getTimeout() {
return timeout;
}
@@ -152,6 +158,10 @@ public class StreamLoadTask {
if (request.isSetStrictMode()) {
strictMode = request.isStrictMode();
}
if (request.isSetTimezone()) {
timezone = request.getTimezone();
TimeUtils.checkTimeZoneValid(timezone);
}
}
public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
@@ -172,6 +182,7 @@ public class StreamLoadTask {
columnSeparator = routineLoadJob.getColumnSeparator();
partitions = routineLoadJob.getPartitions() == null ? null : Joiner.on(",").join(routineLoadJob.getPartitions());
strictMode = routineLoadJob.isStrictMode();
timezone = routineLoadJob.getTimezone();
}
// used for stream load
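For the stream load path above, the time zone reaches StreamLoadTask through the TStreamLoadPutRequest that the frontend builds from the client's request; the HTTP header name a client would use is not part of this diff. A hedged client-side sketch, assuming a "timezone" header and a frontend listening on 127.0.0.1:8030 (authentication and redirect handling omitted):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    class StreamLoadTimezoneClientSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical stream load: database, table, host, and the "timezone" header name are assumptions.
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://127.0.0.1:8030/api/example_db/example_tbl/_stream_load"))
                    .header("timezone", "Asia/Shanghai")
                    .PUT(HttpRequest.BodyPublishers.ofString("1,2019-09-20 00:00:00\n"))
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }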