Submit routine load task immediately (#682)

1. Use the BE submit_routine_load_task RPC instead of AgentTaskQueue
2. Remove the Thrift request (TStreamLoadPutRequest) dependency from StreamLoadPlanner and StreamLoadScanNode
EmmyMiao87
2019-03-04 11:25:58 +08:00
committed by ZHAO Chun
parent 9618d20a72
commit 152606fbd6
18 changed files with 456 additions and 162 deletions
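
Concretely, the FE scheduler now pushes each TRoutineLoadTask to its backend over a pooled Thrift client instead of enqueuing an AgentTask. The snippet below is a minimal sketch condensed from the RoutineLoadTaskScheduler changes further down; beId and tasksForThisBe are stand-ins for the scheduler's local variables, and logging is trimmed.

Backend backend = Catalog.getCurrentSystemInfo().getBackend(beId);
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBePort());
BackendService.Client client = null;
boolean ok = false;
try {
    client = ClientPool.backendPool.borrowObject(address);
    for (TRoutineLoadTask task : tasksForThisBe) {
        // one synchronous RPC per task; no AgentTaskQueue / AgentTaskExecutor involved
        client.submit_routine_load_task(task);
    }
    ok = true;
} finally {
    // return a healthy client to the pool, invalidate a broken one
    if (ok) {
        ClientPool.backendPool.returnObject(address, client);
    } else {
        ClientPool.backendPool.invalidateObject(address, client);
    }
}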

LoadColumnsInfo.java

@ -44,6 +44,10 @@ public class LoadColumnsInfo implements ParseNode {
this.columnMappingList = columnMappingList;
}
public Map<String, Expr> getParsedExprMap() {
return parsedExprMap;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
checkColumnNames();

KafkaRoutineLoadJob.java

@ -75,7 +75,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// TODO(ml): I will change it after ut.
@VisibleForTesting
public KafkaRoutineLoadJob(String id, String name, long dbId, long tableId,
public KafkaRoutineLoadJob(long id, String name, long dbId, long tableId,
RoutineLoadDesc routineLoadDesc,
int desireTaskConcurrentNum, int maxErrorNum,
String serverAddress, String topic, KafkaProgress kafkaProgress) {
@ -90,6 +90,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
setConsumer();
}
public String getTopic() {
return topic;
}
public String getServerAddress() {
return serverAddress;
}
private void setCustomKafkaPartitions(List<Integer> kafkaPartitions) throws LoadException {
writeLock();
try {
@ -123,7 +131,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// divide kafkaPartitions into tasks
for (int i = 0; i < currentConcurrentTaskNum; i++) {
try {
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID().toString(), id);
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id);
routineLoadTaskInfoList.add(kafkaTaskInfo);
needScheduleTaskInfoList.add(kafkaTaskInfo);
result.add(kafkaTaskInfo);

KafkaTaskInfo.java

@ -17,12 +17,26 @@
package org.apache.doris.load.routineload;
import com.google.common.base.Joiner;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.task.KafkaRoutineLoadTask;
import org.apache.doris.task.RoutineLoadTask;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TKafkaLoadInfo;
import org.apache.doris.thrift.TLoadSourceType;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import com.google.common.collect.Maps;
@ -38,7 +52,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
private List<Integer> partitions;
public KafkaTaskInfo(String id, String jobId) throws LabelAlreadyUsedException,
public KafkaTaskInfo(UUID id, long jobId) throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
super(id, jobId);
this.partitions = new ArrayList<>();
@ -46,7 +60,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
super(UUID.randomUUID().toString(), kafkaTaskInfo.getJobId());
super(UUID.randomUUID(), kafkaTaskInfo.getJobId());
this.partitions = kafkaTaskInfo.getPartitions();
}
@ -58,9 +72,10 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
return partitions;
}
// todo: reuse plan fragment of stream load
@Override
public RoutineLoadTask createStreamLoadTask(long beId) throws LoadException {
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
public TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException {
KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId);
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
for (Integer partitionId : partitions) {
KafkaProgress kafkaProgress = (KafkaProgress) routineLoadJob.getProgress();
@ -69,13 +84,41 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
}
partitionIdToOffset.put(partitionId, kafkaProgress.getPartitionIdToOffset().get(partitionId));
}
RoutineLoadTask routineLoadTask = new KafkaRoutineLoadTask(routineLoadJob.getResourceInfo(),
beId, routineLoadJob.getDbId(),
routineLoadJob.getTableId(),
id, txnId, partitionIdToOffset);
if (routineLoadJob.getRoutineLoadDesc() != null) {
routineLoadTask.setRoutineLoadDesc(routineLoadJob.getRoutineLoadDesc());
}
return routineLoadTask;
// init tRoutineLoadTask and create plan fragment
TRoutineLoadTask tRoutineLoadTask = new TRoutineLoadTask();
TUniqueId queryId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
tRoutineLoadTask.setId(queryId);
tRoutineLoadTask.setJob_id(jobId);
tRoutineLoadTask.setTxn_id(txnId);
Database database = Catalog.getCurrentCatalog().getDb(routineLoadJob.getDbId());
tRoutineLoadTask.setDb(database.getFullName());
tRoutineLoadTask.setTbl(database.getTable(routineLoadJob.getTableId()).getName());
StringBuilder stringBuilder = new StringBuilder();
// label = (serviceAddress_topic_partition1:offset_partition2:offset).hashcode()
String label = String.valueOf(stringBuilder.append(routineLoadJob.getServerAddress()).append("_")
.append(routineLoadJob.getTopic()).append("_")
.append(Joiner.on("_").withKeyValueSeparator(":")
.join(partitionIdToOffset)).toString().hashCode());
tRoutineLoadTask.setLabel(label);
tRoutineLoadTask.setAuth_code(routineLoadJob.getAuthCode());
TKafkaLoadInfo tKafkaLoadInfo = new TKafkaLoadInfo();
tKafkaLoadInfo.setTopic((routineLoadJob).getTopic());
tKafkaLoadInfo.setBrokers((routineLoadJob).getServerAddress());
tKafkaLoadInfo.setPartition_begin_offset(partitionIdToOffset);
tRoutineLoadTask.setKafka_load_info(tKafkaLoadInfo);
tRoutineLoadTask.setType(TLoadSourceType.KAFKA);
tRoutineLoadTask.setParams(createTExecPlanFragmentParams(routineLoadJob));
return tRoutineLoadTask;
}
private TExecPlanFragmentParams createTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException {
StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadTaskInfo(this);
Database database = Catalog.getCurrentCatalog().getDb(routineLoadJob.getDbId());
StreamLoadPlanner planner = new StreamLoadPlanner(database,
(OlapTable) database.getTable(routineLoadJob.getTableId()),
streamLoadTask);
return planner.plan();
}
}
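
For concreteness, the label comment in createRoutineLoadTask above resolves as follows; the broker list, topic, and offsets here are hypothetical values chosen only to illustrate the format.

// hypothetical inputs: brokers "127.0.0.1:9092", topic "doris_topic",
// partition 0 consumed up to offset 100, partition 1 up to offset 200
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(0, 100L);
partitionIdToOffset.put(1, 200L);
String joined = Joiner.on("_").withKeyValueSeparator(":").join(partitionIdToOffset);
// joined looks like "0:100_1:200" (partition order follows map iteration order)
String label = String.valueOf(("127.0.0.1:9092" + "_" + "doris_topic" + "_" + joined).hashCode());
// label is the decimal string form of that concatenation's hashCode()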

RLTaskTxnCommitAttachment.java

@ -29,35 +29,12 @@ import java.io.IOException;
// "numOfTotalData": "", "taskId": "", "jobId": ""}
public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
public enum LoadSourceType {
KAFKA(1);
private final int flag;
private LoadSourceType(int flag) {
this.flag = flag;
}
public int value() {
return flag;
}
public static LoadSourceType valueOf(int flag) {
switch (flag) {
case 1:
return KAFKA;
default:
return null;
}
}
}
private long jobId;
private TUniqueId taskId;
private long filteredRows;
private long loadedRows;
private RoutineLoadProgress progress;
private LoadSourceType loadSourceType;
private LoadDataSourceType loadDataSourceType;
public RLTaskTxnCommitAttachment() {
}
@ -70,7 +47,7 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
switch (rlTaskTxnCommitAttachment.getLoadSourceType()) {
case KAFKA:
this.loadSourceType = LoadSourceType.KAFKA;
this.loadDataSourceType = LoadDataSourceType.KAFKA;
this.progress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress());
default:
break;

RoutineLoadJob.java

@ -24,6 +24,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
@ -31,11 +32,16 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.TxnStateChangeListener;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendServiceImpl;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadTxnCommitRequest;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.transaction.AbortTransactionException;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
@ -107,10 +113,12 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
}
protected String id;
protected long id;
protected String name;
protected long dbId;
protected long tableId;
// this code is used to verify be task request
protected long authCode;
protected RoutineLoadDesc routineLoadDesc; // optional
protected int desireTaskConcurrentNum; // optional
protected JobState state;
@ -134,17 +142,23 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
protected List<RoutineLoadTaskInfo> routineLoadTaskInfoList;
protected List<RoutineLoadTaskInfo> needScheduleTaskInfoList;
// plan fragment which will be initialized during job scheduler
protected TExecPlanFragmentParams tExecPlanFragmentParams;
protected ReentrantReadWriteLock lock;
// TODO(ml): error sample
public RoutineLoadJob(String name, long dbId, long tableId, LoadDataSourceType dataSourceType) {
this.id = UUID.randomUUID().toString();
this.id = Catalog.getInstance().getNextId();
this.name = name;
this.dbId = dbId;
this.tableId = tableId;
this.state = JobState.NEED_SCHEDULE;
this.dataSourceType = dataSourceType;
this.resourceInfo = ConnectContext.get().toResourceCtx();
this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser())
.append(ConnectContext.get().getRemoteIP())
.append(id).append(System.currentTimeMillis()).toString().hashCode();
this.routineLoadTaskInfoList = new ArrayList<>();
this.needScheduleTaskInfoList = new ArrayList<>();
lock = new ReentrantReadWriteLock(true);
@ -152,7 +166,7 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
// TODO(ml): I will change it after ut.
@VisibleForTesting
public RoutineLoadJob(String id, String name, long dbId, long tableId,
public RoutineLoadJob(long id, String name, long dbId, long tableId,
RoutineLoadDesc routineLoadDesc,
int desireTaskConcurrentNum, LoadDataSourceType dataSourceType,
int maxErrorNum) {
@ -187,7 +201,7 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
lock.writeLock().unlock();
}
public String getId() {
public long getId() {
return id;
}
@ -227,6 +241,10 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
this.state = state;
}
public long getAuthCode() {
return authCode;
}
protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException {
writeLock();
try {
@ -324,7 +342,7 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) {
if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs())
> DEFAULT_TASK_TIMEOUT_SECONDS * 1000) {
String oldSignature = routineLoadTaskInfo.getId();
String oldSignature = routineLoadTaskInfo.getId().toString();
// abort txn if not committed
try {
Catalog.getCurrentGlobalTransactionMgr()

RoutineLoadManager.java

@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@ -55,7 +56,7 @@ public class RoutineLoadManager {
private Map<Long, Integer> beIdToConcurrentTasks;
// stream load job meta
private Map<String, RoutineLoadJob> idToRoutineLoadJob;
private Map<Long, RoutineLoadJob> idToRoutineLoadJob;
private Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob;
private Queue<RoutineLoadTaskInfo> needScheduleTasksQueue;
@ -328,7 +329,7 @@ public class RoutineLoadManager {
}
}
public RoutineLoadJob getJob(String jobId) {
public RoutineLoadJob getJob(long jobId) {
return idToRoutineLoadJob.get(jobId);
}

RoutineLoadTaskInfo.java

@ -22,10 +22,15 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.task.RoutineLoadTask;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
import java.util.UUID;
/**
* Routine load task info is the task info that includes a unique id (signature).
* For the Kafka type of task info, it also includes the partitions from which data will be consumed by this task.
@ -36,13 +41,14 @@ public abstract class RoutineLoadTaskInfo {
private RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager();
protected String id;
protected UUID id;
protected long txnId;
protected String jobId;
protected long jobId;
private long createTimeMs;
private long loadStartTimeMs;
private TExecPlanFragmentParams tExecPlanFragmentParams;
public RoutineLoadTaskInfo(String id, String jobId) throws BeginTransactionException,
public RoutineLoadTaskInfo(UUID id, long jobId) throws BeginTransactionException,
LabelAlreadyUsedException, AnalysisException {
this.id = id;
this.jobId = jobId;
@ -50,15 +56,15 @@ public abstract class RoutineLoadTaskInfo {
// begin a txn for task
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
routineLoadJob.getDbId(), id, -1, "streamLoad",
routineLoadJob.getDbId(), id.toString(), -1, "streamLoad",
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob);
}
public String getId() {
public UUID getId() {
return id;
}
public String getJobId() {
public long getJobId() {
return jobId;
}
@ -74,13 +80,13 @@ public abstract class RoutineLoadTaskInfo {
return txnId;
}
abstract RoutineLoadTask createStreamLoadTask(long beId) throws LoadException;
abstract TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException;
@Override
public boolean equals(Object obj) {
if (obj instanceof RoutineLoadTaskInfo) {
RoutineLoadTaskInfo routineLoadTaskInfo = (RoutineLoadTaskInfo) obj;
return this.id.equals(routineLoadTaskInfo.getId());
return this.id.toString().equals(routineLoadTaskInfo.getId().toString());
} else {
return false;
}

RoutineLoadTaskScheduler.java

@ -17,17 +17,23 @@
package org.apache.doris.load.routineload;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.RoutineLoadTask;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
/**
@ -62,13 +68,13 @@ public class RoutineLoadTaskScheduler extends Daemon {
}
}
private void process() throws LoadException {
private void process() throws LoadException, UserException {
// update current beIdMaps for tasks
routineLoadManager.updateBeIdTaskMaps();
LOG.info("There are {} need schedule task in queue when {}",
needScheduleTasksQueue.size(), System.currentTimeMillis());
AgentBatchTask batchTask = new AgentBatchTask();
Map<Long, List<TRoutineLoadTask>> beIdTobatchTask = Maps.newHashMap();
int sizeOfTasksQueue = needScheduleTasksQueue.size();
int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum();
int needScheduleTaskNum = sizeOfTasksQueue < clusterIdleSlotNum ? sizeOfTasksQueue : clusterIdleSlotNum;
@ -88,27 +94,56 @@ public class RoutineLoadTaskScheduler extends Daemon {
long beId = routineLoadManager.getMinTaskBeId();
RoutineLoadJob routineLoadJob = null;
try {
routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId());
routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId().toString());
} catch (MetaNotFoundException e) {
LOG.warn("task {} has been abandoned", routineLoadTaskInfo.getId());
return;
}
RoutineLoadTask routineLoadTask = routineLoadTaskInfo.createStreamLoadTask(beId);
TRoutineLoadTask tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask(beId);
// remove task for needScheduleTasksList in job
routineLoadJob.removeNeedScheduleTask(routineLoadTaskInfo);
routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis());
AgentTaskQueue.addTask(routineLoadTask);
batchTask.addTask(routineLoadTask);
// add to batch task map
if (beIdTobatchTask.containsKey(beId)) {
beIdTobatchTask.get(beId).add(tRoutineLoadTask);
} else {
List<TRoutineLoadTask> tRoutineLoadTaskList = Lists.newArrayList();
tRoutineLoadTaskList.add(tRoutineLoadTask);
beIdTobatchTask.put(beId, tRoutineLoadTaskList);
}
// count
clusterIdleSlotNum--;
scheduledTaskNum++;
routineLoadManager.addNumOfConcurrentTasksByBeId(beId);
needScheduleTaskNum--;
}
submitBatchTask(beIdTobatchTask);
LOG.info("{} tasks have bean allocated to be.", scheduledTaskNum);
}
// todo: change to batch submit and reuse client
private void submitBatchTask(Map<Long, List<TRoutineLoadTask>> beIdToRoutineLoadTask) {
for (Map.Entry<Long, List<TRoutineLoadTask>> entry : beIdToRoutineLoadTask.entrySet()) {
Backend backend = Catalog.getCurrentSystemInfo().getBackend(entry.getKey());
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBePort());
BackendService.Client client = null;
boolean ok = false;
try {
client = ClientPool.backendPool.borrowObject(address);
for (TRoutineLoadTask tRoutineLoadTask : entry.getValue()) {
client.submit_routine_load_task(tRoutineLoadTask);
}
ok = true;
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backend.getId(), e);
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
}
if (batchTask.getTaskNum() > 0) {
AgentTaskExecutor.submit(batchTask);
}
}
}

StreamLoadPlanner.java

@ -27,6 +27,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.UserException;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TLoadErrorHubInfo;
@ -63,15 +64,15 @@ public class StreamLoadPlanner {
// Data will load to this table
private Database db;
private OlapTable destTable;
private TStreamLoadPutRequest request;
private StreamLoadTask streamLoadTask;
private Analyzer analyzer;
private DescriptorTable descTable;
public StreamLoadPlanner(Database db, OlapTable destTable, TStreamLoadPutRequest request) {
public StreamLoadPlanner(Database db, OlapTable destTable, StreamLoadTask streamLoadTask) {
this.db = db;
this.destTable = destTable;
this.request = request;
this.streamLoadTask = streamLoadTask;
analyzer = new Analyzer(Catalog.getInstance(), null);
descTable = analyzer.getDescTbl();
@ -92,14 +93,14 @@ public class StreamLoadPlanner {
}
// create scan node
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(0), tupleDesc, destTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(0), tupleDesc, destTable, streamLoadTask);
scanNode.init(analyzer);
descTable.computeMemLayout();
scanNode.finalize(analyzer);
// create dest sink
OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, request.getPartitions());
olapTableSink.init(request.getLoadId(), request.getTxnId(), db.getId());
OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, streamLoadTask.getPartitions());
olapTableSink.init(streamLoadTask.getId(), streamLoadTask.getTxnId(), db.getId());
olapTableSink.finalize();
// for stream load, we only need one fragment, ScanNode -> DataSink.
@ -150,7 +151,7 @@ public class StreamLoadPlanner {
}
}
LOG.debug("stream load txn id: {}, plan: {}", request.txnId, params);
LOG.debug("stream load txn id: {}, plan: {}", streamLoadTask.getTxnId(), params);
return params;
}
}

StreamLoadScanNode.java

@ -39,6 +39,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TBrokerScanNode;
import org.apache.doris.thrift.TBrokerScanRange;
@ -70,7 +71,7 @@ public class StreamLoadScanNode extends ScanNode {
// TODO(zc): now we use scanRange
// input parameter
private Table dstTable;
private TStreamLoadPutRequest request;
private StreamLoadTask streamLoadTask;
// helper
private Analyzer analyzer;
@ -82,10 +83,10 @@ public class StreamLoadScanNode extends ScanNode {
// used to construct for streaming loading
public StreamLoadScanNode(
PlanNodeId id, TupleDescriptor tupleDesc, Table dstTable, TStreamLoadPutRequest request) {
PlanNodeId id, TupleDescriptor tupleDesc, Table dstTable, StreamLoadTask streamLoadTask) {
super(id, tupleDesc, "StreamLoadScanNode");
this.dstTable = dstTable;
this.request = request;
this.streamLoadTask = streamLoadTask;
}
@Override
@ -97,19 +98,19 @@ public class StreamLoadScanNode extends ScanNode {
brokerScanRange = new TBrokerScanRange();
TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
rangeDesc.file_type = request.getFileType();
rangeDesc.format_type = request.getFormatType();
rangeDesc.file_type = streamLoadTask.getFileType();
rangeDesc.format_type = streamLoadTask.getFormatType();
rangeDesc.splittable = false;
switch (request.getFileType()) {
switch (streamLoadTask.getFileType()) {
case FILE_LOCAL:
rangeDesc.path = request.getPath();
rangeDesc.path = streamLoadTask.getPath();
break;
case FILE_STREAM:
rangeDesc.path = "Invalid Path";
rangeDesc.load_id = request.getLoadId();
rangeDesc.load_id = streamLoadTask.getId();
break;
default:
throw new UserException("unsupported file type, type=" + request.getFileType());
throw new UserException("unsupported file type, type=" + streamLoadTask.getFileType());
}
rangeDesc.start_offset = 0;
rangeDesc.size = -1;
@ -123,35 +124,14 @@ public class StreamLoadScanNode extends ScanNode {
// columns: k1, k2, v1, v2=k1 + k2
// this means that there are three columns(k1, k2, v1) in source file,
// and v2 is derived from (k1 + k2)
if (request.isSetColumns()) {
String columnsSQL = new String("COLUMNS " + request.getColumns());
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL)));
ImportColumnsStmt columnsStmt;
try {
columnsStmt = (ImportColumnsStmt) parser.parse().value;
} catch (Error e) {
LOG.warn("error happens when parsing columns, sql={}", columnsSQL, e);
throw new AnalysisException("failed to parsing columns' header, maybe contain unsupported character");
} catch (AnalysisException e) {
LOG.warn("analyze columns' statement failed, sql={}, error={}",
columnsSQL, parser.getErrorMsg(columnsSQL), e);
String errorMessage = parser.getErrorMsg(columnsSQL);
if (errorMessage == null) {
throw e;
} else {
throw new AnalysisException(errorMessage, e);
}
} catch (Exception e) {
LOG.warn("failed to parse columns header, sql={}", columnsSQL, e);
throw new UserException("parse columns header failed", e);
}
for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
if (streamLoadTask.getColumnToColumnExpr() != null && streamLoadTask.getColumnToColumnExpr().size() != 0) {
for (Map.Entry<String, Expr> entry : streamLoadTask.getColumnToColumnExpr().entrySet()) {
// make column name case match with real column name
String realColName = dstTable.getColumn(columnDesc.getColumn()) == null ? columnDesc.getColumn()
: dstTable.getColumn(columnDesc.getColumn()).getName();
if (columnDesc.getExpr() != null) {
exprsByName.put(realColName, columnDesc.getExpr());
String column = entry.getKey();
String realColName = dstTable.getColumn(column) == null ? column
: dstTable.getColumn(column).getName();
if (entry.getValue() != null) {
exprsByName.put(realColName, entry.getValue());
} else {
SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
@ -203,36 +183,14 @@ public class StreamLoadScanNode extends ScanNode {
}
// analyze where statement
if (request.isSetWhere()) {
if (streamLoadTask.getWhereExpr() != null) {
Map<String, SlotDescriptor> dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (SlotDescriptor slotDescriptor : desc.getSlots()) {
dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor);
}
String whereSQL = new String("WHERE " + request.getWhere());
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(whereSQL)));
ImportWhereStmt whereStmt;
try {
whereStmt = (ImportWhereStmt) parser.parse().value;
} catch (Error e) {
LOG.warn("error happens when parsing where header, sql={}", whereSQL, e);
throw new AnalysisException("failed to parsing where header, maybe contain unsupported character");
} catch (AnalysisException e) {
LOG.warn("analyze where statement failed, sql={}, error={}",
whereSQL, parser.getErrorMsg(whereSQL), e);
String errorMessage = parser.getErrorMsg(whereSQL);
if (errorMessage == null) {
throw e;
} else {
throw new AnalysisException(errorMessage, e);
}
} catch (Exception e) {
LOG.warn("failed to parse where header, sql={}", whereSQL, e);
throw new UserException("parse columns header failed", e);
}
// substitute SlotRef in filter expression
Expr whereExpr = whereStmt.getExpr();
Expr whereExpr = streamLoadTask.getWhereExpr();
List<SlotRef> slots = Lists.newArrayList();
whereExpr.collect(SlotRef.class, slots);
@ -258,8 +216,8 @@ public class StreamLoadScanNode extends ScanNode {
computeStats(analyzer);
createDefaultSmap(analyzer);
if (request.isSetColumnSeparator()) {
String sep = ColumnSeparator.convertSeparator(request.getColumnSeparator());
if (streamLoadTask.getColumnSeparator() != null) {
String sep = streamLoadTask.getColumnSeparator().getColumnSeparator();
params.setColumn_separator(sep.getBytes(Charset.forName("UTF-8"))[0]);
} else {
params.setColumn_separator((byte) '\t');

ShowExecutor.java

@ -814,7 +814,7 @@ public class ShowExecutor {
// get routine load info
List<List<String>> rows = Lists.newArrayList();
List<String> row = Lists.newArrayList();
row.add(routineLoadJob.getId());
row.add(String.valueOf(routineLoadJob.getId()));
row.add(routineLoadJob.getName());
row.add(String.valueOf(routineLoadJob.getDbId()));
row.add(String.valueOf(routineLoadJob.getTableId()));

FrontendServiceImpl.java

@ -47,6 +47,7 @@ import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.FrontendServiceVersion;
import org.apache.doris.thrift.TColumnDef;
@ -728,7 +729,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
if (!(table instanceof OlapTable)) {
throw new UserException("load table type is not OlapTable, type=" + table.getClass());
}
StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) table, request);
StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) table, StreamLoadTask.fromTStreamLoadPutRequest(request));
return planner.plan();
} finally {
db.readUnlock();

StreamLoadTask.java (new file)

@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.task;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import org.apache.doris.analysis.ColumnSeparator;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.ImportColumnsStmt;
import org.apache.doris.analysis.ImportWhereStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.load.routineload.RoutineLoadTaskInfo;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.StringReader;
import java.util.Map;
public class StreamLoadTask {
private static final Logger LOG = LogManager.getLogger(StreamLoadTask.class);
private TUniqueId id;
private long txnId;
private TFileType fileType;
private TFileFormatType formatType;
// optional
private Map<String, Expr> columnToColumnExpr;
private Expr whereExpr;
private ColumnSeparator columnSeparator;
private String partitions;
private String path;
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
this.id = id;
this.txnId = txnId;
this.fileType = fileType;
this.formatType = formatType;
}
public TUniqueId getId() {
return id;
}
public long getTxnId() {
return txnId;
}
public TFileType getFileType() {
return fileType;
}
public TFileFormatType getFormatType() {
return formatType;
}
public Map<String, Expr> getColumnToColumnExpr() {
return columnToColumnExpr;
}
public Expr getWhereExpr() {
return whereExpr;
}
public ColumnSeparator getColumnSeparator() {
return columnSeparator;
}
public String getPartitions() {
return partitions;
}
public String getPath() {
return path;
}
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType());
streamLoadTask.setOptionalFromTSLPutRequest(request);
return streamLoadTask;
}
private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws UserException {
if (request.isSetColumns()) {
setColumnToColumnExpr(request.getColumns());
}
if (request.isSetWhere()) {
setWhereExpr(request.getWhere());
}
if (request.isSetColumnSeparator()) {
setColumnSeparator(request.getColumnSeparator());
}
if (request.isSetPartitions()) {
partitions = request.getPartitions();
}
switch (request.getFileType()) {
case FILE_LOCAL:
path = request.getPath();
}
}
public static StreamLoadTask fromRoutineLoadTaskInfo(RoutineLoadTaskInfo routineLoadTaskInfo) {
TUniqueId queryId = new TUniqueId(routineLoadTaskInfo.getId().getMostSignificantBits(),
routineLoadTaskInfo.getId().getLeastSignificantBits());
StreamLoadTask streamLoadTask = new StreamLoadTask(queryId, routineLoadTaskInfo.getTxnId(),
TFileType.FILE_STREAM, TFileFormatType.FORMAT_CSV_PLAIN);
RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager();
streamLoadTask.setOptionalFromRoutineLoadTaskInfo(routineLoadTaskInfo,
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()));
return streamLoadTask;
}
private void setOptionalFromRoutineLoadTaskInfo(RoutineLoadTaskInfo routineLoadTaskInfo,
RoutineLoadJob routineLoadJob) {
if (routineLoadJob.getRoutineLoadDesc() != null) {
RoutineLoadDesc routineLoadDesc = routineLoadJob.getRoutineLoadDesc();
if (routineLoadDesc.getColumnsInfo() != null) {
columnToColumnExpr = routineLoadDesc.getColumnsInfo().getParsedExprMap();
}
if (routineLoadDesc.getWherePredicate() != null) {
whereExpr = routineLoadDesc.getWherePredicate();
}
if (routineLoadDesc.getColumnSeparator() != null) {
columnSeparator = routineLoadDesc.getColumnSeparator();
}
if (routineLoadDesc.getPartitionNames() != null && routineLoadDesc.getPartitionNames().size() != 0) {
partitions = Joiner.on(",").join(routineLoadDesc.getPartitionNames());
}
}
}
private void setColumnToColumnExpr(String columns) throws UserException {
String columnsSQL = new String("COLUMNS " + columns);
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL)));
ImportColumnsStmt columnsStmt;
try {
columnsStmt = (ImportColumnsStmt) parser.parse().value;
} catch (Error e) {
LOG.warn("error happens when parsing columns, sql={}", columnsSQL, e);
throw new AnalysisException("failed to parsing columns' header, maybe contain unsupported character");
} catch (AnalysisException e) {
LOG.warn("analyze columns' statement failed, sql={}, error={}",
columnsSQL, parser.getErrorMsg(columnsSQL), e);
String errorMessage = parser.getErrorMsg(columnsSQL);
if (errorMessage == null) {
throw e;
} else {
throw new AnalysisException(errorMessage, e);
}
} catch (Exception e) {
LOG.warn("failed to parse columns header, sql={}", columnsSQL, e);
throw new UserException("parse columns header failed", e);
}
if (columnsStmt.getColumns() != null && columnsStmt.getColumns().size() != 0) {
columnToColumnExpr = Maps.newHashMap();
for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
columnToColumnExpr.put(columnDesc.getColumn(), columnDesc.getExpr());
}
}
}
private void setWhereExpr(String whereString) throws UserException {
String whereSQL = new String("WHERE " + whereString);
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(whereSQL)));
ImportWhereStmt whereStmt;
try {
whereStmt = (ImportWhereStmt) parser.parse().value;
} catch (Error e) {
LOG.warn("error happens when parsing where header, sql={}", whereSQL, e);
throw new AnalysisException("failed to parsing where header, maybe contain unsupported character");
} catch (AnalysisException e) {
LOG.warn("analyze where statement failed, sql={}, error={}",
whereSQL, parser.getErrorMsg(whereSQL), e);
String errorMessage = parser.getErrorMsg(whereSQL);
if (errorMessage == null) {
throw e;
} else {
throw new AnalysisException(errorMessage, e);
}
} catch (Exception e) {
LOG.warn("failed to parse where header, sql={}", whereSQL, e);
throw new UserException("parse columns header failed", e);
}
whereExpr = whereStmt.getExpr();
}
private void setColumnSeparator(String oriSeparator) throws AnalysisException {
columnSeparator = new ColumnSeparator(oriSeparator);
columnSeparator.analyze();
}
}
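
With this class in place, both load paths converge on a StreamLoadTask before planning. A rough usage sketch, assuming db, table, and request are resolved the same way as in FrontendServiceImpl above:

// stream load path: convert the Thrift request first
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
// routine load path instead builds it from the task info (see KafkaTaskInfo above):
// StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadTaskInfo(routineLoadTaskInfo);
StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) table, streamLoadTask);
TExecPlanFragmentParams params = planner.plan();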

KafkaRoutineLoadJobTest.java

@ -62,6 +62,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class KafkaRoutineLoadJobTest {
@ -125,7 +126,7 @@ public class KafkaRoutineLoadJobTest {
};
KafkaRoutineLoadJob kafkaRoutineLoadJob =
new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
1L, routineLoadDesc, 3, 0,
"", "", new KafkaProgress());
Deencapsulation.setField(kafkaRoutineLoadJob, "consumer", kafkaConsumer);
@ -149,7 +150,7 @@ public class KafkaRoutineLoadJobTest {
};
KafkaRoutineLoadJob kafkaRoutineLoadJob =
new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
1L, routineLoadDesc, 3, 0,
"", "", null);
@ -200,7 +201,7 @@ public class KafkaRoutineLoadJobTest {
};
RoutineLoadJob routineLoadJob =
new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
1L, routineLoadDesc, 3, 0,
"", "", null);
new Expectations() {
@ -214,7 +215,7 @@ public class KafkaRoutineLoadJobTest {
};
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new ArrayList<>();
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo("1", "1");
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L);
kafkaTaskInfo.addKafkaPartition(100);
kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 1000);
routineLoadTaskInfoList.add(kafkaTaskInfo);
@ -231,7 +232,7 @@ public class KafkaRoutineLoadJobTest {
new Expectations() {
{
routineLoadManager.getJob("1");
routineLoadManager.getJob(1L);
result = routineLoadJob;
}
};

RoutineLoadSchedulerTest.java

@ -74,7 +74,7 @@ public class RoutineLoadSchedulerTest {
};
RoutineLoadJob routineLoadJob =
new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
1L, routineLoadDesc ,3, 0,
"", "", new KafkaProgress());
routineLoadJob.setState(RoutineLoadJob.JobState.NEED_SCHEDULE);

RoutineLoadTaskSchedulerTest.java

@ -39,6 +39,7 @@ import org.junit.Test;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import mockit.Deencapsulation;
import mockit.Expectations;
@ -64,7 +65,7 @@ public class RoutineLoadTaskSchedulerTest {
long beId = 100L;
Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue();
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo("1", "1");
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1l);
routineLoadTaskInfo1.addKafkaPartition(1);
routineLoadTaskInfo1.addKafkaPartition(2);
routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
@ -116,7 +117,7 @@ public class RoutineLoadTaskSchedulerTest {
result = beId;
routineLoadManager.getJobByTaskId(anyString);
result = kafkaRoutineLoadJob1;
routineLoadManager.getJob(anyString);
routineLoadManager.getJob(anyLong);
result = kafkaRoutineLoadJob1;
}
};
@ -127,7 +128,7 @@ public class RoutineLoadTaskSchedulerTest {
//
// new Expectations() {
// {
// routineLoadTaskInfo1.createStreamLoadTask(anyLong);
// routineLoadTaskInfo1.createRoutineLoadTask(anyLong);
// result = kafkaRoutineLoadTask;
// }
// };

StreamLoadPlannerTest.java

@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.UserException;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TUniqueId;
@ -70,7 +71,8 @@ public class StreamLoadPlannerTest {
TStreamLoadPutRequest request = new TStreamLoadPutRequest();
request.setTxnId(1);
request.setLoadId(new TUniqueId(2, 3));
StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable, request);
StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
planner.plan();
}
}

StreamLoadScanNodeTest.java

@ -35,6 +35,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPlanNode;
@ -140,7 +141,8 @@ public class StreamLoadScanNodeTest {
}
TStreamLoadPutRequest request = getBaseRequest();
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
new Expectations() {{
dstTable.getBaseSchema(); result = columns;
}};
@ -174,7 +176,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setColumns("k1, k2, v1");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -203,7 +206,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setColumns("k1 k2 v1");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -249,7 +253,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setFileType(TFileType.FILE_LOCAL);
request.setColumns("k1,k2,v1, v2=k2");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -297,7 +302,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setFileType(TFileType.FILE_LOCAL);
request.setColumns("k1,k2, v1=hll_hash(k2)");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -345,7 +351,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setFileType(TFileType.FILE_LOCAL);
request.setColumns("k1,k2, v1=hll_hash1(k2)");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -375,7 +382,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setFileType(TFileType.FILE_LOCAL);
request.setColumns("k1,k2, v1=k2");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -405,7 +413,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setFileType(TFileType.FILE_BROKER);
request.setColumns("k1,k2,v1, v2=k2");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -434,7 +443,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setColumns("k1,k2,v1, v2=k3");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -480,7 +490,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setColumns("k1,k2,v1, v2=k1");
request.setWhere("k1 = 1");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -526,7 +537,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setColumns("k1,k2,v1, v2=k2");
request.setWhere("k1 1");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -556,7 +568,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setColumns("k1,k2,v1, v2=k1");
request.setWhere("k5 = 1");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);
@ -586,7 +599,8 @@ public class StreamLoadScanNodeTest {
TStreamLoadPutRequest request = getBaseRequest();
request.setColumns("k1,k2,v1, v2=k1");
request.setWhere("k1 + v2");
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable,
StreamLoadTask.fromTStreamLoadPutRequest(request));
scanNode.init(analyzer);
scanNode.finalize(analyzer);