[refactor](remove unused code) remove load error hub (#16183)

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
Author: yiguolei
Date: 2023-01-30 16:53:56 +08:00 (committed by GitHub)
commit c59a8cb15d
parent fdc042bb39
29 changed files with 116 additions and 1480 deletions

View File

@@ -21,7 +21,6 @@ import org.apache.doris.analysis.AddBackendClause;
import org.apache.doris.analysis.AddFollowerClause;
import org.apache.doris.analysis.AddObserverClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterLoadErrorUrlClause;
import org.apache.doris.analysis.CancelAlterSystemStmt;
import org.apache.doris.analysis.CancelStmt;
import org.apache.doris.analysis.DecommissionBackendClause;
@@ -167,9 +166,6 @@ public class SystemHandler extends AlterHandler {
} else if (alterClause instanceof ModifyBrokerClause) {
ModifyBrokerClause clause = (ModifyBrokerClause) alterClause;
Env.getCurrentEnv().getBrokerMgr().execute(clause);
} else if (alterClause instanceof AlterLoadErrorUrlClause) {
AlterLoadErrorUrlClause clause = (AlterLoadErrorUrlClause) alterClause;
Env.getCurrentEnv().getLoadInstance().setLoadErrorHubInfo(clause.getProperties());
} else if (alterClause instanceof ModifyBackendClause) {
Env.getCurrentSystemInfo().modifyBackends(((ModifyBackendClause) alterClause));
} else {

View File

@@ -20,9 +20,7 @@ package org.apache.doris.analysis;
import org.apache.doris.alter.AlterOpType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.load.LoadErrorHub;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -35,7 +33,6 @@ public class AlterLoadErrorUrlClause extends AlterClause {
private static final Logger LOG = LogManager.getLogger(AlterLoadErrorUrlClause.class);
private Map<String, String> properties;
private LoadErrorHub.Param param;
public AlterLoadErrorUrlClause(Map<String, String> properties) {
super(AlterOpType.ALTER_OTHER);
@@ -49,18 +46,7 @@ public class AlterLoadErrorUrlClause extends AlterClause {
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
if (properties == null || properties.isEmpty()) {
throw new AnalysisException("Load errors hub's properties are missing");
}
String type = properties.get("type");
if (Strings.isNullOrEmpty(type)) {
throw new AnalysisException("Load errors hub's type is missing");
}
if (!type.equalsIgnoreCase("MYSQL") && !type.equalsIgnoreCase("BROKER")) {
throw new AnalysisException("Load errors hub's type should be MYSQL or BROKER");
}
throw new AnalysisException("Load errors hub is not supported");
}
@Override

View File

@@ -149,7 +149,6 @@ import org.apache.doris.load.ExportChecker;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.StreamLoadRecordMgr;
import org.apache.doris.load.loadv2.LoadEtlChecker;
import org.apache.doris.load.loadv2.LoadJobScheduler;
@@ -1712,62 +1711,6 @@ public class Env {
return getInternalCatalog().loadDb(dis, checksum);
}
public long saveLoadJob(CountingDataOutputStream dos, long checksum) throws IOException {
// 1. save load.dbToLoadJob, force set to 0 since there should be none load jobs
int jobSize = 0;
checksum ^= jobSize;
dos.writeInt(jobSize);
// 2. save delete jobs
// delete jobs are moved to DeleteHandler. So here we just set job size as 0.
jobSize = 0;
checksum ^= jobSize;
dos.writeInt(jobSize);
// 3. load error hub info
LoadErrorHub.Param param = load.getLoadErrorHubInfo();
param.write(dos);
// 4. save delete load job info
// delete jobs are moved to DeleteHandler. So here we just set job size as 0.
int deleteJobSize = 0;
checksum ^= deleteJobSize;
dos.writeInt(deleteJobSize);
return checksum;
}
public long loadLoadJob(DataInputStream dis, long checksum) throws IOException, DdlException {
// load jobs
int jobSize = dis.readInt();
long newChecksum = checksum ^ jobSize;
if (jobSize > 0) {
LOG.warn("there should be no load jobs, please rollback to 1.2.x and check if there are hadoop load jobs");
throw new RuntimeException("there should be no load jobs, please rollback to 1.2.x "
+ "and check if there are hadoop load jobs");
}
// delete jobs
// Delete job has been moved to DeleteHandler. Here the jobSize is always 0, we need do nothing.
jobSize = dis.readInt();
Preconditions.checkState(jobSize == 0, jobSize);
newChecksum ^= jobSize;
// load error hub info
LoadErrorHub.Param param = new LoadErrorHub.Param();
param.readFields(dis);
load.setLoadErrorHubInfo(param);
// 4. load delete jobs
// Delete job has been moved to DeleteHandler. Here the jobSize is always 0, we need do nothing.
int deleteJobSize = dis.readInt();
Preconditions.checkState(deleteJobSize == 0, deleteJobSize);
newChecksum ^= deleteJobSize;
LOG.info("finished replay loadJob from image");
return newChecksum;
}
public long loadExportJob(DataInputStream dis, long checksum) throws IOException, DdlException {
long curTime = System.currentTimeMillis();
long newChecksum = checksum;

View File

@@ -1,47 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.proc;
import org.apache.doris.catalog.Env;
import org.apache.doris.load.LoadErrorHub;
import com.google.common.collect.ImmutableList;
public class LoadErrorHubProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Type").add("Properties")
.build();
private Env env;
public LoadErrorHubProcNode(Env env) {
this.env = env;
}
@Override
public ProcResult fetchResult() {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
LoadErrorHub.Param param = env.getLoadInstance().getLoadErrorHubInfo();
if (param != null) {
result.addRow(param.getInfo());
}
return result;
}
}

View File

@@ -45,7 +45,6 @@ public final class ProcService {
root.register("frontends", new FrontendsProcNode(Env.getCurrentEnv()));
root.register("brokers", Env.getCurrentEnv().getBrokerMgr().getProcNode());
root.register("resources", Env.getCurrentEnv().getResourceMgr().getProcNode());
root.register("load_error_hub", new LoadErrorHubProcNode(Env.getCurrentEnv()));
root.register("transactions", new TransDbProcDir());
root.register("trash", new TrashProcDir());
root.register("monitor", new MonitorProcDir());

View File

@@ -1,127 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.thrift.TBrokerErrorHubInfo;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class BrokerLoadErrorHub extends LoadErrorHub {
private BrokerParam brokerParam;
public BrokerLoadErrorHub(BrokerParam brokerParam) {
this.brokerParam = brokerParam;
}
public BrokerParam getBrokerParam() {
return brokerParam;
}
public static class BrokerParam implements Writable {
private String brokerName;
private String path;
private Map<String, String> prop = Maps.newHashMap();
// for persist
public BrokerParam() {
}
public BrokerParam(String brokerName, String path, Map<String, String> prop) {
this.brokerName = brokerName;
this.path = path;
this.prop = prop;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, brokerName);
Text.writeString(out, path);
out.writeInt(prop.size());
for (Map.Entry<String, String> entry : prop.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}
public void readFields(DataInput in) throws IOException {
brokerName = Text.readString(in);
path = Text.readString(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String val = Text.readString(in);
prop.put(key, val);
}
}
public TBrokerErrorHubInfo toThrift() {
FsBroker fsBroker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName);
if (fsBroker == null) {
return null;
}
TBrokerErrorHubInfo info = new TBrokerErrorHubInfo(new TNetworkAddress(fsBroker.ip, fsBroker.port),
path, prop);
return info;
}
public String getBrief() {
Map<String, String> briefMap = Maps.newHashMap(prop);
briefMap.put("name", brokerName);
briefMap.put("path", path);
PrintableMap<String, String> printableMap = new PrintableMap<>(briefMap, "=", true, false, true);
return printableMap.toString();
}
}
// Broker load error hub does not support showing detail error msg in 'show load warnings' stmt.
// User need to file error file in remote storage with file name showed in 'show load' stmt
@Override
public List<ErrorMsg> fetchLoadError(long jobId) {
List<ErrorMsg> result = Lists.newArrayList();
final String hint = "Find detail load error info on '"
+ brokerParam.path + "' with file name showed in 'SHOW LOAD' stmt";
ErrorMsg errorMsg = new ErrorMsg(0, hint);
result.add(errorMsg);
return result;
}
@Override
public boolean prepare() {
return true;
}
@Override
public boolean close() {
return true;
}
}
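Note: the BrokerParam write/readFields pair above is the usual count-then-pairs map encoding, and the reader must mirror the writer's field order exactly. A standalone sketch of the same round trip, substituting java.io's writeUTF for Doris's Text.writeString:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sketch of the write/readFields symmetry used by BrokerParam.
public class MapRoundTrip {
    static void write(DataOutput out, Map<String, String> prop) throws IOException {
        out.writeInt(prop.size());                // count first
        for (Map.Entry<String, String> e : prop.entrySet()) {
            out.writeUTF(e.getKey());             // then alternating key/value strings
            out.writeUTF(e.getValue());
        }
    }

    static Map<String, String> read(DataInput in) throws IOException {
        Map<String, String> prop = new HashMap<>();
        int size = in.readInt();                  // read back in the same order
        for (int i = 0; i < size; i++) {
            prop.put(in.readUTF(), in.readUTF());
        }
        return prop;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> prop = new HashMap<>();
        prop.put("username", "u");
        prop.put("password", "p");
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(new DataOutputStream(buf), prop);
        Map<String, String> back = read(new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(prop.equals(back));    // true
    }
}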

View File

@@ -33,11 +33,8 @@ import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -129,8 +126,6 @@ public class Load {
// dbId -> set of (label, timestamp)
private Map<Long, Map<String, Long>> dbToMiniLabels; // db to mini uncommitted label
private volatile LoadErrorHub.Param loadErrorHubParam = new LoadErrorHub.Param();
// lock for load job
// lock is private and must use after db lock
private ReentrantReadWriteLock lock;
@@ -1596,83 +1591,6 @@ public class Load {
return infos;
}
public LoadErrorHub.Param getLoadErrorHubInfo() {
return loadErrorHubParam;
}
public void setLoadErrorHubInfo(LoadErrorHub.Param info) {
this.loadErrorHubParam = info;
}
public void setLoadErrorHubInfo(Map<String, String> properties) throws DdlException {
String type = properties.get("type");
if (type.equalsIgnoreCase("MYSQL")) {
String host = properties.get("host");
if (Strings.isNullOrEmpty(host)) {
throw new DdlException("mysql host is missing");
}
int port = -1;
try {
port = Integer.valueOf(properties.get("port"));
} catch (NumberFormatException e) {
throw new DdlException("invalid mysql port: " + properties.get("port"));
}
String user = properties.get("user");
if (Strings.isNullOrEmpty(user)) {
throw new DdlException("mysql user name is missing");
}
String db = properties.get("database");
if (Strings.isNullOrEmpty(db)) {
throw new DdlException("mysql database is missing");
}
String tbl = properties.get("table");
if (Strings.isNullOrEmpty(tbl)) {
throw new DdlException("mysql table is missing");
}
String pwd = Strings.nullToEmpty(properties.get("password"));
MysqlLoadErrorHub.MysqlParam param = new MysqlLoadErrorHub.MysqlParam(host, port, user, pwd, db, tbl);
loadErrorHubParam = LoadErrorHub.Param.createMysqlParam(param);
} else if (type.equalsIgnoreCase("BROKER")) {
String brokerName = properties.get("name");
if (Strings.isNullOrEmpty(brokerName)) {
throw new DdlException("broker name is missing");
}
properties.remove("name");
if (!Env.getCurrentEnv().getBrokerMgr().containsBroker(brokerName)) {
throw new DdlException("broker does not exist: " + brokerName);
}
String path = properties.get("path");
if (Strings.isNullOrEmpty(path)) {
throw new DdlException("broker path is missing");
}
properties.remove("path");
// check if broker info is invalid
BlobStorage blobStorage = BlobStorage.create(brokerName, StorageBackend.StorageType.BROKER, properties);
Status st = blobStorage.checkPathExist(path);
if (!st.ok()) {
throw new DdlException("failed to visit path: " + path + ", err: " + st.getErrMsg());
}
BrokerLoadErrorHub.BrokerParam param = new BrokerLoadErrorHub.BrokerParam(brokerName, path, properties);
loadErrorHubParam = LoadErrorHub.Param.createBrokerParam(param);
} else if (type.equalsIgnoreCase("null")) {
loadErrorHubParam = LoadErrorHub.Param.createNullParam();
}
Env.getCurrentEnv().getEditLog().logSetLoadErrorHub(loadErrorHubParam);
LOG.info("set load error hub info: {}", loadErrorHubParam);
}
public static class JobInfo {
public String dbName;
public Set<String> tblNames = Sets.newHashSet();
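Note: most of the removed setLoadErrorHubInfo above is the repeated required-property check "if (Strings.isNullOrEmpty(v)) throw new DdlException(...)". Where that pattern survives elsewhere, it collapses into a small helper; a hedged sketch (hypothetical helper, not a Doris API):

import java.util.Map;

// Sketch: a required-property lookup replacing repeated null/empty checks.
class Props {
    static String require(Map<String, String> props, String key) {
        String val = props.get(key);
        if (val == null || val.isEmpty()) {
            throw new IllegalArgumentException(key + " is missing");
        }
        return val;
    }
}

Call sites then read: String host = Props.require(properties, "host");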

View File

@@ -19,25 +19,113 @@ package org.apache.doris.load;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.thrift.TErrorHubType;
import org.apache.doris.thrift.TLoadErrorHubInfo;
import org.apache.doris.common.util.PrintableMap;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public abstract class LoadErrorHub {
private static final Logger LOG = LogManager.getLogger(LoadErrorHub.class);
public static class MysqlParam implements Writable {
private String host;
private int port;
private String user;
private String passwd;
private String db;
private String table;
public MysqlParam() {
host = "";
port = 0;
user = "";
passwd = "";
db = "";
table = "";
}
public MysqlParam(String host, int port, String user, String passwd, String db, String table) {
this.host = host;
this.port = port;
this.user = user;
this.passwd = passwd;
this.db = db;
this.table = table;
}
public String getBrief() {
Map<String, String> briefMap = Maps.newHashMap();
briefMap.put("host", host);
briefMap.put("port", String.valueOf(port));
briefMap.put("user", user);
briefMap.put("password", passwd);
briefMap.put("database", db);
briefMap.put("table", table);
PrintableMap<String, String> printableMap = new PrintableMap<>(briefMap, "=", true, false, true);
return printableMap.toString();
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, host);
out.writeInt(port);
Text.writeString(out, user);
Text.writeString(out, passwd);
Text.writeString(out, db);
Text.writeString(out, table);
}
public void readFields(DataInput in) throws IOException {
host = Text.readString(in);
port = in.readInt();
user = Text.readString(in);
passwd = Text.readString(in);
db = Text.readString(in);
table = Text.readString(in);
}
}
public static class BrokerParam implements Writable {
private String brokerName;
private String path;
private Map<String, String> prop = Maps.newHashMap();
// for persist
public BrokerParam() {
}
public BrokerParam(String brokerName, String path, Map<String, String> prop) {
this.brokerName = brokerName;
this.path = path;
this.prop = prop;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, brokerName);
Text.writeString(out, path);
out.writeInt(prop.size());
for (Map.Entry<String, String> entry : prop.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}
public void readFields(DataInput in) throws IOException {
brokerName = Text.readString(in);
path = Text.readString(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String val = Text.readString(in);
prop.put(key, val);
}
}
}
public static final String MYSQL_PROTOCOL = "MYSQL";
public static final String BROKER_PROTOCOL = "BROKER";
@@ -48,137 +136,16 @@ public abstract class LoadErrorHub {
NULL_TYPE
}
public class ErrorMsg {
private long jobId;
private String msg;
public ErrorMsg(long id, String message) {
jobId = id;
msg = message;
}
public long getJobId() {
return jobId;
}
public String getMsg() {
return msg;
}
}
public static class Param implements Writable {
private HubType type;
private MysqlLoadErrorHub.MysqlParam mysqlParam;
private BrokerLoadErrorHub.BrokerParam brokerParam;
private MysqlParam mysqlParam;
private BrokerParam brokerParam;
// for replay
public Param() {
type = HubType.NULL_TYPE;
}
public static Param createMysqlParam(MysqlLoadErrorHub.MysqlParam mysqlParam) {
Param param = new Param();
param.type = HubType.MYSQL_TYPE;
param.mysqlParam = mysqlParam;
return param;
}
public static Param createBrokerParam(BrokerLoadErrorHub.BrokerParam brokerParam) {
Param param = new Param();
param.type = HubType.BROKER_TYPE;
param.brokerParam = brokerParam;
return param;
}
public static Param createNullParam() {
Param param = new Param();
param.type = HubType.NULL_TYPE;
return param;
}
public HubType getType() {
return type;
}
public MysqlLoadErrorHub.MysqlParam getMysqlParam() {
return mysqlParam;
}
public BrokerLoadErrorHub.BrokerParam getBrokerParam() {
return brokerParam;
}
public String toString() {
ToStringHelper helper = MoreObjects.toStringHelper(this);
helper.add("type", type.toString());
switch (type) {
case MYSQL_TYPE:
helper.add("mysql_info", mysqlParam.toString());
break;
case NULL_TYPE:
helper.add("mysql_info", "null");
break;
default:
Preconditions.checkState(false, "unknown hub type");
}
return helper.toString();
}
public TLoadErrorHubInfo toThrift() {
TLoadErrorHubInfo info = new TLoadErrorHubInfo();
switch (type) {
case MYSQL_TYPE:
info.setType(TErrorHubType.MYSQL);
info.setMysqlInfo(mysqlParam.toThrift());
break;
case BROKER_TYPE:
info.setType(TErrorHubType.BROKER);
info.setBrokerInfo(brokerParam.toThrift());
break;
case NULL_TYPE:
info.setType(TErrorHubType.NULL_TYPE);
break;
default:
Preconditions.checkState(false, "unknown hub type");
}
return info;
}
public Map<String, Object> toDppConfigInfo() {
Map<String, Object> dppHubInfo = Maps.newHashMap();
dppHubInfo.put("type", type.toString());
switch (type) {
case MYSQL_TYPE:
dppHubInfo.put("info", mysqlParam);
break;
case BROKER_TYPE:
Preconditions.checkState(false, "hadoop load do not support broker error hub");
break;
case NULL_TYPE:
break;
default:
Preconditions.checkState(false, "unknown hub type");
}
return dppHubInfo;
}
public List<String> getInfo() {
List<String> info = Lists.newArrayList();
info.add(type.name());
switch (type) {
case MYSQL_TYPE:
info.add(mysqlParam.getBrief());
break;
case BROKER_TYPE:
info.add(brokerParam.getBrief());
break;
default:
info.add("");
}
return info;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, type.name());
@@ -200,11 +167,11 @@ public abstract class LoadErrorHub {
type = HubType.valueOf(Text.readString(in));
switch (type) {
case MYSQL_TYPE:
mysqlParam = new MysqlLoadErrorHub.MysqlParam();
mysqlParam = new MysqlParam();
mysqlParam.readFields(in);
break;
case BROKER_TYPE:
brokerParam = new BrokerLoadErrorHub.BrokerParam();
brokerParam = new BrokerParam();
brokerParam.readFields(in);
break;
case NULL_TYPE:
@@ -214,29 +181,4 @@ public abstract class LoadErrorHub {
}
}
}
public abstract List<ErrorMsg> fetchLoadError(long jobId);
public abstract boolean prepare();
public abstract boolean close();
public static LoadErrorHub createHub(Param param) {
switch (param.getType()) {
case MYSQL_TYPE: {
LoadErrorHub hub = new MysqlLoadErrorHub(param.getMysqlParam());
hub.prepare();
return hub;
}
case BROKER_TYPE: {
LoadErrorHub hub = new BrokerLoadErrorHub(param.getBrokerParam());
hub.prepare();
return hub;
}
default:
Preconditions.checkState(false, "unknown hub type");
}
return null;
}
}
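Note: Param and its readFields survive the cleanup because old images and edit logs still carry serialized hub info; the tag-then-payload decoder has to remain readable even though nothing acts on the result anymore. A hedged sketch of that decode shape (hypothetical names, not the Doris API):

import java.io.DataInput;
import java.io.IOException;

// Sketch of a tag-then-payload decode like Param.readFields:
// read the type tag first, then dispatch to the matching payload reader.
enum HubType { MYSQL_TYPE, BROKER_TYPE, NULL_TYPE }

class TaggedDecode {
    static HubType readParam(DataInput in) throws IOException {
        HubType type = HubType.valueOf(in.readUTF()); // tag written by the old writer
        switch (type) {
            case MYSQL_TYPE:
                // consume the MySQL payload fields here, even if discarded afterwards
                break;
            case BROKER_TYPE:
                // consume the broker payload fields here
                break;
            case NULL_TYPE:
                break;                                // no payload follows
        }
        return type;
    }
}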

View File

@@ -1,210 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MysqlUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.thrift.TMysqlErrorHubInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
public class MysqlLoadErrorHub extends LoadErrorHub {
private static final Logger LOG = LogManager.getLogger(MysqlLoadErrorHub.class);
private static final String QUERY_SQL_FIRST = "SELECT job_id, error_msg FROM ";
private static final String QUERY_SQL_LAST = " WHERE job_id = ? LIMIT ? ";
private static final long MAX_LINE = 10;
private static final int STMT_TIMEOUT_S = 5;
private MysqlParam param;
public static class MysqlParam implements Writable {
private String host;
private int port;
private String user;
private String passwd;
private String db;
private String table;
public MysqlParam() {
host = "";
port = 0;
user = "";
passwd = "";
db = "";
table = "";
}
public MysqlParam(String host, int port, String user, String passwd, String db, String table) {
this.host = host;
this.port = port;
this.user = user;
this.passwd = passwd;
this.db = db;
this.table = table;
}
public String getBrief() {
Map<String, String> briefMap = Maps.newHashMap();
briefMap.put("host", host);
briefMap.put("port", String.valueOf(port));
briefMap.put("user", user);
briefMap.put("password", passwd);
briefMap.put("database", db);
briefMap.put("table", table);
PrintableMap<String, String> printableMap = new PrintableMap<>(briefMap, "=", true, false, true);
return printableMap.toString();
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public String getUser() {
return user;
}
public String getPasswd() {
return passwd;
}
public String getDb() {
return db;
}
public String getTable() {
return table;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, host);
out.writeInt(port);
Text.writeString(out, user);
Text.writeString(out, passwd);
Text.writeString(out, db);
Text.writeString(out, table);
}
public void readFields(DataInput in) throws IOException {
host = Text.readString(in);
port = in.readInt();
user = Text.readString(in);
passwd = Text.readString(in);
db = Text.readString(in);
table = Text.readString(in);
}
public TMysqlErrorHubInfo toThrift() {
TMysqlErrorHubInfo info = new TMysqlErrorHubInfo(host, port, user, passwd, db, table);
return info;
}
}
public MysqlLoadErrorHub(MysqlParam mysqlParam) {
Preconditions.checkNotNull(mysqlParam);
param = mysqlParam;
}
@Override
public List<ErrorMsg> fetchLoadError(long jobId) {
List<ErrorMsg> result = Lists.newArrayList();
Connection conn = null;
PreparedStatement stmt = null;
ResultSet resultSet = null;
conn = MysqlUtil.getConnection(
param.getHost(), param.getPort(), param.getDb(),
param.getUser(), param.getPasswd());
if (conn == null) {
return result;
}
String sql = null;
try {
sql = QUERY_SQL_FIRST + param.getTable() + QUERY_SQL_LAST;
stmt = conn.prepareStatement(sql);
stmt.setLong(1, jobId);
stmt.setLong(2, MAX_LINE);
stmt.setQueryTimeout(STMT_TIMEOUT_S);
resultSet = stmt.executeQuery();
while (resultSet.next()) {
String msg = resultSet.getString("error_msg");
result.add(new ErrorMsg(jobId, msg));
}
} catch (SQLException e) {
LOG.warn("fail to query load error mysql. "
+ "sql={}, table={}, jobId={}, max_line={}, exception={}",
sql, param.getTable(), jobId, MAX_LINE, e);
} finally {
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException sqlEx) {
LOG.warn("fail to close resultSet of load error.");
}
resultSet = null;
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException sqlEx) {
LOG.warn("fail to close stmt.");
}
stmt = null;
}
MysqlUtil.closeConnection(conn);
conn = null;
}
return result;
}
@Override
public boolean prepare() {
return true;
}
@Override
public boolean close() {
return true;
}
}
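Note: the deleted fetchLoadError uses the manual finally-block JDBC cleanup idiom. For comparison, the same query under try-with-resources, which closes ResultSet, PreparedStatement, and Connection automatically in reverse order; a sketch with hypothetical connection details, not Doris code:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Sketch: the removed query, rewritten with try-with-resources.
public class FetchErrors {
    static List<String> fetch(String url, String user, String pwd,
                              String table, long jobId) throws SQLException {
        List<String> result = new ArrayList<>();
        String sql = "SELECT job_id, error_msg FROM " + table + " WHERE job_id = ? LIMIT ?";
        try (Connection conn = DriverManager.getConnection(url, user, pwd);
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setLong(1, jobId);
            stmt.setLong(2, 10);            // MAX_LINE in the removed code
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    result.add(rs.getString("error_msg"));
                }
            }
        }
        return result;
    }
}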

View File

@@ -528,8 +528,8 @@ public class EditLog {
break;
}
case OperationType.OP_SET_LOAD_ERROR_HUB: {
final LoadErrorHub.Param param = (LoadErrorHub.Param) journal.getData();
env.getLoadInstance().setLoadErrorHubInfo(param);
// final LoadErrorHub.Param param = (LoadErrorHub.Param) journal.getData();
// ignore load error hub
break;
}
case OperationType.OP_UPDATE_CLUSTER_AND_BACKENDS: {
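Note: replay keeps a case for the retired op rather than deleting it. journal.getData() already returns a deserialized object, so the payload was decoded upstream in the journal layer; the case only needs to discard it without applying anything. A minimal sketch of that consume-but-ignore shape (op code values are hypothetical):

// Sketch: a replay switch that keeps a case for a retired op code.
// The payload was deserialized upstream; the case just discards it.
class ReplaySketch {
    static final short OP_SET_LOAD_ERROR_HUB = 62;   // hypothetical value
    static final short OP_OTHER = 63;

    static void replay(short opCode, Object data) {
        switch (opCode) {
            case OP_SET_LOAD_ERROR_HUB:
                // ignore: feature removed, but old edit logs still contain the op
                break;
            case OP_OTHER:
                apply(data);
                break;
            default:
                throw new IllegalStateException("unknown op: " + opCode);
        }
    }

    static void apply(Object data) { /* ... */ }
}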

View File

@@ -71,12 +71,6 @@ public class MetaPersistMethod {
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveDb", CountingDataOutputStream.class, long.class);
break;
case "loadJob":
metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadLoadJob", DataInputStream.class, long.class);
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveLoadJob", CountingDataOutputStream.class, long.class);
break;
case "alterJob":
metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadAlterJob", DataInputStream.class, long.class);

View File

@@ -81,11 +81,21 @@ public class MetaReader {
checksum = env.loadHeader(dis, metaHeader, checksum);
// 3. Read other meta modules
// Modules must be read in the order in which the metadata was written
for (MetaIndex metaIndex : metaFooter.metaIndices) {
for (int i = 0; i < metaFooter.metaIndices.size(); ++i) {
MetaIndex metaIndex = metaFooter.metaIndices.get(i);
if (metaIndex.name.equals("header")) {
// skip meta header, which has been read before.
continue;
}
// Should skip some bytes because ignore some meta, such as load job
if (metaIndex.name.equals("loadJob")) {
LOG.info("This is {}, skip {} bytes", metaIndex.name,
metaFooter.metaIndices.get(i + 1).offset - metaIndex.offset);
if (i < metaFooter.metaIndices.size() - 1) {
IOUtils.skipFully(dis, metaFooter.metaIndices.get(i + 1).offset - metaIndex.offset);
}
continue;
}
MetaPersistMethod persistMethod = PersistMetaModules.MODULES_MAP.get(metaIndex.name);
if (persistMethod == null) {
throw new IOException("Unknown meta module: " + metaIndex.name + ". Known modules: "

View File

@@ -35,7 +35,7 @@ public class PersistMetaModules {
public static final List<MetaPersistMethod> MODULES_IN_ORDER;
public static final ImmutableList<String> MODULE_NAMES = ImmutableList.of(
"masterInfo", "frontends", "backends", "datasource", "db", "loadJob", "alterJob", "recycleBin",
"masterInfo", "frontends", "backends", "datasource", "db", "alterJob", "recycleBin",
"globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler",
"paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles",
"plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "cooldownJob");

View File

@@ -43,7 +43,6 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.planner.external.ExternalFileScanNode;
@@ -53,7 +52,6 @@ import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadErrorHubInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanFragmentExecParams;
import org.apache.doris.thrift.TQueryGlobals;
@@ -266,15 +264,6 @@ public class StreamLoadPlanner {
params.setQueryGlobals(queryGlobals);
// set load error hub if exist
LoadErrorHub.Param param = Env.getCurrentEnv().getLoadInstance().getLoadErrorHubInfo();
if (param != null) {
TLoadErrorHubInfo info = param.toThrift();
if (info != null) {
params.setLoadErrorHubInfo(info);
}
}
// LOG.debug("stream load txn id: {}, plan: {}", streamLoadTask.getTxnId(), params);
return params;
}

View File

@@ -37,7 +37,6 @@ import org.apache.doris.common.util.ProfileWriter;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.DataPartition;
@@ -78,7 +77,6 @@ import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TEsScanRange;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.doris.thrift.TLoadErrorHubInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPlanFragmentDestination;
@@ -2482,15 +2480,6 @@ public class Coordinator {
rf.getFilterId().asInt(), rf.toThrift());
}
}
if (queryOptions.getQueryType() == TQueryType.LOAD) {
LoadErrorHub.Param param = Env.getCurrentEnv().getLoadInstance().getLoadErrorHubInfo();
if (param != null) {
TLoadErrorHubInfo info = param.toThrift();
if (info != null) {
params.setLoadErrorHubInfo(info);
}
}
}
paramsList.add(params);
}
return paramsList;

View File

@@ -172,8 +172,6 @@ import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadErrorHub.HubType;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.load.routineload.RoutineLoadJob;
@@ -1240,24 +1238,7 @@ public class ShowExecutor {
}
}
}
LoadErrorHub.Param param = load.getLoadErrorHubInfo();
if (param == null || param.getType() == HubType.NULL_TYPE) {
throw new AnalysisException("no load error hub be supplied.");
}
LoadErrorHub errorHub = LoadErrorHub.createHub(param);
List<LoadErrorHub.ErrorMsg> errors = errorHub.fetchLoadError(jobId);
errorHub.close();
List<List<String>> rows = Lists.newArrayList();
for (LoadErrorHub.ErrorMsg error : errors) {
List<String> oneInfo = Lists.newArrayList();
oneInfo.add(String.valueOf(jobId));
oneInfo.add(label);
oneInfo.add(error.getMsg());
rows.add(oneInfo);
}
long limit = showWarningsStmt.getLimitNum();
if (limit != -1L && limit < rows.size()) {
rows = rows.subList(0, (int) limit);