From 145683ccdbc9ad37f23b2a8b91888f7b24aaa688 Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Sun, 24 Dec 2023 21:17:39 +0800 Subject: [PATCH] [improvement](group commit) make get column function more reliable when replaying wal (#28900) --- be/src/olap/wal_table.cpp | 46 +++++++++++-------- be/src/olap/wal_table.h | 1 + .../doris/service/FrontendServiceImpl.java | 38 ++++++++------- gensrc/thrift/FrontendService.thrift | 7 ++- 4 files changed, 53 insertions(+), 39 deletions(-) diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp index bde0e8dd69..7f98c410b0 100644 --- a/be/src/olap/wal_table.cpp +++ b/be/src/olap/wal_table.cpp @@ -204,7 +204,17 @@ Status WalTable::_replay_wal_internal(const std::string& wal) { if (!st.ok()) { LOG(WARNING) << "abort txn " << wal_id << " fail"; } - RETURN_IF_ERROR(_get_column_info(_db_id, _table_id)); + auto get_st = _get_column_info(_db_id, _table_id); + if (!get_st.ok()) { + if (get_st.is()) { + { + std::lock_guard lock(_replay_wal_lock); + _replay_wal_map.erase(wal); + } + RETURN_IF_ERROR(_delete_wal(wal_id)); + } + return get_st; + } #endif RETURN_IF_ERROR(_send_request(wal_id, wal, label)); return Status::OK(); @@ -354,8 +364,7 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std } } else { LOG(INFO) << "success to replay wal =" << wal; - RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id)); - RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id)); + RETURN_IF_ERROR(_delete_wal(wal_id)); std::lock_guard lock(_replay_wal_lock); if (_replay_wal_map.erase(wal)) { LOG(INFO) << "erase " << wal << " from _replay_wal_map"; @@ -414,26 +423,21 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) { [&request, &result](FrontendServiceConnection& client) { client->getColumnInfo(result, request); })); - std::string columns_str = result.column_info; - std::vector column_element; - doris::vectorized::WalReader::string_split(columns_str, ",", column_element); + status = Status::create(result.status); + if (!status.ok()) { + return status; + } + std::vector column_element = result.columns; int64_t column_index = 1; _column_id_name_map.clear(); _column_id_index_map.clear(); for (auto column : column_element) { - auto pos = column.find(":"); - try { - auto column_name = column.substr(0, pos); - int64_t column_id = std::strtoll(column.substr(pos + 1).c_str(), NULL, 10); - _column_id_name_map.emplace(column_id, column_name); - _column_id_index_map.emplace(column_id, column_index); - column_index++; - } catch (const std::invalid_argument& e) { - return Status::InvalidArgument("Invalid format, {}", e.what()); - } + auto column_name = column.columnName; + auto column_id = column.columnId; + _column_id_name_map.emplace(column_id, column_name); + _column_id_index_map.emplace(column_id, column_index); + column_index++; } - - status = Status::create(result.status); } return status; } @@ -447,4 +451,10 @@ Status WalTable::_read_wal_header(const std::string& wal_path, std::string& colu return Status::OK(); } +Status WalTable::_delete_wal(int64_t wal_id) { + RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id)); + RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id)); + return Status::OK(); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal_table.h index 354f4f16b0..e3d66d577a 100644 --- a/be/src/olap/wal_table.h +++ b/be/src/olap/wal_table.h @@ -50,6 +50,7 @@ private: Status _read_wal_header(const std::string& wal, std::string& columns); bool _need_replay(const replay_wal_info& info); Status _replay_wal_internal(const std::string& wal); + Status _delete_wal(int64_t wal_id); private: ExecEnv* _exec_env; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 4f88d22c83..75405854ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -114,6 +114,7 @@ import org.apache.doris.thrift.TCheckAuthRequest; import org.apache.doris.thrift.TCheckAuthResult; import org.apache.doris.thrift.TColumnDef; import org.apache.doris.thrift.TColumnDesc; +import org.apache.doris.thrift.TColumnInfo; import org.apache.doris.thrift.TCommitTxnRequest; import org.apache.doris.thrift.TCommitTxnResult; import org.apache.doris.thrift.TConfirmUnusedRemoteFilesRequest; @@ -240,7 +241,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -3280,40 +3280,38 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) { TGetColumnInfoResult result = new TGetColumnInfoResult(); - TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); long dbId = request.getDbId(); long tableId = request.getTableId(); if (!Env.getCurrentEnv().isMaster()) { - errorStatus.setStatusCode(TStatusCode.NOT_MASTER); - errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG); + status.setStatusCode(TStatusCode.NOT_MASTER); + status.addToErrorMsgs(NOT_MASTER_ERR_MSG); LOG.error("failed to getColumnInfo: {}", NOT_MASTER_ERR_MSG); return result; } - Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { - errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId))); - result.setStatus(errorStatus); + status.setStatusCode(TStatusCode.NOT_FOUND); + status.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId))); return result; } - - Table table; - try { - table = db.getTable(tableId).get(); - } catch (NoSuchElementException e) { - errorStatus.setErrorMsgs( + Table table = db.getTableNullable(tableId); + if (table == null) { + status.setStatusCode(TStatusCode.NOT_FOUND); + status.setErrorMsgs( (Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId)))); - result.setStatus(errorStatus); return result; } - StringBuilder sb = new StringBuilder(); + List columnsResult = Lists.newArrayList(); for (Column column : table.getBaseSchema(true)) { - sb.append(column.getName() + ":" + column.getUniqueId() + ","); + final TColumnInfo info = new TColumnInfo(); + info.setColumnName(column.getName()); + info.setColumnId(column.getUniqueId()); + columnsResult.add(info); } - String columnInfo = sb.toString(); - columnInfo = columnInfo.substring(0, columnInfo.length() - 1); - result.setStatus(new TStatus(TStatusCode.OK)); - result.setColumnInfo(columnInfo); + result.setColumns(columnsResult); return result; } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index d8a53d3566..d672597a0d 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1302,6 +1302,11 @@ struct TGetBackendMetaResult { 3: optional Types.TNetworkAddress master_address } +struct TColumnInfo { + 1: optional string columnName + 2: optional i64 columnId +} + struct TGetColumnInfoRequest { 1: optional i64 db_id 2: optional i64 table_id @@ -1309,7 +1314,7 @@ struct TGetColumnInfoRequest { struct TGetColumnInfoResult { 1: optional Status.TStatus status - 2: optional string column_info + 2: optional list columns } service FrontendService {