[fix](log) regularise some BE error type and fix a load task check #28729
@@ -273,8 +273,8 @@ Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateR
// check local disk capacity
int64_t tablet_size = tablet->tablet_local_size();
if ((*dest_store)->reach_capacity_limit(tablet_size)) {
return Status::InternalError("reach the capacity limit of path {}, tablet_size={}",
(*dest_store)->path(), tablet_size);
return Status::Error<EXCEEDED_LIMIT>("reach the capacity limit of path {}, tablet_size={}",
(*dest_store)->path(), tablet_size);
}
return Status::OK();
}
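The pattern this commit standardises on replaces the catch-all `Status::InternalError` with a typed `Status::Error<CODE>`, so callers can react to the specific failure. A hedged, self-contained sketch of how such a templated status type can be built (illustrative names only, not Doris's actual `Status` implementation):

```cpp
#include <fmt/format.h>  // assumed available; Doris uses fmt-style "{}" placeholders
#include <string>
#include <utility>

// Hypothetical miniature of a coded status type.
struct MiniStatus {
    int code = 0;  // 0 means OK
    std::string msg;

    static MiniStatus OK() { return {}; }

    // The error code is a template parameter, so call sites read as
    // Status::Error<EXCEEDED_LIMIT>(...) and the code cannot be dropped
    // when the message text is rewritten.
    template <int Code, typename... Args>
    static MiniStatus Error(fmt::format_string<Args...> fmt, Args&&... args) {
        return {Code, fmt::format(fmt, std::forward<Args>(args)...)};
    }

    bool ok() const { return code == 0; }
};

constexpr int EXCEEDED_LIMIT = -217;  // matches the code added in status.h below

MiniStatus check_capacity(long long tablet_size, long long limit, const std::string& path) {
    if (tablet_size > limit) {
        return MiniStatus::Error<EXCEEDED_LIMIT>(
                "reach the capacity limit of path {}, tablet_size={}", path, tablet_size);
    }
    return MiniStatus::OK();
}
```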

@@ -42,8 +42,7 @@
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"

namespace doris {
namespace config {
namespace doris::config {

// Dir of custom config file
DEFINE_String(custom_config_dir, "${DORIS_HOME}/conf");
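Several files in this diff also collapse `namespace a { namespace b {` into the C++17 nested form; the two spellings are equivalent:

```cpp
// Pre-C++17: two nesting levels, two closing braces.
namespace doris {
namespace config {
// ...
}  // namespace config
}  // namespace doris

// C++17 nested namespace definition: same scope, one level of braces.
namespace doris::config {
// ...
}  // namespace doris::config
```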

@@ -109,10 +108,6 @@ DEFINE_mInt32(hash_table_double_grow_degree, "31");
DEFINE_mInt32(max_fill_rate, "2");

DEFINE_mInt32(double_resize_threshold, "23");
// Expand the hash table before inserting data, the maximum expansion size.
// There are fewer duplicate keys, reducing the number of resize hash tables
// There are many duplicate keys, and the hash table filled bucket is far less than the hash table build bucket.
DEFINE_mInt64(hash_table_pre_expanse_max_rows, "65535");

// The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 1.6G,
// actual low water mark=min(1.6G, MemTotal * 10%), avoid wasting too much memory on machines

@@ -845,16 +840,6 @@ DEFINE_String(function_service_protocol, "h2:grpc");
// use which load balancer to select server to connect
DEFINE_String(rpc_load_balancer, "rr");

// The maximum buffer/queue size to collect span. After the size is reached, spans are dropped.
// An export will be triggered when the number of spans in the queue reaches half of the maximum.
DEFINE_Int32(max_span_queue_size, "2048");

// The maximum batch size of every export spans. It must be smaller or equal to max_queue_size.
DEFINE_Int32(max_span_export_batch_size, "512");

// The time interval between two consecutive export spans.
DEFINE_Int32(export_span_schedule_delay_millis, "500");

// a soft limit of string type length, the hard limit is 2GB - 4, but if too long will cause very low performance,
// so we set a soft limit, default is 1MB
DEFINE_mInt32(string_type_length_soft_limit_bytes, "1048576");
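For context on the `DEFINE_*`/`DECLARE_*` lines: each registers a config with a name, type, default, and mutability flag (the `m` variants are runtime-mutable, which is what `valmutable` reports in `get_config_info()` further down). A hypothetical miniature of such a registration macro, not the real Doris one:

```cpp
#include <cstdint>
#include <map>
#include <string>

// Sketch of the metadata a config registry might keep per entry.
struct FieldInfo {
    const char* type;  // "int32", "bool", ...
    bool valmutable;   // true for the 'm' (runtime-mutable) variants
};

inline std::map<std::string, FieldInfo>& registry() {
    static std::map<std::string, FieldInfo> r;
    return r;
}

// Defines the variable and records its metadata as a side effect.
#define DEFINE_CONF_INT32(name, defval, is_mutable)               \
    int32_t name = (defval);                                      \
    const bool name##_registered =                                \
            (registry()[#name] = FieldInfo {"int32", is_mutable}, true)

DEFINE_CONF_INT32(string_type_length_soft_limit_bytes, 1048576, true);
```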

@@ -867,10 +852,6 @@ DEFINE_mInt32(jsonb_type_length_soft_limit_bytes, "1048576");
DEFINE_Validator(jsonb_type_length_soft_limit_bytes,
[](const int config) -> bool { return config > 0 && config <= 2147483643; });

// used for olap scanner to save memory, when the size of unused_object_pool
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
DEFINE_Int32(object_pool_buffer_size, "100");

// Threshold of reading a small file into memory
DEFINE_mInt32(in_memory_file_size, "1048576"); // 1MB

@@ -906,7 +887,7 @@ DEFINE_Int32(concurrency_per_dir, "2");
// "whole_file_cache": the whole file.
DEFINE_mString(file_cache_type, "file_block_cache");
DEFINE_Validator(file_cache_type, [](std::string_view config) -> bool {
return config == "" || config == "file_block_cache";
return config.empty() || config == "file_block_cache";
});

DEFINE_Int32(s3_transfer_executor_pool_size, "2");
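The `DEFINE_Validator` entries pair a config with a predicate that must pass before a new value is applied, as with `file_cache_type` above. A sketch of how such a validator table might be wired up (assumed mechanism; the real registration in Doris differs in detail):

```cpp
#include <functional>
#include <map>
#include <string>
#include <string_view>

using Validator = std::function<bool(std::string_view)>;

std::map<std::string, Validator>& validators() {
    static std::map<std::string, Validator> v;
    return v;
}

// Consult the validator (if any) before applying a config update.
bool set_config(const std::string& name, std::string_view value) {
    auto it = validators().find(name);
    if (it != validators().end() && !it->second(value)) {
        return false;  // reject the update, keep the old value
    }
    // ... apply the new value to the named config here ...
    return true;
}

// Registration analogous to the DEFINE_Validator call site in the diff.
const bool file_cache_type_validator_registered =
        (validators()["file_cache_type"] = [](std::string_view config) -> bool {
             return config.empty() || config == "file_block_cache";
         },
         true);
```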

@@ -970,8 +951,8 @@ DEFINE_Bool(enable_fuzzy_mode, "false");
DEFINE_Bool(enable_debug_points, "false");

DEFINE_Int32(pipeline_executor_size, "0");
// 128 MB
DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");
DEFINE_Bool(enable_workload_group_for_scan, "false");
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");

// Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
// Will remove after fully test.

@@ -1152,6 +1133,9 @@ DEFINE_Bool(enable_snapshot_action, "false");

DEFINE_mInt32(variant_max_merged_tablet_schema_size, "2048");

// 128 MB
DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");

// clang-format off
#ifdef BE_TEST
// test s3

@@ -1204,7 +1188,7 @@ bool replaceenv(std::string& s) {
std::size_t pos = 0;
std::size_t start = 0;
while ((start = s.find("${", pos)) != std::string::npos) {
std::size_t end = s.find("}", start + 2);
std::size_t end = s.find('}', start + 2);
if (end == std::string::npos) {
return false;
}
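To make the `find("}")` to `find('}')` micro-change concrete, here is a self-contained version of the `${VAR}` expansion loop shown above; unlike the real `replaceenv()`, this sketch resolves names only against the process environment:

```cpp
#include <cstdlib>
#include <string>

// Expands every ${NAME} in s using getenv(); returns false on an
// unterminated "${" or an unknown variable.
bool replaceenv(std::string& s) {
    std::size_t pos = 0;
    std::size_t start = 0;
    while ((start = s.find("${", pos)) != std::string::npos) {
        // The char overload avoids building a one-character needle string.
        std::size_t end = s.find('}', start + 2);
        if (end == std::string::npos) {
            return false;  // unterminated ${...
        }
        const std::string name = s.substr(start + 2, end - start - 2);
        const char* raw = std::getenv(name.c_str());
        if (raw == nullptr) {
            return false;  // unknown variable
        }
        const std::string value = raw;
        s.replace(start, end - start + 1, value);
        pos = start + value.size();  // resume after the substituted text
    }
    return true;
}
```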

@@ -1242,9 +1226,9 @@ bool strtox(const std::string& valstr, std::vector<T>& retval) {
}

bool strtox(const std::string& valstr, bool& retval) {
if (valstr.compare("true") == 0) {
if (valstr == "true") {
retval = true;
} else if (valstr.compare("false") == 0) {
} else if (valstr == "false") {
retval = false;
} else {
return false;

@@ -1604,18 +1588,17 @@ std::vector<std::vector<std::string>> get_config_info() {
std::vector<std::string> _config;
_config.push_back(it.first);

_config.push_back(field_it->second.type);
_config.emplace_back(field_it->second.type);
if (0 == strcmp(field_it->second.type, "bool")) {
_config.push_back(it.second == "1" ? "true" : "false");
_config.emplace_back(it.second == "1" ? "true" : "false");
} else {
_config.push_back(it.second);
}
_config.push_back(field_it->second.valmutable ? "true" : "false");
_config.emplace_back(field_it->second.valmutable ? "true" : "false");

configs.push_back(_config);
}
return configs;
}
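The `push_back` to `emplace_back` changes in `get_config_info()` are behaviour-preserving; the difference is only where the `std::string` gets constructed:

```cpp
#include <string>
#include <vector>

int main() {
    std::vector<std::string> v;
    const char* type = "bool";
    v.push_back(type);     // builds a temporary std::string, then moves it in
    v.emplace_back(type);  // constructs the std::string in place from the char*
    return 0;
}
```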

} // namespace config
} // namespace doris
} // namespace doris::config

@@ -152,11 +152,6 @@ DECLARE_mInt32(max_fill_rate);

DECLARE_mInt32(double_resize_threshold);

// Expand the hash table before inserting data, the maximum expansion size.
// There are fewer duplicate keys, reducing the number of resize hash tables
// There are many duplicate keys, and the hash table filled bucket is far less than the hash table build bucket.
DECLARE_mInt64(hash_table_pre_expanse_max_rows);

// The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 1.6G,
// actual low water mark=min(1.6G, MemTotal * 10%), avoid wasting too much memory on machines
// with large memory larger than 16G.

@@ -907,26 +902,12 @@ DECLARE_String(function_service_protocol);
// use which load balancer to select server to connect
DECLARE_String(rpc_load_balancer);

// The maximum buffer/queue size to collect span. After the size is reached, spans are dropped.
// An export will be triggered when the number of spans in the queue reaches half of the maximum.
DECLARE_Int32(max_span_queue_size);

// The maximum batch size of every export spans. It must be smaller or equal to max_queue_size.
DECLARE_Int32(max_span_export_batch_size);

// The time interval between two consecutive export spans.
DECLARE_Int32(export_span_schedule_delay_millis);

// a soft limit of string type length, the hard limit is 2GB - 4, but if too long will cause very low performance,
// so we set a soft limit, default is 1MB
DECLARE_mInt32(string_type_length_soft_limit_bytes);

DECLARE_mInt32(jsonb_type_length_soft_limit_bytes);

// used for olap scanner to save memory, when the size of unused_object_pool
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
DECLARE_Int32(object_pool_buffer_size);

// Threshold of reading a small file into memory
DECLARE_mInt32(in_memory_file_size);

@@ -8,8 +8,8 @@
#include <gen_cpp/Status_types.h> // for TStatus
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <stdint.h>

#include <cstdint>
#include <iostream>
#include <memory>
#include <string>

@@ -93,7 +93,8 @@ namespace ErrorCode {
E(VERSION_NOT_EXIST, -214, false); \
E(TABLE_NOT_FOUND, -215, true); \
E(TRY_LOCK_FAILED, -216, false); \
E(OUT_OF_BOUND, -218, true); \
E(EXCEEDED_LIMIT, -217, false); \
E(OUT_OF_BOUND, -218, false); \
E(INVALID_ROOT_PATH, -222, true); \
E(NO_AVAILABLE_ROOT_PATH, -223, true); \
E(CHECK_LINES_ERROR, -224, true); \
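The `E(...)` rows above form an X-macro table: one list that expands into the error-code enum, name lookup, and stacktrace policy. A hedged mini-example of the technique (the real Doris table differs in detail, e.g. its entries end in semicolons):

```cpp
#include <cstdio>

// One master list; each column is (name, numeric code, capture-stacktrace).
#define APPLY_FOR_ERROR_CODES(E)    \
    E(EXCEEDED_LIMIT, -217, false)  \
    E(OUT_OF_BOUND, -218, false)

// Expansion 1: the enum constants.
enum ErrorCodes {
#define MAKE_ENUM(name, code, stacktrace) name = (code),
    APPLY_FOR_ERROR_CODES(MAKE_ENUM)
#undef MAKE_ENUM
};

// Expansion 2: a code-to-name lookup generated from the same list.
const char* error_name(int code) {
#define MAKE_CASE(name, code_, stacktrace) \
    if (code == (code_)) return #name;
    APPLY_FOR_ERROR_CODES(MAKE_CASE)
#undef MAKE_CASE
    return "UNKNOWN";
}

int main() {
    std::printf("%d -> %s\n", EXCEEDED_LIMIT, error_name(-217));
    return 0;
}
```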

@@ -194,7 +194,7 @@ int HttpStreamAction::on_header(HttpRequest* req) {
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< config::wal_max_disk_size * 0.8
<< " Bytes). Please set this load to \"group commit\"=false.";
st = Status::InternalError("Http load size too large.");
st = Status::Error<EXCEEDED_LIMIT>("Http load size too large.");
}
}
}
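Both load paths (HTTP load here, stream load below) guard group commit with the same size gate: a load larger than 80% of the WAL disk budget is rejected up front. A minimal sketch of that check, with `wal_max_disk_size` standing in for `config::wal_max_disk_size`:

```cpp
#include <cstdint>

// Returns true when the declared load size fits the group-commit WAL budget.
// Loads above 80% of the budget are rejected so a single oversized load
// cannot fill the WAL directory.
bool wal_size_ok(int64_t content_length, int64_t wal_max_disk_size) {
    return static_cast<double>(content_length) < wal_max_disk_size * 0.8;
}
```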

@@ -219,7 +219,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
<< " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
<< config::wal_max_disk_size * 0.8
<< " Bytes). Please set this load to \"group commit\"=false.";
st = Status::InternalError("Stream load size too large.");
st = Status::Error<EXCEEDED_LIMIT>("Stream load size too large.");
}
}
}

@@ -23,6 +23,7 @@
#include <string>

#include "common/config.h"
#include "common/status.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"

@@ -209,7 +210,9 @@ Status TabletMigrationAction::_check_migrate_request(int64_t tablet_id, int32_t
if ((*dest_store)->reach_capacity_limit(tablet_size)) {
LOG(WARNING) << "reach the capacity limit of path: " << (*dest_store)->path()
<< ", tablet size: " << tablet_size;
return Status::InternalError("Insufficient disk capacity");
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"reach the capacity limit of path {}, tablet_size={}", (*dest_store)->path(),
tablet_size);
}

return Status::OK();

@@ -21,12 +21,14 @@
#include <CLucene/search/IndexSearcher.h>
#include <CLucene/util/bkd/bkd_reader.h>
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <string.h>
#include <sys/resource.h>

#include <cerrno> // IWYU pragma: keep
#include <cstring>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <iostream>
#include <memory>

#include "common/logging.h"
#include "olap/olap_common.h"

@@ -38,8 +40,7 @@
#include "util/defer_op.h"
#include "util/runtime_profile.h"

namespace doris {
namespace segment_v2 {
namespace doris::segment_v2 {

Status FulltextIndexSearcherBuilder::build(DorisCompoundReader* directory,
OptionalIndexSearcherPtr& output_searcher) {

@@ -109,8 +110,7 @@ InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t

if (config::enable_inverted_index_cache_check_timestamp) {
auto get_last_visit_time = [](const void* value) -> int64_t {
InvertedIndexSearcherCache::CacheValue* cache_value =
(InvertedIndexSearcherCache::CacheValue*)value;
auto* cache_value = (InvertedIndexSearcherCache::CacheValue*)value;
return cache_value->last_visit_time;
};
_cache = std::unique_ptr<Cache>(

@@ -146,8 +146,7 @@ Status InvertedIndexSearcherCache::get_index_searcher(
cache_handle->owned = !use_cache;
IndexSearcherPtr index_searcher;
std::unique_ptr<IndexSearcherBuilder> index_builder = nullptr;
auto mem_tracker =
std::unique_ptr<MemTracker>(new MemTracker("InvertedIndexSearcherCacheWithRead"));
auto mem_tracker = std::make_unique<MemTracker>("InvertedIndexSearcherCacheWithRead");
#ifndef BE_TEST
{
bool exists = false;
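The `std::make_unique` change above is a mechanical modernisation:

```cpp
#include <memory>
#include <string>

struct MemTrackerLike {
    explicit MemTrackerLike(std::string label) : label_(std::move(label)) {}
    std::string label_;
};

int main() {
    // Before: the type is spelled twice and new is called by hand.
    auto a = std::unique_ptr<MemTrackerLike>(new MemTrackerLike("read"));
    // After: one expression, no repeated type name, exception-safe.
    auto b = std::make_unique<MemTrackerLike>("read");
    return 0;
}
```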

@@ -280,7 +279,7 @@ Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs,
cache_value->index_searcher = std::move(index_searcher);
cache_value->size = mem_tracker->consumption();
cache_value->last_visit_time = UnixMillis();
auto lru_handle = _insert(cache_key, cache_value.release());
auto* lru_handle = _insert(cache_key, cache_value.release());
_cache->release(lru_handle);
return Status::OK();
}

@@ -300,7 +299,7 @@ int64_t InvertedIndexSearcherCache::mem_consumption() {

bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::CacheKey& key,
InvertedIndexCacheHandle* handle) {
auto lru_handle = _cache->lookup(key.index_file_path);
auto* lru_handle = _cache->lookup(key.index_file_path);
if (lru_handle == nullptr) {
return false;
}

@@ -311,8 +310,7 @@ bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::Cache
Cache::Handle* InvertedIndexSearcherCache::_insert(const InvertedIndexSearcherCache::CacheKey& key,
CacheValue* value) {
auto deleter = [](const doris::CacheKey& key, void* value) {
InvertedIndexSearcherCache::CacheValue* cache_value =
(InvertedIndexSearcherCache::CacheValue*)value;
auto* cache_value = (InvertedIndexSearcherCache::CacheValue*)value;
delete cache_value;
};

@@ -325,7 +323,7 @@ bool InvertedIndexQueryCache::lookup(const CacheKey& key, InvertedIndexQueryCach
if (key.encode().empty()) {
return false;
}
auto lru_handle = _cache->lookup(key.encode());
auto* lru_handle = _cache->lookup(key.encode());
if (lru_handle == nullptr) {
return false;
}

@@ -348,8 +346,8 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptr<roarin
return;
}

auto lru_handle = _cache->insert(key.encode(), (void*)cache_value_ptr.release(),
bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL);
auto* lru_handle = _cache->insert(key.encode(), (void*)cache_value_ptr.release(),
bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL);
*handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle);
}

@@ -360,5 +358,4 @@ int64_t InvertedIndexQueryCache::mem_consumption() {
return 0L;
}

} // namespace segment_v2
} // namespace doris
} // namespace doris::segment_v2

@@ -1339,10 +1339,10 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
Status SchemaChangeHandler::_init_column_mapping(ColumnMapping* column_mapping,
const TabletColumn& column_schema,
const std::string& value) {
column_mapping->default_value = WrapperField::create(column_schema);

if (column_mapping->default_value == nullptr) {
return Status::Error<MEM_ALLOC_FAILED>("column_mapping->default_value is nullptr");
if (auto field = WrapperField::create(column_schema); field.has_value()) {
column_mapping->default_value = field.value();
} else {
return field.error();
}

if (column_schema.is_nullable() && value.length() == 0) {
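`WrapperField::create` now returns `Result<WrapperField*>`, an expected-like type (see the `util/expected.hpp` include added later in this diff), and `_init_column_mapping` branches on it. A miniature of that shape, modelling only the members this call site needs, not Doris's actual `Result`:

```cpp
#include <string>
#include <utility>
#include <variant>

struct Err { std::string msg; };  // stand-in for a failed Status

template <typename T>
class Result {
public:
    Result(T value) : _v(std::move(value)) {}
    Result(Err err) : _v(std::move(err)) {}
    bool has_value() const { return std::holds_alternative<T>(_v); }
    const T& value() const { return std::get<T>(_v); }
    const Err& error() const { return std::get<Err>(_v); }

private:
    std::variant<T, Err> _v;
};

Result<int*> make_field(bool ok) {
    if (!ok) {
        return Err {"length of string parameter is too long"};
    }
    static int field = 42;
    return &field;
}

Err use_field() {
    // Same shape as the new _init_column_mapping: branch on has_value(),
    // otherwise propagate error() to the caller.
    if (auto field = make_field(true); field.has_value()) {
        int* default_value = field.value();
        (void)default_value;
    } else {
        return field.error();
    }
    return Err {};
}
```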

@@ -437,7 +437,9 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir,
HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, get_file_size_cb));
// check disk capacity
if (data_dir->reach_capacity_limit(file_size)) {
return Status::InternalError("Disk reach capacity limit");
return Status::Error<EXCEEDED_LIMIT>(
"reach the capacity limit of path {}, file_size={}", data_dir->path(),
file_size);
}

total_file_size += file_size;

@@ -402,7 +402,9 @@ Status StorageEngine::_check_file_descriptor_number() {
LOG(ERROR) << "File descriptor number is less than " << config::min_file_descriptor_number
<< ". Please use (ulimit -n) to set a value equal or greater than "
<< config::min_file_descriptor_number;
return Status::InternalError("file descriptors limit is too small");
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"file descriptors limit {} is small than {}", l.rlim_cur,
config::min_file_descriptor_number);
}
return Status::OK();
}

@@ -110,7 +110,7 @@ Status EngineBatchLoadTask::_init() {

// check disk capacity
if (_push_req.push_type == TPushType::LOAD_V2) {
if (tablet->data_dir()->reach_capacity_limit(_push_req.__isset.http_file_size)) {
if (tablet->data_dir()->reach_capacity_limit(_push_req.http_file_size)) {
return Status::IOError("Disk does not have enough capacity");
}
}
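This hunk is the "load task check" fix from the commit title. The old code passed `_push_req.__isset.http_file_size`, the Thrift presence flag, where the byte count belongs; a `bool` converts to 0 or 1, so the capacity check could never fire. A sketch with Thrift-style stand-in structs (the fix in the diff passes the value directly; the variant below also tests the flag first, which is a more defensive shape, not what the diff does):

```cpp
#include <cstdint>

// Thrift-style stand-ins; the generated TPushReq has many more members.
struct TPushReqIsset {
    bool http_file_size = false;  // true once the optional field was set
};
struct TPushReq {
    int64_t http_file_size = 0;  // the actual payload size in bytes
    TPushReqIsset __isset;
};

bool reach_capacity_limit(int64_t incoming_bytes) {
    const int64_t kFreeBytes = 4LL * 1024 * 1024 * 1024;  // pretend 4 GB free
    return incoming_bytes > kFreeBytes;
}

bool disk_check_ok(const TPushReq& req) {
    // Bug: reach_capacity_limit(req.__isset.http_file_size) passes a bool,
    // which converts to 0 or 1, so the limit is never reached.
    // Fix: pass the value itself, optionally guarded by the presence flag.
    if (req.__isset.http_file_size && reach_capacity_limit(req.http_file_size)) {
        return false;  // disk does not have enough capacity
    }
    return true;
}
```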

@@ -515,7 +515,9 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re
HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, get_file_size_cb));
// check disk capacity
if (data_dir->reach_capacity_limit(file_size)) {
return Status::InternalError("Disk reach capacity limit");
return Status::Error<EXCEEDED_LIMIT>(
"reach the capacity limit of path {}, file_size={}", data_dir->path(),
file_size);
}

total_file_size += file_size;

@@ -18,21 +18,23 @@
#include "olap/wrapper_field.h"

#include <glog/logging.h>
#include <string.h>

#include <algorithm>
#include <cstring>
#include <ostream>

#include "common/config.h"
#include "common/status.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/row_cursor.h"
#include "util/expected.hpp"

namespace doris {

const size_t DEFAULT_STRING_LENGTH = 50;

WrapperField* WrapperField::create(const TabletColumn& column, uint32_t len) {
Result<WrapperField*> WrapperField::create(const TabletColumn& column, uint32_t len) {
bool is_string_type = (column.type() == FieldType::OLAP_FIELD_TYPE_CHAR ||
column.type() == FieldType::OLAP_FIELD_TYPE_VARCHAR ||
column.type() == FieldType::OLAP_FIELD_TYPE_HLL ||
@@ -44,12 +46,13 @@ WrapperField* WrapperField::create(const TabletColumn& column, uint32_t len) {
if (is_string_type && len > max_length) {
LOG(WARNING) << "length of string parameter is too long[len=" << len
<< ", max_len=" << max_length << "].";
return nullptr;
return unexpected {Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"length of string parameter is too long[len={}, max_len={}].", len, max_length)};
}

Field* rep = FieldFactory::create(column);
if (rep == nullptr) {
return nullptr;
return unexpected {Status::Uninitialized("Unsupported field creation of {}", column.name())};
}

size_t variable_len = 0;

@@ -67,9 +70,7 @@ WrapperField* WrapperField::create(const TabletColumn& column, uint32_t len) {
} else {
variable_len = column.length();
}

WrapperField* wrapper = new WrapperField(rep, variable_len, is_string_type);
return wrapper;
return new WrapperField(rep, variable_len, is_string_type);
}

WrapperField* WrapperField::create_by_type(const FieldType& type, int32_t var_length) {

@@ -83,8 +84,7 @@ WrapperField* WrapperField::create_by_type(const FieldType& type, int32_t var_le
type == FieldType::OLAP_FIELD_TYPE_OBJECT ||
type == FieldType::OLAP_FIELD_TYPE_STRING ||
type == FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE);
auto wrapper = new WrapperField(rep, var_length, is_string_type);
return wrapper;
return new WrapperField(rep, var_length, is_string_type);
}

WrapperField::WrapperField(Field* rep, size_t variable_len, bool is_string_type)

@@ -98,7 +98,7 @@ WrapperField::WrapperField(Field* rep, size_t variable_len, bool is_string_type)

if (_is_string_type) {
_var_length = variable_len > DEFAULT_STRING_LENGTH ? DEFAULT_STRING_LENGTH : variable_len;
Slice* slice = reinterpret_cast<Slice*>(buf);
auto* slice = reinterpret_cast<Slice*>(buf);
slice->size = _var_length;
_string_content.reset(new char[slice->size]);
slice->data = _string_content.get();

@@ -34,7 +34,7 @@ enum class FieldType;

class WrapperField {
public:
static WrapperField* create(const TabletColumn& column, uint32_t len = 0);
static Result<WrapperField*> create(const TabletColumn& column, uint32_t len = 0);
static WrapperField* create_by_type(const FieldType& type) { return create_by_type(type, 0); }
static WrapperField* create_by_type(const FieldType& type, int32_t var_length);

@@ -312,7 +312,9 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to

// check disk capacity
if (data_dir->reach_capacity_limit(file_len)) {
return Status::InternalError("capacity limit reached");
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"reach the capacity limit of path {}, file_size={}", data_dir->path(),
file_len);
}
// remove file which will be downloaded now.
// this file will be added to local_files if it is downloaded successfully.

@@ -545,7 +547,9 @@ Status SnapshotLoader::remote_http_download(

// check disk capacity
if (data_dir->reach_capacity_limit(file_size)) {
return Status::InternalError("Disk reach capacity limit");
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"reach the capacity limit of path {}, file_size={}", data_dir->path(),
file_size);
}

total_file_size += file_size;

@@ -182,9 +182,9 @@ VExpr::VExpr(const TExprNode& node)

VExpr::VExpr(const VExpr& vexpr) = default;

VExpr::VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable)
VExpr::VExpr(TypeDescriptor type, bool is_slotref, bool is_nullable)
: _opcode(TExprOpcode::INVALID_OPCODE),
_type(type),
_type(std::move(type)),
_fn_context_index(-1),
_prepared(false) {
if (is_slotref) {

@@ -197,13 +197,13 @@ VExpr::VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable)
Status VExpr::prepare(RuntimeState* state, const RowDescriptor& row_desc, VExprContext* context) {
++context->_depth_num;
if (context->_depth_num > config::max_depth_of_expr_tree) {
return Status::InternalError(
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"The depth of the expression tree is too big, make it less than {}",
config::max_depth_of_expr_tree);
}

for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->prepare(state, row_desc, context));
for (auto& i : _children) {
RETURN_IF_ERROR(i->prepare(state, row_desc, context));
}
--context->_depth_num;
return Status::OK();
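`VExpr::prepare` bounds recursion by counting depth in the shared context and failing with `EXCEEDED_LIMIT` past `config::max_depth_of_expr_tree`. The guard in miniature (illustrative types):

```cpp
#include <vector>

constexpr int kMaxDepth = 600;  // stands in for config::max_depth_of_expr_tree

struct Ctx {
    int depth_num = 0;  // shared across the whole recursion
};
struct Node {
    std::vector<Node> children;
};

bool prepare(const Node& node, Ctx& ctx) {
    if (++ctx.depth_num > kMaxDepth) {
        return false;  // would surface as Status::Error<EXCEEDED_LIMIT>
    }
    for (const auto& child : node.children) {
        if (!prepare(child, ctx)) {
            return false;
        }
    }
    --ctx.depth_num;  // balance the increment on the way back up
    return true;
}
```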

@@ -211,8 +211,8 @@ Status VExpr::prepare(RuntimeState* state, const RowDescriptor& row_desc, VExprC

Status VExpr::open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->open(state, context, scope));
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));

@@ -80,7 +80,7 @@ public:

VExpr(const TExprNode& node);
VExpr(const VExpr& vexpr);
VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable);
VExpr(TypeDescriptor type, bool is_slotref, bool is_nullable);
// only used for test
VExpr() = default;
virtual ~VExpr() = default;
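The constructor change from `const TypeDescriptor&` to by-value plus `std::move` is the sink-parameter idiom: rvalue callers pay a move instead of a copy, lvalue callers pay exactly the one copy they always did. Illustrated with a stand-in type:

```cpp
#include <string>
#include <utility>

struct TypeDesc { std::string name; };  // stand-in for TypeDescriptor

struct Expr {
    // Sink parameter: the argument is materialised once in the parameter,
    // then moved into the member. A const& parameter plus member copy
    // would always pay a copy, even for rvalue arguments.
    explicit Expr(TypeDesc type) : _type(std::move(type)) {}
    TypeDesc _type;
};

int main() {
    TypeDesc t {"BIGINT"};
    Expr a(t);                     // one copy into the parameter, then a move
    Expr b(TypeDesc {"VARCHAR"});  // no copy: move (or elision) + move
    return 0;
}
```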

@@ -71,7 +71,7 @@ suite("test_group_commit_wal_limit") {
assertEquals(code, 0)
out = process.text
logger.info("out is " + out )
assertTrue(out.contains('[INTERNAL_ERROR]Stream load size too large'))
assertTrue(out.contains('Stream load size too large'))

// too large data case 1TB
strBuilder = new StringBuilder()

@@ -89,7 +89,7 @@ suite("test_group_commit_wal_limit") {
assertEquals(code, 0)
out = process.text
logger.info("out is " + out )
assertTrue(out.contains('[INTERNAL_ERROR]Stream load size too large'))
assertTrue(out.contains('Stream load size too large'))

// httpload
// normal case

@@ -126,7 +126,7 @@ suite("test_group_commit_wal_limit") {
assertEquals(code, 0)
out = process.text
logger.info("out is " + out )
assertTrue(out.contains('[INTERNAL_ERROR]Http load size too large'))
assertTrue(out.contains('Http load size too large'))

// too large data case 1TB
strBuilder = new StringBuilder()

@@ -144,5 +144,5 @@ suite("test_group_commit_wal_limit") {
assertEquals(code, 0)
out = process.text
logger.info("out is " + out )
assertTrue(out.contains('[INTERNAL_ERROR]Http load size too large'))
assertTrue(out.contains('Http load size too large'))
}