diff --git a/be/src/exec/broker_writer.cpp b/be/src/exec/broker_writer.cpp index f894b44c93..e7305cceeb 100644 --- a/be/src/exec/broker_writer.cpp +++ b/be/src/exec/broker_writer.cpp @@ -182,9 +182,9 @@ Status BrokerWriter::write(const uint8_t* buf, size_t buf_len, size_t* written_l return Status::OK(); } -void BrokerWriter::close() { +Status BrokerWriter::close() { if (_is_closed) { - return; + return Status::OK(); } TBrokerCloseWriterRequest request; @@ -198,40 +198,49 @@ void BrokerWriter::close() { TBrokerOperationStatus response; try { Status status; - BrokerServiceConnection client(client_cache(_env), broker_addr, 10000, &status); + // use 20 second because close may take longer in remote storage, sometimes. + // TODO(cmy): optimize this if necessary. + BrokerServiceConnection client(client_cache(_env), broker_addr, 20000, &status); if (!status.ok()) { LOG(WARNING) << "Create broker write client failed. broker=" << broker_addr << ", status=" << status.get_error_msg(); - return; + return status; } try { client->closeWriter(response, request); } catch (apache::thrift::transport::TTransportException& e) { + LOG(WARNING) << "Close broker writer failed. broker=" << broker_addr + << ", status=" << status.get_error_msg(); status = client.reopen(); if (!status.ok()) { - LOG(WARNING) << "Close broker writer failed. broker=" << broker_addr + LOG(WARNING) << "Reopen broker writer failed. broker=" << broker_addr << ", status=" << status.get_error_msg(); - return; + return status; } client->closeWriter(response, request); } } catch (apache::thrift::TException& e) { - LOG(WARNING) << "Close broker writer failed, broker:" << broker_addr + std::stringstream ss; + ss << "Close broker writer failed, broker:" << broker_addr << " msg:" << e.what(); - return; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); } VLOG_ROW << "debug: send broker close writer response: " << apache::thrift::ThriftDebugString(response).c_str(); if (response.statusCode != TBrokerOperationStatusCode::OK) { - LOG(WARNING) << "Close broker writer failed, broker:" << broker_addr + std::stringstream ss; + ss << "Close broker writer failed, broker:" << broker_addr << " msg:" << response.message; - return; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); } _is_closed = true; + return Status::OK(); } } // end namespace doris diff --git a/be/src/exec/broker_writer.h b/be/src/exec/broker_writer.h index 53f029782a..f5efe7b20e 100644 --- a/be/src/exec/broker_writer.h +++ b/be/src/exec/broker_writer.h @@ -48,7 +48,7 @@ public: virtual Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) override; - virtual void close() override; + virtual Status close() override; private: ExecEnv* _env; diff --git a/be/src/exec/file_writer.h b/be/src/exec/file_writer.h index 3a056d0d16..9c0ebf4488 100644 --- a/be/src/exec/file_writer.h +++ b/be/src/exec/file_writer.h @@ -35,7 +35,7 @@ public: // NOTE: the number of bytes written may be less than count if. virtual Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) = 0; - virtual void close() = 0; + virtual Status close() = 0; }; } // end namespace doris diff --git a/be/src/exec/local_file_writer.cpp b/be/src/exec/local_file_writer.cpp index dcb103ae78..d140e6b97c 100644 --- a/be/src/exec/local_file_writer.cpp +++ b/be/src/exec/local_file_writer.cpp @@ -68,11 +68,12 @@ Status LocalFileWriter::write(const uint8_t* buf, size_t buf_len, size_t* writte return Status::OK(); } -void LocalFileWriter::close() { +Status LocalFileWriter::close() { if (_fp != nullptr) { fclose(_fp); _fp = nullptr; } + return Status::OK(); } } // end namespace doris diff --git a/be/src/exec/local_file_writer.h b/be/src/exec/local_file_writer.h index 76845e4792..dc116698e0 100644 --- a/be/src/exec/local_file_writer.h +++ b/be/src/exec/local_file_writer.h @@ -33,7 +33,7 @@ public: virtual Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) override; - virtual void close() override; + virtual Status close() override; private: std::string _path; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index d89eb1f577..ea09cf3b82 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -120,9 +120,9 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { } if (_input_rowsets.size() <= 1) { - LOG(WARNING) << "There is no enough rowsets to cumulative compaction." - << " The size of rowsets to compact=" << candidate_rowsets.size() - << ", cumulative_point=" << _tablet->cumulative_layer_point(); + LOG(INFO) << "There is no enough rowsets to cumulative compaction." + << ", the size of rowsets to compact=" << candidate_rowsets.size() + << ", cumulative_point=" << _tablet->cumulative_layer_point(); return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS; } diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 481729b462..8f560e74bf 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -222,6 +222,10 @@ Status SnapshotLoader::upload( read_offset += read_len; left_len -= read_len; } + + // close manually, because we need to check its close status + RETURN_IF_ERROR(broker_writer->close()); + LOG(INFO) << "finished to write file via broker. file: " << full_local_file << ", length: " << file_len; } diff --git a/fe/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/src/main/java/org/apache/doris/backup/BackupJob.java index 9c411a81e2..880c5daf2d 100644 --- a/fe/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/src/main/java/org/apache/doris/backup/BackupJob.java @@ -51,7 +51,6 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -69,7 +68,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; @@ -95,8 +93,8 @@ public class BackupJob extends AbstractJob { private long snapshotFinishedTime = -1; private long snapshopUploadFinishedTime = -1; - // save all tablets which tasks are not finished. - private Set unfinishedTaskIds = Sets.newConcurrentHashSet(); + // save task id map to the backend it be executed + private Map unfinishedTaskIds = Maps.newConcurrentMap(); // tablet id -> snapshot info private Map snapshotInfos = Maps.newConcurrentMap(); // save all related table[partition] info @@ -164,12 +162,12 @@ public class BackupJob extends AbstractJob { snapshotInfos.put(task.getTabletId(), info); taskProgress.remove(task.getTabletId()); - boolean res = unfinishedTaskIds.remove(task.getTabletId()); + Long oldValue = unfinishedTaskIds.remove(task.getTabletId()); taskErrMsg.remove(task.getTabletId()); LOG.debug("get finished snapshot info: {}, unfinished tasks num: {}, remove result: {}. {}", - info, unfinishedTaskIds.size(), res, this); + info, unfinishedTaskIds.size(), (oldValue != null), this); - return res; + return oldValue != null; } public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTaskRequest request) { @@ -221,11 +219,11 @@ public class BackupJob extends AbstractJob { } taskProgress.remove(task.getSignature()); - boolean res = unfinishedTaskIds.remove(task.getSignature()); + Long oldValue = unfinishedTaskIds.remove(task.getSignature()); taskErrMsg.remove(task.getTabletId()); LOG.debug("get finished upload snapshot task, unfinished tasks num: {}, remove result: {}. {}", - unfinishedTaskIds.size(), res, this); - return res; + unfinishedTaskIds.size(), (oldValue != null), this); + return oldValue != null; } @Override @@ -406,7 +404,7 @@ public class BackupJob extends AbstractJob { visibleVersion, visibleVersionHash, schemaHash, timeoutMs, false /* not restore task */); batchTask.addTask(task); - unfinishedTaskIds.add(tablet.getId()); + unfinishedTaskIds.put(tablet.getId(), replica.getBackendId()); } } @@ -505,7 +503,7 @@ public class BackupJob extends AbstractJob { UploadTask task = new UploadTask(null, beId, signature, jobId, dbId, srcToDest, brokers.get(0), repo.getStorage().getProperties()); batchTask.addTask(task); - unfinishedTaskIds.add(signature); + unfinishedTaskIds.put(signature, beId); } } @@ -680,13 +678,13 @@ public class BackupJob extends AbstractJob { switch (state) { case SNAPSHOTING: // remove all snapshot tasks in AgentTaskQueue - for (Long taskId : unfinishedTaskIds) { + for (Long taskId : unfinishedTaskIds.keySet()) { AgentTaskQueue.removeTaskOfType(TTaskType.MAKE_SNAPSHOT, taskId); } break; case UPLOADING: // remove all upload tasks in AgentTaskQueue - for (Long taskId : unfinishedTaskIds) { + for (Long taskId : unfinishedTaskIds.keySet()) { AgentTaskQueue.removeTaskOfType(TTaskType.UPLOAD, taskId); } break; @@ -729,7 +727,7 @@ public class BackupJob extends AbstractJob { info.add(TimeUtils.longToTimeString(snapshotFinishedTime)); info.add(TimeUtils.longToTimeString(snapshopUploadFinishedTime)); info.add(TimeUtils.longToTimeString(finishedTime)); - info.add(Joiner.on(", ").join(unfinishedTaskIds)); + info.add(Joiner.on(", ").join(unfinishedTaskIds.entrySet())); info.add(Joiner.on(", ").join(taskProgress.entrySet().stream().map( e -> "[" + e.getKey() + ": " + e.getValue().first + "/" + e.getValue().second + "]").collect( Collectors.toList()))); diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java index 6d2c4fef3e..3c9831a642 100644 --- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1294,7 +1294,7 @@ public class RestoreJob extends AbstractJob { info.add(TimeUtils.longToTimeString(snapshotFinishedTime)); info.add(TimeUtils.longToTimeString(downloadFinishedTime)); info.add(TimeUtils.longToTimeString(finishedTime)); - info.add(Joiner.on(", ").join(unfinishedSignatureToId.keySet())); + info.add(Joiner.on(", ").join(unfinishedSignatureToId.entrySet())); info.add(Joiner.on(", ").join(taskProgress.entrySet().stream().map( e -> "[" + e.getKey() + ": " + e.getValue().first + "/" + e.getValue().second + "]").collect( Collectors.toList()))); diff --git a/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java index bb73966a12..1121ffcec7 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java @@ -19,14 +19,11 @@ package org.apache.doris.common.proc; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.util.ListComparator; import org.apache.doris.transaction.GlobalTransactionMgr; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; public class TransProcDir implements ProcDirInterface { @@ -60,17 +57,8 @@ public class TransProcDir implements ProcDirInterface { BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); GlobalTransactionMgr transactionMgr = Catalog.getCurrentGlobalTransactionMgr(); - List> infos = transactionMgr.getDbTransInfo(dbId, state.equals("running"), MAX_SHOW_ENTRIES); - // order by transactionId, desc - ListComparator> comparator = new ListComparator>(true, 0); - Collections.sort(infos, comparator); - for (List info : infos) { - List row = new ArrayList(info.size()); - for (Comparable comparable : info) { - row.add(comparable.toString()); - } - result.addRow(row); - } + List> infos = transactionMgr.getDbTransInfo(dbId, state.equals("running"), MAX_SHOW_ENTRIES); + result.setRows(infos); return result; } diff --git a/fe/src/main/java/org/apache/doris/common/proc/TransTablesProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/TransTablesProcDir.java index 7830a1b5a5..350817fa91 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/TransTablesProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/TransTablesProcDir.java @@ -35,10 +35,10 @@ public class TransTablesProcDir implements ProcDirInterface { .add("CommittedPartitionIds") .build(); - private long tid; + private long txnId; - public TransTablesProcDir(long tid) { - this.tid = tid; + public TransTablesProcDir(long txnId) { + this.txnId = txnId; } @Override @@ -50,7 +50,7 @@ public class TransTablesProcDir implements ProcDirInterface { public ProcResult fetchResult() throws AnalysisException { // get info GlobalTransactionMgr transactionMgr = Catalog.getCurrentGlobalTransactionMgr(); - List> tableInfos = transactionMgr.getTableTransInfo(tid); + List> tableInfos = transactionMgr.getTableTransInfo(txnId); // sort by table id ListComparator> comparator = new ListComparator>(0); Collections.sort(tableInfos, comparator); @@ -83,6 +83,6 @@ public class TransTablesProcDir implements ProcDirInterface { throw new AnalysisException("Invalid table id format: " + tableIdStr); } - return new TransPartitionProcNode(tid, tableId); + return new TransPartitionProcNode(txnId, tableId); } } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index a0331d432d..ad88468a44 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -1190,8 +1190,8 @@ public class GlobalTransactionMgr { return infos; } - public List> getDbTransInfo(long dbId, boolean running, int limit) throws AnalysisException { - List> infos = new ArrayList>(); + public List> getDbTransInfo(long dbId, boolean running, int limit) throws AnalysisException { + List> infos = new ArrayList>(); readLock(); try { Database db = Catalog.getInstance().getDb(dbId); @@ -1199,24 +1199,25 @@ public class GlobalTransactionMgr { throw new AnalysisException("Database[" + dbId + "] does not exist"); } + // get transaction order by txn id desc limit 'limit' idToTransactionState.values().stream() .filter(t -> (t.getDbId() == dbId && (running ? !t.getTransactionStatus().isFinalStatus() - : t.getTransactionStatus().isFinalStatus()))) + : t.getTransactionStatus().isFinalStatus()))).sorted(TransactionState.TXN_ID_COMPARATOR) .limit(limit) .forEach(t -> { - List info = new ArrayList(); - info.add(t.getTransactionId()); + List info = new ArrayList(); + info.add(String.valueOf(t.getTransactionId())); info.add(t.getLabel()); info.add(t.getCoordinator()); - info.add(t.getTransactionStatus()); - info.add(t.getSourceType()); + info.add(t.getTransactionStatus().name()); + info.add(t.getSourceType().name()); info.add(TimeUtils.longToTimeString(t.getPrepareTime())); info.add(TimeUtils.longToTimeString(t.getCommitTime())); info.add(TimeUtils.longToTimeString(t.getFinishTime())); info.add(t.getReason()); - info.add(t.getErrorReplicas().size()); - info.add(t.getCallbackId()); - info.add(t.getTimeoutMs()); + info.add(String.valueOf(t.getErrorReplicas().size())); + info.add(String.valueOf(t.getCallbackId())); + info.add(String.valueOf(t.getTimeoutMs())); infos.add(info); }); } finally { @@ -1225,13 +1226,13 @@ public class GlobalTransactionMgr { return infos; } - public List> getTableTransInfo(long tid) throws AnalysisException { + public List> getTableTransInfo(long txnId) throws AnalysisException { List> tableInfos = new ArrayList>(); readLock(); try { - TransactionState transactionState = idToTransactionState.get(tid); + TransactionState transactionState = idToTransactionState.get(txnId); if (null == transactionState) { - throw new AnalysisException("Transaction[" + tid + "] does not exist."); + throw new AnalysisException("Transaction[" + txnId + "] does not exist."); } for (Map.Entry entry : transactionState.getIdToTableCommitInfos().entrySet()) { diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index 82ae27574a..5973b46e93 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -39,6 +39,7 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Comparator; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -47,6 +48,22 @@ import java.util.concurrent.TimeUnit; public class TransactionState implements Writable { private static final Logger LOG = LogManager.getLogger(TransactionState.class); + // compare the TransactionState by txn id, desc + public static class TxnStateComparator implements Comparator { + @Override + public int compare(TransactionState t1, TransactionState t2) { + if (t1.getTransactionId() > t2.getTransactionId()) { + return -1; + } else if (t1.getTransactionId() < t2.getTransactionId()) { + return 1; + } else { + return 0; + } + } + } + + public static final TxnStateComparator TXN_ID_COMPARATOR = new TxnStateComparator(); + public enum LoadJobSourceType { FRONTEND(1), // old dpp load, mini load, insert stmt(not streaming type) use this type BACKEND_STREAMING(2), // streaming load use this type diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index 0907281813..8070e780ce 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -534,6 +534,7 @@ public class FileSystemManager { FSDataOutputStream fsDataOutputStream = clientContextManager.getFsDataOutputStream(fd); synchronized (fsDataOutputStream) { try { + fsDataOutputStream.flush(); fsDataOutputStream.close(); } catch (IOException e) { logger.error("errors while close file output stream", e);