[improvement](show backends) print trash used capacity in show backends (#23792)
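Summary of the change, as read from the diff below: each BE DataDir now sizes its trash directory, caches the result in _trash_used_bytes (exported through the new disks_trash_used_capacity gauge), and ships it to the FE through a new optional TDisk.trash_used_capacity field. The FE stores the value on DiskInfo and surfaces it as a new trash-used column in SHOW BACKENDS and the backends() table-valued function, and `show trash` / clean_trash now read the cached value and trigger a disk-state report refresh instead of walking the trash directory on every call. For orientation only, here is a hypothetical stand-in for StorageEngine::get_file_or_directory_size(), an existing helper the PR relies on; directory_size_bytes below is invented for illustration and is not Doris code:

#include <cstdint>
#include <filesystem>
#include <iostream>

// Recursively sum the sizes of regular files under `root`; return 0 if the
// path does not exist. This mirrors what sizing "<data_dir>/trash" needs to do.
int64_t directory_size_bytes(const std::filesystem::path& root) {
    namespace fs = std::filesystem;
    std::error_code ec;
    if (!fs::exists(root, ec)) {
        return 0;
    }
    if (fs::is_regular_file(root, ec)) {
        return static_cast<int64_t>(fs::file_size(root, ec));
    }
    int64_t total = 0;
    for (const auto& entry : fs::recursive_directory_iterator(root, ec)) {
        std::error_code fec;
        if (entry.is_regular_file(fec) && !fec) {
            total += static_cast<int64_t>(entry.file_size(fec));
        }
    }
    return total;
}

int main() {
    std::cout << directory_size_bytes("/tmp") << " bytes\n";
    return 0;
}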
@@ -678,6 +678,7 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
             disk.__set_data_used_capacity(root_path_info.local_used_capacity);
             disk.__set_remote_used_capacity(root_path_info.remote_used_capacity);
             disk.__set_disk_available_capacity(root_path_info.available);
+            disk.__set_trash_used_capacity(root_path_info.trash_used_capacity);
             disk.__set_used(root_path_info.is_used);
             request.disks[root_path_info.path] = disk;
         }

@@ -181,6 +181,8 @@ public:
     // notify the worker. currently for task/disk/tablet report thread
     void notify_thread();

+    TaskWorkerType task_worker_type() const { return _task_worker_type; }
+
 protected:
     bool _register_task_info(const TTaskType::type task_type, int64_t signature);
     void _remove_task_info(const TTaskType::type task_type, int64_t signature);

@@ -75,6 +75,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_total_capacity, MetricUnit::BYTES);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_avail_capacity, MetricUnit::BYTES);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_local_used_capacity, MetricUnit::BYTES);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_remote_used_capacity, MetricUnit::BYTES);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_trash_used_capacity, MetricUnit::BYTES);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_state, MetricUnit::BYTES);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_compaction_score, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_compaction_num, MetricUnit::NOUNIT);

@@ -88,6 +89,7 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes,
           _fs(io::LocalFileSystem::create(path)),
           _available_bytes(0),
           _disk_capacity_bytes(0),
+          _trash_used_bytes(0),
           _storage_medium(storage_medium),
           _is_used(false),
           _tablet_manager(tablet_manager),

@@ -103,6 +105,7 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes,
     INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity);
     INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_local_used_capacity);
     INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_remote_used_capacity);
+    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_trash_used_capacity);
     INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_state);
     INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_score);
     INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_num);

@@ -122,6 +125,7 @@ Status DataDir::init() {
                 "check file exist failed");
     }

+    update_trash_capacity();
     RETURN_NOT_OK_STATUS_WITH_WARN(update_capacity(), "update_capacity failed");
     RETURN_NOT_OK_STATUS_WITH_WARN(_init_cluster_id(), "_init_cluster_id failed");
     RETURN_NOT_OK_STATUS_WITH_WARN(_init_capacity_and_create_shards(),

@@ -839,6 +843,13 @@ Status DataDir::update_capacity() {
     return Status::OK();
 }

+void DataDir::update_trash_capacity() {
+    auto trash_path = fmt::format("{}/{}", _path, TRASH_PREFIX);
+    _trash_used_bytes = StorageEngine::instance()->get_file_or_directory_size(trash_path);
+    disks_trash_used_capacity->set_value(_trash_used_bytes);
+    LOG(INFO) << "path: " << _path << " trash capacity: " << _trash_used_bytes;
+}
+
 void DataDir::update_local_data_size(int64_t size) {
     disks_local_used_capacity->set_value(size);
 }
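Note: DataDir::update_trash_capacity() sizes the data dir's trash path with the existing StorageEngine::get_file_or_directory_size() helper, caches the result in _trash_used_bytes, and exports it through the new disks_trash_used_capacity gauge. It runs once in DataDir::init() and again at the end of every trash sweep (see the start_trash_sweep hunk below), so the value the report thread ships to the FE is a cached figure rather than a fresh directory walk per report.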
@@ -71,6 +71,7 @@ public:
         info.path_hash = _path_hash;
         info.disk_capacity = _disk_capacity_bytes;
         info.available = _available_bytes;
+        info.trash_used_capacity = _trash_used_bytes;
         info.is_used = _is_used;
         info.storage_medium = _storage_medium;
         return info;

@@ -131,6 +132,8 @@ public:

     Status update_capacity();

+    void update_trash_capacity();
+
     void update_local_data_size(int64_t size);

     void update_remote_data_size(int64_t size);

@@ -175,6 +178,7 @@ private:
     size_t _available_bytes;
     // the actual capacity of the disk of this data dir
     size_t _disk_capacity_bytes;
+    size_t _trash_used_bytes;
     TStorageMedium::type _storage_medium;
     bool _is_used;

@@ -208,6 +212,7 @@ private:
     IntGauge* disks_avail_capacity;
     IntGauge* disks_local_used_capacity;
     IntGauge* disks_remote_used_capacity;
+    IntGauge* disks_trash_used_capacity;
     IntGauge* disks_state;
     IntGauge* disks_compaction_score;
     IntGauge* disks_compaction_num;

@@ -57,6 +57,7 @@ struct DataDirInfo {
     int64_t available = 0; // available space, in bytes unit
     int64_t local_used_capacity = 0;
     int64_t remote_used_capacity = 0;
+    int64_t trash_used_capacity = 0;
     bool is_used = false;                                       // whether available mark
     TStorageMedium::type storage_medium = TStorageMedium::HDD;  // Storage medium type: SSD|HDD
 };
@@ -45,7 +45,6 @@
 #include <unordered_set>
 #include <utility>

-#include "agent/task_worker_pool.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "gutil/strings/substitute.h"

@@ -714,6 +713,7 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
     for (auto data_dir : get_stores()) {
         data_dir->perform_remote_rowset_gc();
         data_dir->perform_remote_tablet_gc();
+        data_dir->update_trash_capacity();
     }

     return res;

@@ -1144,6 +1144,15 @@ void StorageEngine::notify_listeners() {
     }
 }

+void StorageEngine::notify_listener(TaskWorkerPool::TaskWorkerType task_worker_type) {
+    std::lock_guard<std::mutex> l(_report_mtx);
+    for (auto& listener : _report_listeners) {
+        if (listener->task_worker_type() == task_worker_type) {
+            listener->notify_thread();
+        }
+    }
+}
+
 Status StorageEngine::execute_task(EngineTask* task) {
     RETURN_IF_ERROR(task->execute());
     return task->finish();
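Unlike the existing notify_listeners(), which wakes every registered report thread, the new notify_listener() wakes only the thread whose TaskWorkerType matches. The standalone sketch below is not part of the diff; the names WorkerType, ReportListener, and Registry are invented for illustration, but it shows the same filtered-notification pattern in isolation:

#include <iostream>
#include <mutex>
#include <vector>

enum class WorkerType { REPORT_TASK, REPORT_DISK_STATE, REPORT_OLAP_TABLE };

class ReportListener {
public:
    explicit ReportListener(WorkerType type) : _type(type) {}
    WorkerType worker_type() const { return _type; }
    // In the real code this would signal the worker's condition variable; here we just log.
    void notify_thread() { std::cout << "listener " << static_cast<int>(_type) << " notified\n"; }

private:
    WorkerType _type;
};

class Registry {
public:
    void register_listener(ReportListener* listener) {
        std::lock_guard<std::mutex> guard(_mtx);
        _listeners.push_back(listener);
    }
    // Notify only listeners of the requested type, mirroring StorageEngine::notify_listener().
    void notify_listener(WorkerType type) {
        std::lock_guard<std::mutex> guard(_mtx);
        for (auto* listener : _listeners) {
            if (listener->worker_type() == type) {
                listener->notify_thread();
            }
        }
    }

private:
    std::mutex _mtx;
    std::vector<ReportListener*> _listeners;
};

int main() {
    Registry registry;
    ReportListener disk(WorkerType::REPORT_DISK_STATE);
    ReportListener task(WorkerType::REPORT_TASK);
    registry.register_listener(&disk);
    registry.register_listener(&task);
    registry.notify_listener(WorkerType::REPORT_DISK_STATE);  // only the disk listener is woken
    return 0;
}

In this PR the only call sites pass TaskWorkerType::REPORT_DISK_STATE, so `show trash`, `show trash on <be>`, and clean_trash refresh the disk-state report without waking the task or tablet report threads.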
@@ -35,6 +35,7 @@
 #include <unordered_set>
 #include <vector>

+#include "agent/task_worker_pool.h"
 #include "common/status.h"
 #include "gutil/ref_counted.h"
 #include "olap/calc_delete_bitmap_executor.h"

@@ -54,7 +55,6 @@ namespace doris {
 class DataDir;
 class EngineTask;
 class MemTableFlushExecutor;
-class TaskWorkerPool;
 class SegcompactionWorker;
 class BaseCompaction;
 class CumulativeCompaction;

@@ -137,6 +137,7 @@ public:
     void register_report_listener(TaskWorkerPool* listener);
     void deregister_report_listener(TaskWorkerPool* listener);
     void notify_listeners();
+    void notify_listener(TaskWorkerPool::TaskWorkerType task_worker_type);

     Status execute_task(EngineTask* task);

@@ -224,10 +224,13 @@ int64_t BackendService::get_trash_used_capacity() {
     std::vector<DataDirInfo> data_dir_infos;
     StorageEngine::instance()->get_all_data_dir_info(&data_dir_infos, false /*do not update */);

+    // when sql `show trash` is executed, update the backend trash capacity too.
+    StorageEngine::instance()->notify_listener(TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE);
+
     for (const auto& root_path_info : data_dir_infos) {
-        auto trash_path = fmt::format("{}/{}", root_path_info.path, TRASH_PREFIX);
-        result += StorageEngine::instance()->get_file_or_directory_size(trash_path);
+        result += root_path_info.trash_used_capacity;
     }

     return result;
 }

@@ -235,17 +238,14 @@ void BackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& d
     std::vector<DataDirInfo> data_dir_infos;
     StorageEngine::instance()->get_all_data_dir_info(&data_dir_infos, false /*do not update */);

+    // when sql `show trash on <be>` is executed, update the backend trash capacity too.
+    StorageEngine::instance()->notify_listener(TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE);
+
     for (const auto& root_path_info : data_dir_infos) {
         TDiskTrashInfo diskTrashInfo;
-
         diskTrashInfo.__set_root_path(root_path_info.path);
-
         diskTrashInfo.__set_state(root_path_info.is_used ? "ONLINE" : "OFFLINE");
-
-        auto trash_path = fmt::format("{}/{}", root_path_info.path, TRASH_PREFIX);
-        diskTrashInfo.__set_trash_used_capacity(
-                StorageEngine::instance()->get_file_or_directory_size(trash_path));
-
+        diskTrashInfo.__set_trash_used_capacity(root_path_info.trash_used_capacity);
         diskTrashInfos.push_back(diskTrashInfo);
     }
 }

@@ -381,6 +381,7 @@ void BackendService::get_stream_load_record(TStreamLoadRecordResult& result,

 void BackendService::clean_trash() {
     StorageEngine::instance()->start_trash_sweep(nullptr, true);
+    StorageEngine::instance()->notify_listener(TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE);
 }

 void BackendService::check_storage_format(TCheckStorageFormatResult& result) {
@@ -47,6 +47,8 @@ public class DiskInfo implements Writable {
     private long totalCapacityB;
     @SerializedName("dataUsedCapacityB")
     private long dataUsedCapacityB;
+    @SerializedName("trashUsedCapacityB")
+    private long trashUsedCapacityB;
     @SerializedName("remoteUsedCapacity")
     private long remoteUsedCapacity = 0;
     @SerializedName("diskAvailableCapacityB")

@@ -65,6 +67,7 @@ public class DiskInfo implements Writable {
         this.rootPath = rootPath;
         this.totalCapacityB = DEFAULT_CAPACITY_B;
         this.dataUsedCapacityB = 0;
+        this.trashUsedCapacityB = 0;
         this.diskAvailableCapacityB = DEFAULT_CAPACITY_B;
         this.state = DiskState.ONLINE;
         this.pathHash = 0;

@@ -99,6 +102,14 @@ public class DiskInfo implements Writable {
         this.remoteUsedCapacity = remoteUsedCapacity;
     }

+    public long getTrashUsedCapacityB() {
+        return trashUsedCapacityB;
+    }
+
+    public void setTrashUsedCapacityB(long trashUsedCapacityB) {
+        this.trashUsedCapacityB = trashUsedCapacityB;
+    }
+
     public long getDiskUsedCapacityB() {
         return totalCapacityB - diskAvailableCapacityB;
     }

@@ -172,8 +183,9 @@ public class DiskInfo implements Writable {
     @Override
     public String toString() {
         return "DiskInfo [rootPath=" + rootPath + "(" + pathHash + "), totalCapacityB=" + totalCapacityB
-                + ", dataUsedCapacityB=" + dataUsedCapacityB + ", diskAvailableCapacityB="
-                + diskAvailableCapacityB + ", state=" + state + ", medium: " + storageMedium + "]";
+                + ", dataUsedCapacityB=" + dataUsedCapacityB + ", trashUsedCapacityB=" + trashUsedCapacityB
+                + ", diskAvailableCapacityB=" + diskAvailableCapacityB + ", state=" + state
+                + ", medium: " + storageMedium + "]";
     }

     @Override
@@ -46,8 +46,9 @@ public class BackendsProcDir implements ProcDirInterface {
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("BackendId")
             .add("Host").add("HeartbeatPort").add("BePort").add("HttpPort").add("BrpcPort").add("LastStartTime")
             .add("LastHeartbeat").add("Alive").add("SystemDecommissioned").add("TabletNum").add("DataUsedCapacity")
-            .add("AvailCapacity").add("TotalCapacity").add("UsedPct").add("MaxDiskUsedPct").add("RemoteUsedCapacity")
-            .add("Tag").add("ErrMsg").add("Version").add("Status").add("HeartbeatFailureCounter").add("NodeRole")
+            .add("TrashUsedCapcacity").add("AvailCapacity").add("TotalCapacity").add("UsedPct").add("MaxDiskUsedPct")
+            .add("RemoteUsedCapacity").add("Tag").add("ErrMsg").add("Version").add("Status")
+            .add("HeartbeatFailureCounter").add("NodeRole")
             .build();

     public static final int HOSTNAME_INDEX = 3;

@@ -117,6 +118,11 @@ public class BackendsProcDir implements ProcDirInterface {
             long dataUsedB = backend.getDataUsedCapacityB();
             Pair<Double, String> usedCapacity = DebugUtil.getByteUint(dataUsedB);
             backendInfo.add(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(usedCapacity.first) + " " + usedCapacity.second);
+            // trash used
+            long trashUsedB = backend.getTrashUsedCapacityB();
+            Pair<Double, String> trashUsedCapacity = DebugUtil.getByteUint(trashUsedB);
+            backendInfo.add(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(
+                    trashUsedCapacity.first) + " " + trashUsedCapacity.second);
             // available
             long availB = backend.getAvailableCapacityB();
             Pair<Double, String> availCapacity = DebugUtil.getByteUint(availB);
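SHOW BACKENDS renders the new value through DebugUtil.getByteUint() plus DECIMAL_FORMAT_SCALE_3, i.e. as a scaled string such as "1.500 GB", while the backends() table-valued function below returns the raw BIGINT. (Note the column literal added to SHOW BACKENDS here is spelled "TrashUsedCapcacity", whereas the TVF column is "TrashUsedCapacity".) For illustration only, a small C++ stand-in for that formatting; pretty_bytes is an invented helper, not Doris code:

#include <array>
#include <cstdint>
#include <cstdio>
#include <string>

// Scale a byte count into the largest unit below 1024 and format with 3 decimals.
std::string pretty_bytes(int64_t bytes) {
    static const std::array<const char*, 5> units = {"B", "KB", "MB", "GB", "TB"};
    double value = static_cast<double>(bytes);
    size_t i = 0;
    while (value >= 1024.0 && i + 1 < units.size()) {
        value /= 1024.0;
        ++i;
    }
    char buf[32];
    std::snprintf(buf, sizeof(buf), "%.3f %s", value, units[i]);
    return buf;
}

int main() {
    // e.g. a 1.5 GiB trash directory prints as "1.500 GB"
    std::printf("%s\n", pretty_bytes(1610612736).c_str());
    return 0;
}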
@@ -399,6 +399,17 @@ public class Backend implements Writable {
         return dataUsedCapacityB;
     }

+    public long getTrashUsedCapacityB() {
+        ImmutableMap<String, DiskInfo> disks = disksRef;
+        long trashUsedCapacityB = 0L;
+        for (DiskInfo diskInfo : disks.values()) {
+            if (diskInfo.getState() == DiskState.ONLINE) {
+                trashUsedCapacityB += diskInfo.getTrashUsedCapacityB();
+            }
+        }
+        return trashUsedCapacityB;
+    }
+
     public long getRemoteUsedCapacityB() {
         ImmutableMap<String, DiskInfo> disks = disksRef;
         long totalRemoteUsedCapacityB = 0L;

@@ -485,6 +496,7 @@ public class Backend implements Writable {
             String rootPath = tDisk.getRootPath();
             long totalCapacityB = tDisk.getDiskTotalCapacity();
             long dataUsedCapacityB = tDisk.getDataUsedCapacity();
+            long trashUsedCapacityB = tDisk.getTrashUsedCapacity();
             long diskAvailableCapacityB = tDisk.getDiskAvailableCapacity();
             boolean isUsed = tDisk.isUsed();
             DiskInfo diskInfo = disks.get(rootPath);

@@ -498,6 +510,7 @@ public class Backend implements Writable {

             diskInfo.setTotalCapacityB(totalCapacityB);
             diskInfo.setDataUsedCapacityB(dataUsedCapacityB);
+            diskInfo.setTrashUsedCapacityB(trashUsedCapacityB);
             diskInfo.setAvailableCapacityB(diskAvailableCapacityB);
             if (tDisk.isSetRemoteUsedCapacity()) {
                 diskInfo.setRemoteUsedCapacity(tDisk.getRemoteUsedCapacity());

@@ -51,6 +51,7 @@ public class BackendsTableValuedFunction extends MetadataTableValuedFunction {
             new Column("SystemDecommissioned", ScalarType.createType(PrimitiveType.BOOLEAN)),
             new Column("TabletNum", ScalarType.createType(PrimitiveType.BIGINT)),
             new Column("DataUsedCapacity", ScalarType.createType(PrimitiveType.BIGINT)),
+            new Column("TrashUsedCapacity", ScalarType.createType(PrimitiveType.BIGINT)),
             new Column("AvailCapacity", ScalarType.createType(PrimitiveType.BIGINT)),
             new Column("TotalCapacity", ScalarType.createType(PrimitiveType.BIGINT)),
             new Column("UsedPct", ScalarType.createType(PrimitiveType.DOUBLE)),

@@ -199,6 +199,9 @@ public class MetadataGenerator {
         // data used
         trow.addToColumnValue(new TCell().setLongVal(backend.getDataUsedCapacityB()));

+        // trash used
+        trow.addToColumnValue(new TCell().setLongVal(backend.getTrashUsedCapacityB()));
+
        // available
         long availB = backend.getAvailableCapacityB();
         trow.addToColumnValue(new TCell().setLongVal(availB));
@@ -198,7 +198,7 @@ public class DemoMultiBackendsTest {
         BackendsProcDir dir = new BackendsProcDir(Env.getCurrentSystemInfo());
         ProcResult result = dir.fetchResult();
         Assert.assertEquals(BackendsProcDir.TITLE_NAMES.size(), result.getColumnNames().size());
-        Assert.assertEquals("{\"location\" : \"default\"}", result.getRows().get(0).get(17));
+        Assert.assertEquals("{\"location\" : \"default\"}", result.getRows().get(0).get(18));
         Assert.assertEquals(
                 "{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
                 result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 3));

@@ -81,6 +81,7 @@ struct TDisk {
     6: optional i64 path_hash
     7: optional Types.TStorageMedium storage_medium
     8: optional Types.TSize remote_used_capacity
+    9: optional Types.TSize trash_used_capacity
 }

 struct TPluginInfo {
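Because the new TDisk field is declared optional with the next field id (9), backends that have not yet been upgraded simply omit it and remain wire-compatible during a rolling upgrade; presumably the FE then reads the Thrift default of 0 via tDisk.getTrashUsedCapacity() until the BE reports the field, since (unlike remote_used_capacity) the update path above does not guard the read with an isSet check.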
@@ -19,7 +19,7 @@
 suite("test_backends_tvf","p0,external,tvf,external_docker") {
     List<List<Object>> table = sql """ select * from backends(); """
     assertTrue(table.size() > 0)
-    assertEquals(23, table[0].size)
+    assertEquals(24, table[0].size)

     // filter columns
     table = sql """ select BackendId, Host, Alive, TotalCapacity, Version, NodeRole from backends();"""

@@ -18,7 +18,7 @@
 suite("information_schema") {
     List<List<Object>> table = sql """ select * from backends(); """
     assertTrue(table.size() > 0)
-    assertTrue(table[0].size == 23)
+    assertTrue(table[0].size == 24)

     sql "SELECT DATABASE();"
     sql "select USER();"