Fix the excessive memory usage issue of ob_virtual_proxy_sys_variable
This commit is contained in:
parent
5e45c52767
commit
71033c8584
@ -33,12 +33,8 @@ namespace observer
|
||||
{
|
||||
ObVirtualProxySysVariable::ObVirtualProxySysVariable()
|
||||
: is_inited_(false),
|
||||
is_queried_(false),
|
||||
table_schema_(NULL),
|
||||
tenant_info_(NULL),
|
||||
var_state_(),
|
||||
idx_(-1),
|
||||
var_states_(),
|
||||
config_(NULL),
|
||||
sys_variable_schema_(NULL)
|
||||
{
|
||||
@ -48,12 +44,22 @@ ObVirtualProxySysVariable::~ObVirtualProxySysVariable()
|
||||
{
|
||||
}
|
||||
|
||||
void ObVirtualProxySysVariable::reset()
|
||||
{
|
||||
ObVirtualTableScannerIterator::reset();
|
||||
is_inited_ = false;
|
||||
table_schema_ = NULL;
|
||||
tenant_info_ = NULL;
|
||||
config_ = NULL;
|
||||
sys_variable_schema_ = NULL;
|
||||
}
|
||||
|
||||
int ObVirtualProxySysVariable::init(ObMultiVersionSchemaService &schema_service, ObServerConfig *config)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
SERVER_LOG(WARN, "init twice", K(ret));
|
||||
LOG_WARN("init twice", KR(ret));
|
||||
} else if (OB_ISNULL(schema_guard_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema_guard is null", KR(ret));
|
||||
@ -62,22 +68,18 @@ int ObVirtualProxySysVariable::init(ObMultiVersionSchemaService &schema_service,
|
||||
LOG_WARN("failed to get table schema", KR(ret));
|
||||
} else if (OB_ISNULL(table_schema_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SERVER_LOG(WARN, "table_schema_ is NULL", KP_(table_schema), K(ret));
|
||||
LOG_WARN("table_schema_ is NULL", KP_(table_schema), KR(ret));
|
||||
} else if (OB_FAIL(schema_guard_->get_tenant_info(OB_SYS_TENANT_ID, tenant_info_))) {
|
||||
SERVER_LOG(WARN, "tenant_info_ is NULL", KP_(tenant_info), K(ret));
|
||||
LOG_WARN("tenant_info_ is NULL", KP_(tenant_info), KR(ret));
|
||||
} else if (OB_ISNULL(tenant_info_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SERVER_LOG(WARN, "tenant_info_ is null");
|
||||
LOG_WARN("tenant_info_ is null");
|
||||
} else if (OB_FAIL(schema_guard_->get_sys_variable_schema(OB_SYS_TENANT_ID, sys_variable_schema_))) {
|
||||
SERVER_LOG(WARN, "get sys variable schema failed", K(ret));
|
||||
LOG_WARN("get sys variable schema failed", KR(ret));
|
||||
} else if (OB_ISNULL(sys_variable_schema_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sys variable schema is null", KR(ret));
|
||||
} else {
|
||||
is_queried_ = false;
|
||||
var_state_.reset();
|
||||
idx_ = -1;
|
||||
var_states_.reset();
|
||||
config_ = config;
|
||||
|
||||
is_inited_ = true;
|
||||
@ -85,34 +87,6 @@ int ObVirtualProxySysVariable::init(ObMultiVersionSchemaService &schema_service,
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObVirtualProxySysVariable::ObVarStateInfo::ObVarStateInfo()
|
||||
{
|
||||
reset();
|
||||
}
|
||||
|
||||
void ObVirtualProxySysVariable::ObVarStateInfo::reset()
|
||||
{
|
||||
data_type_ = 0;
|
||||
flags_ = 0;
|
||||
tenant_id_ = OB_INVALID_TENANT_ID;
|
||||
name_buf_[0] = '\0';
|
||||
name_len_ = 0;
|
||||
value_buf_[0] = '\0';
|
||||
value_len_ = 0;
|
||||
gmt_modified_ = OB_INVALID_TIMESTAMP;
|
||||
}
|
||||
|
||||
bool ObVirtualProxySysVariable::ObVarStateInfo::is_valid() const
|
||||
{
|
||||
return data_type_ > 0
|
||||
&& flags_ > 0
|
||||
&& tenant_id_ > 0
|
||||
&& name_len_ > 0
|
||||
&& static_cast<int64_t>(strlen(name_buf_)) == name_len_
|
||||
&& gmt_modified_ != OB_INVALID_TIMESTAMP;
|
||||
}
|
||||
|
||||
|
||||
int ObVirtualProxySysVariable::inner_get_next_row(ObNewRow *&row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -123,133 +97,89 @@ int ObVirtualProxySysVariable::inner_get_next_row(ObNewRow *&row)
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited" , KR(ret));
|
||||
} else if (!start_to_read_) {
|
||||
start_to_read_ = true;
|
||||
if (OB_FAIL(get_all_sys_variable())) {
|
||||
LOG_WARN("fill scanner fail", KR(ret));
|
||||
} else {
|
||||
scanner_it_ = scanner_.begin();
|
||||
start_to_read_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ObArray<Column> columns;
|
||||
if (OB_FAIL(get_next_sys_variable())) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("failed to get sys variable info", KR(ret));
|
||||
if (OB_SUCCESS == ret && start_to_read_) {
|
||||
if (OB_FAIL(scanner_it_.get_next_row(cur_row_))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to get next row", KR(ret));
|
||||
}
|
||||
} else if (OB_FAIL(get_full_row(table_schema_, var_state_, columns))) {
|
||||
LOG_WARN("failed to get full row", "table_schema", *table_schema_, K(var_state_), KR(ret));
|
||||
} else if (OB_FAIL(project_row(columns, cur_row_))) {
|
||||
LOG_WARN("failed to project row", KR(ret));
|
||||
} else {
|
||||
row = &cur_row_;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObVirtualProxySysVariable::get_full_row(const share::schema::ObTableSchema *table,
|
||||
const ObVarStateInfo &var_state,
|
||||
ObIArray<Column> &columns)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited" , KR(ret));
|
||||
} else if (OB_ISNULL(table)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("table is null", KR(ret));
|
||||
} else if (!var_state.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid var_state", K(var_state), KR(ret));
|
||||
} else {
|
||||
ADD_COLUMN(set_int, table, "data_type", var_state.data_type_, columns);
|
||||
ADD_COLUMN(set_int, table, "flags", var_state.flags_, columns);
|
||||
ADD_COLUMN(set_int, table, "tenant_id", var_state.tenant_id_, columns);
|
||||
ADD_COLUMN(set_varchar, table, "name",
|
||||
ObString(var_state.name_len_, var_state.name_buf_), columns);
|
||||
ADD_COLUMN(set_varchar, table, "value",
|
||||
ObString(var_state.value_len_, var_state.value_buf_), columns);
|
||||
ADD_COLUMN(set_int, table, "modified_time", var_state.gmt_modified_, columns);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObVirtualProxySysVariable::get_next_sys_variable()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited" , KR(ret));
|
||||
} else if (is_queried_ && (idx_ == var_states_.count())) {
|
||||
ret = OB_ITER_END;
|
||||
} else {
|
||||
if (!is_queried_) {
|
||||
if (OB_FAIL(get_all_sys_variable())) {
|
||||
LOG_WARN("failed to get all sys variable", KR(ret));
|
||||
} else {
|
||||
idx_ = 0;
|
||||
is_queried_ = true;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (idx_ < 0 || idx_ >= var_states_.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invaild idx_", K_(idx), KR(ret));
|
||||
} else {
|
||||
var_state_ = var_states_[idx_];
|
||||
++idx_;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObVirtualProxySysVariable::get_all_sys_variable()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited" , KR(ret));
|
||||
} else if (OB_ISNULL(tenant_info_)
|
||||
ObCollationType coll_type = ObCharset::get_default_collation(ObCharset::get_default_charset());
|
||||
if (OB_ISNULL(tenant_info_)
|
||||
|| OB_ISNULL(sys_variable_schema_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tenant info or sys_variable schema is null");
|
||||
} else {
|
||||
ObVarStateInfo *var_state = OB_NEW(ObVarStateInfo, ObModIds::OB_PROXY_DEFAULT_SYS_VARIABLE);
|
||||
if (OB_ISNULL(var_state)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("malloc ObVarStateInfo failed", KR(ret));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < sys_variable_schema_->get_sysvar_count(); ++i) {
|
||||
if (sys_variable_schema_->get_sysvar_schema(i) != NULL) {
|
||||
var_state->reset();
|
||||
const ObString &name = sys_variable_schema_->get_sysvar_schema(i)->get_name();
|
||||
const ObString &value = sys_variable_schema_->get_sysvar_schema(i)->get_value();
|
||||
var_state->data_type_ = sys_variable_schema_->get_sysvar_schema(i)->get_data_type();
|
||||
var_state->flags_ = sys_variable_schema_->get_sysvar_schema(i)->get_flags();
|
||||
var_state->tenant_id_ = sys_variable_schema_->get_sysvar_schema(i)->get_tenant_id();
|
||||
var_state->gmt_modified_ = sys_variable_schema_->get_sysvar_schema(i)->get_schema_version();
|
||||
ObString name_buffer;
|
||||
ObString value_buffer;
|
||||
name_buffer.assign_buffer(var_state->name_buf_, sizeof(var_state->name_buf_));
|
||||
value_buffer.assign_buffer(var_state->value_buf_, sizeof(var_state->value_buf_));
|
||||
if (OB_UNLIKELY((var_state->name_len_ = name_buffer.write(name.ptr(), name.length())) < name.length())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("write sysvar name failed", K(name));
|
||||
} else if (OB_UNLIKELY(var_state->name_len_ >= sizeof(var_state->name_buf_))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("name length is invalid", K_(var_state->name_len));
|
||||
} else if (FALSE_IT(var_state->name_buf_[var_state->name_len_] = '\0')) {
|
||||
//do nothing
|
||||
} else if (OB_UNLIKELY((var_state->value_len_ = value_buffer.write(value.ptr(), value.length())) < value.length())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("write sysvar value failed", K(value));
|
||||
} else if (OB_FAIL(var_states_.push_back(*var_state))) {
|
||||
LOG_WARN("failed to push back", K(var_state), KR(ret));
|
||||
const share::schema::ObSysVarSchema *sysvar_schema = sys_variable_schema_->get_sysvar_schema(i);
|
||||
uint64_t cell_idx = 0;
|
||||
const int64_t col_count = output_column_ids_.count();
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < col_count; ++j) {
|
||||
uint64_t col_id = output_column_ids_.at(j);
|
||||
switch (col_id) {
|
||||
case NAME: {
|
||||
cur_row_.cells_[cell_idx].set_varchar(sysvar_schema->get_name());
|
||||
cur_row_.cells_[cell_idx].set_collation_type(coll_type);
|
||||
break;
|
||||
}
|
||||
case TENANT_ID: {
|
||||
cur_row_.cells_[cell_idx].set_int(sysvar_schema->get_tenant_id());
|
||||
break;
|
||||
}
|
||||
case DATA_TYPE: {
|
||||
cur_row_.cells_[cell_idx].set_int(sysvar_schema->get_data_type());
|
||||
break;
|
||||
}
|
||||
case VALUE: {
|
||||
cur_row_.cells_[cell_idx].set_varchar(sysvar_schema->get_value());
|
||||
cur_row_.cells_[cell_idx].set_collation_type(coll_type);
|
||||
break;
|
||||
}
|
||||
case FLAGS: {
|
||||
cur_row_.cells_[cell_idx].set_int(sysvar_schema->get_flags());
|
||||
break;
|
||||
}
|
||||
case MODIFIED_TIME: {
|
||||
cur_row_.cells_[cell_idx].set_int(
|
||||
sysvar_schema->get_schema_version());
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid column id", KR(ret), K(cell_idx), K(i),
|
||||
K(output_column_ids_), K(col_id));
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
cell_idx++;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(scanner_.add_row(cur_row_))) {
|
||||
LOG_WARN("fail to add row", KR(ret), K(cur_row_));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (var_state != NULL) {
|
||||
ob_delete(var_state);
|
||||
var_state = NULL;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ class ObServerConfig;
|
||||
}
|
||||
namespace observer
|
||||
{
|
||||
class ObVirtualProxySysVariable : public common::ObVirtualTableProjector
|
||||
class ObVirtualProxySysVariable : public common::ObVirtualTableScannerIterator
|
||||
{
|
||||
public:
|
||||
ObVirtualProxySysVariable();
|
||||
@ -43,40 +43,25 @@ public:
|
||||
int init(share::schema::ObMultiVersionSchemaService &schema_service, common::ObServerConfig *config);
|
||||
|
||||
virtual int inner_get_next_row(common::ObNewRow *&row);
|
||||
virtual void reset();
|
||||
|
||||
private:
|
||||
struct ObVarStateInfo
|
||||
enum PROXY_SYSVAR_COLUMN
|
||||
{
|
||||
ObVarStateInfo();
|
||||
~ObVarStateInfo() {}
|
||||
bool is_valid() const;
|
||||
void reset();
|
||||
TO_STRING_KV(K_(data_type), K_(flags), K_(tenant_id), K_(name_buf), K_(name_len),
|
||||
K_(value_buf), K_(value_len), K_(gmt_modified));
|
||||
|
||||
int64_t data_type_;
|
||||
int64_t flags_;
|
||||
int64_t tenant_id_;
|
||||
char name_buf_[common::OB_MAX_CONFIG_NAME_LEN+1];
|
||||
int64_t name_len_;
|
||||
char value_buf_[common::OB_MAX_CONFIG_VALUE_LEN+1];
|
||||
int64_t value_len_;
|
||||
int64_t gmt_modified_;
|
||||
NAME = common::OB_APP_MIN_COLUMN_ID,
|
||||
TENANT_ID,
|
||||
DATA_TYPE,
|
||||
VALUE,
|
||||
FLAGS,
|
||||
MODIFIED_TIME
|
||||
};
|
||||
|
||||
int get_full_row(const share::schema::ObTableSchema *table,
|
||||
const ObVarStateInfo &var_state,
|
||||
common::ObIArray<Column> &columns);
|
||||
int get_next_sys_variable();
|
||||
int get_all_sys_variable();
|
||||
|
||||
bool is_inited_;
|
||||
bool is_queried_;
|
||||
const share::schema::ObTableSchema *table_schema_;
|
||||
const share::schema::ObTenantSchema *tenant_info_;
|
||||
ObVarStateInfo var_state_;
|
||||
int64_t idx_;
|
||||
common::ObArray<ObVarStateInfo> var_states_;
|
||||
common::ObServerConfig *config_;
|
||||
const share::schema::ObSysVariableSchema *sys_variable_schema_;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user