[FEAT MERGE]logical plan move to plan cache module add feedback into to plan

This commit is contained in:
zzg19950727
2023-04-18 11:41:58 +00:00
committed by ob-robot
parent e392fc2a37
commit 8daff3e658
122 changed files with 45692 additions and 43707 deletions

View File

@ -1,257 +1,18 @@
// Copyright 2010-2016 Alibaba Inc. All Rights Reserved.
// Author:
// zhenling.zzg
// this file defines implementation of plan real info manager
// this file defines implementation of plan info manager
#define USING_LOG_PREFIX SQL
#include "share/diagnosis/ob_sql_plan_monitor_node_list.h"
#include "lib/allocator/ob_concurrent_fifo_allocator.h"
#include "sql/session/ob_sql_session_info.h"
#include "lib/thread/thread_mgr.h"
#include "common/object/ob_object.h"
#include "ob_plan_info_manager.h"
#include "lib/ob_running_mode.h"
#include "util/easy_time.h"
#include "lib/rc/ob_rc.h"
#include "lib/compress/ob_compressor.h"
#include "lib/compress/ob_compressor_pool.h"
#include "observer/ob_server.h"
namespace oceanbase
{
namespace sql
{
ObPlanRealInfo::ObPlanRealInfo()
{
reset();
}
ObPlanRealInfo::~ObPlanRealInfo()
{
}
void ObPlanRealInfo::reset()
{
plan_id_ = 0;
sql_id_ = NULL;
sql_id_len_ = 0;
plan_hash_ = 0;
id_ = 0;
real_cost_ = 0;
real_cardinality_ = 0;
cpu_cost_ = 0;
io_cost_ = 0;
}
int64_t ObPlanRealInfo::get_extra_size() const
{
return sql_id_len_;
}
ObPlanRealInfoRecord::ObPlanRealInfoRecord()
:allocator_(NULL)
{
}
ObPlanRealInfoRecord::~ObPlanRealInfoRecord()
{
destroy();
}
void ObPlanRealInfoRecord::destroy()
{
if (NULL != allocator_) {
allocator_->free(this);
}
}
ObPlanRealInfoMgr::ObPlanRealInfoMgr(ObConcurrentFIFOAllocator *allocator)
:allocator_(allocator),
queue_(),
destroyed_(false),
inited_(false)
{
}
ObPlanRealInfoMgr::~ObPlanRealInfoMgr()
{
if (inited_) {
destroy();
}
}
int ObPlanRealInfoMgr::init(uint64_t tenant_id,
const int64_t queue_size)
{
int ret = OB_SUCCESS;
if (inited_) {
ret = OB_INIT_TWICE;
} else if (OB_FAIL(queue_.init(ObModIds::OB_SQL_PLAN,
queue_size,
tenant_id))) {
SERVER_LOG(WARN, "Failed to init ObMySQLRequestQueue", K(ret));
} else {
inited_ = true;
destroyed_ = false;
}
if ((OB_FAIL(ret)) && (!inited_)) {
destroy();
}
return ret;
}
void ObPlanRealInfoMgr::destroy()
{
if (!destroyed_) {
clear_queue();
queue_.destroy();
inited_ = false;
destroyed_ = true;
}
}
int ObPlanRealInfoMgr::handle_plan_info(int64_t id,
const ObString& sql_id,
uint64_t plan_id,
uint64_t plan_hash,
const ObMonitorNode &plan_info)
{
int ret = OB_SUCCESS;
ObPlanRealInfoRecord *record = NULL;
if (!inited_) {
ret = OB_NOT_INIT;
} else {
char *buf = NULL;
//alloc mem from allocator
int64_t pos = sizeof(ObPlanRealInfoRecord);
int64_t total_size = sizeof(ObPlanRealInfoRecord) +
sql_id.length();
if (NULL == (buf = (char*)alloc(total_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
if (REACH_TIME_INTERVAL(100 * 1000)) {
SERVER_LOG(WARN, "alloc mem failed", K(total_size), K(ret));
}
} else {
uint64_t cpu_khz = OBSERVER.get_cpu_frequency_khz();
int64_t row_count = plan_info.output_row_count_;
int64_t cpu_time = plan_info.db_time_*1000 / cpu_khz;
int64_t io_time = plan_info.block_time_*1000 / cpu_khz;
int64_t open_time = plan_info.open_time_;
int64_t last_row_time = plan_info.last_row_time_;
int64_t real_time = 0;
if (last_row_time > open_time) {
real_time = last_row_time - open_time;
}
record = new(buf)ObPlanRealInfoRecord();
record->allocator_ = allocator_;
record->data_.id_ = id;
record->data_.plan_id_ = plan_id;
record->data_.plan_hash_ = plan_hash;
record->data_.real_cost_ = real_time;
record->data_.real_cardinality_ = row_count;
record->data_.io_cost_ = io_time;
record->data_.cpu_cost_ = cpu_time;
if ((sql_id.length() > 0) && (NULL != sql_id.ptr())) {
MEMCPY(buf + pos, sql_id.ptr(), sql_id.length());
record->data_.sql_id_ = buf + pos;
pos += sql_id.length();
record->data_.sql_id_len_ = sql_id.length();
}
}
//push into queue
if (OB_SUCC(ret)) {
int64_t req_id = 0;
if (OB_FAIL(queue_.push(record, req_id))) {
if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
SERVER_LOG(WARN, "push into queue failed", K(ret));
}
free(record);
record = NULL;
}
}
}
return ret;
}
ObConcurrentFIFOAllocator *ObPlanRealInfoMgr::get_allocator()
{
return allocator_;
}
void* ObPlanRealInfoMgr::alloc(const int64_t size)
{
void *ret = NULL;
if (allocator_ != NULL) {
ret = allocator_->alloc(size);
}
return ret;
}
void ObPlanRealInfoMgr::free(void *ptr)
{
if (allocator_ != NULL) {
allocator_->free(ptr);
ptr = NULL;
}
}
int ObPlanRealInfoMgr::get(const int64_t idx, void *&record, Ref* ref)
{
int ret = OB_SUCCESS;
if (NULL == (record = queue_.get(idx, ref))) {
ret = OB_ENTRY_NOT_EXIST;
}
return ret;
}
int ObPlanRealInfoMgr::revert(Ref* ref)
{
queue_.revert(ref);
return OB_SUCCESS;
}
int64_t ObPlanRealInfoMgr::release_old(int64_t limit)
{
void* req = NULL;
int64_t count = 0;
while(count < limit && NULL != (req = queue_.pop())) {
free(req);
++count;
}
return count;
}
void ObPlanRealInfoMgr::clear_queue()
{
(void)release_old(INT64_MAX);
}
bool ObPlanRealInfoMgr::is_valid() const
{
return inited_ && !destroyed_;
}
int64_t ObPlanRealInfoMgr::get_start_idx() const
{
return (int64_t)queue_.get_pop_idx();
}
int64_t ObPlanRealInfoMgr::get_end_idx() const
{
return (int64_t)queue_.get_push_idx();
}
int64_t ObPlanRealInfoMgr::get_size_used()
{
return (int64_t)queue_.get_size();
}
int64_t ObPlanRealInfoMgr::get_capacity()
{
return (int64_t)queue_.get_capacity();
}
ObSqlPlanItem::ObSqlPlanItem()
{
reset();
@ -263,12 +24,6 @@ ObSqlPlanItem::~ObSqlPlanItem()
void ObSqlPlanItem::reset()
{
plan_id_ = 0;
db_id_ = 0;
sql_id_ = NULL;
sql_id_len_ = 0;
plan_hash_ = 0;
gmt_create_ = 0;
operation_ = NULL;
operation_len_ = 0;
options_ = NULL;
@ -293,7 +48,9 @@ void ObSqlPlanItem::reset()
is_last_child_ = false;
search_columns_ = 0;
cost_ = 0;
real_cost_ = 0;
cardinality_ = 0;
real_cardinality_ = 0;
bytes_ = 0;
rowset_ = 1;
other_tag_ = NULL;
@ -331,8 +88,7 @@ void ObSqlPlanItem::reset()
int64_t ObSqlPlanItem::get_extra_size() const
{
return sql_id_len_ +
operation_len_ +
return operation_len_ +
options_len_ +
object_node_len_ +
object_owner_len_ +
@ -355,107 +111,127 @@ int64_t ObSqlPlanItem::get_extra_size() const
other_xml_len_;
}
ObSqlPlanItemRecord::ObSqlPlanItemRecord()
:allocator_(NULL)
ObLogicalPlanHead::ObLogicalPlanHead()
{
reset();
}
ObLogicalPlanHead::~ObLogicalPlanHead()
{
}
ObSqlPlanItemRecord::~ObSqlPlanItemRecord()
void ObLogicalPlanHead::reset()
{
destroy();
count_ = 0;
plan_item_pos_ = NULL;
}
void ObSqlPlanItemRecord::destroy()
ObLogicalPlanHead::PlanItemPos::PlanItemPos()
{
if (NULL != allocator_) {
allocator_->free(this);
}
reset();
}
ObPlanItemMgr::ObPlanItemMgr(ObConcurrentFIFOAllocator *allocator)
:allocator_(allocator),
queue_(),
plan_id_increment_(0),
destroyed_(false),
inited_(false)
ObLogicalPlanHead::PlanItemPos::~PlanItemPos()
{
}
ObPlanItemMgr::~ObPlanItemMgr()
void ObLogicalPlanHead::PlanItemPos::reset()
{
if (inited_) {
destroy();
}
offset_ = 0;
length_ = 0;
}
int ObPlanItemMgr::init(uint64_t tenant_id,
const int64_t queue_size)
ObLogicalPlanRawData::ObLogicalPlanRawData()
{
reset();
}
ObLogicalPlanRawData::~ObLogicalPlanRawData()
{
}
void ObLogicalPlanRawData::reset()
{
logical_plan_ = NULL;
logical_plan_len_ = 0;
uncompress_len_ = 0;
}
bool ObLogicalPlanRawData::is_valid() const
{
return NULL != logical_plan_;
}
int ObLogicalPlanRawData::compress_logical_plan(ObIAllocator &allocator,
ObIArray<ObSqlPlanItem*> &plan_items)
{
int ret = OB_SUCCESS;
if (inited_) {
ret = OB_INIT_TWICE;
} else if (OB_FAIL(queue_.init(ObModIds::OB_SQL_PLAN,
queue_size,
tenant_id))) {
SERVER_LOG(WARN, "Failed to init ObMySQLRequestQueue", K(ret));
} else {
inited_ = true;
destroyed_ = false;
}
if ((OB_FAIL(ret)) && (!inited_)) {
destroy();
}
return ret;
}
void ObPlanItemMgr::destroy()
{
if (!destroyed_) {
clear_queue();
queue_.destroy();
inited_ = false;
destroyed_ = true;
}
}
int ObPlanItemMgr::handle_plan_item(const ObSqlPlanItem &plan_item)
{
int ret = OB_SUCCESS;
ObSqlPlanItemRecord *record = NULL;
if (!inited_) {
ret = OB_NOT_INIT;
} else {
char *buf = NULL;
//alloc mem from allocator
int64_t pos = sizeof(ObSqlPlanItemRecord);
int64_t total_size = sizeof(ObSqlPlanItemRecord) +
plan_item.get_extra_size();
if (NULL == (buf = (char*)alloc(total_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
if (REACH_TIME_INTERVAL(100 * 1000)) {
SERVER_LOG(WARN, "alloc mem failed", K(total_size), K(ret));
}
//step 1: serialize logical plan
char *buf = NULL;
ObLogicalPlanHead *head = NULL;
int64_t head_size = sizeof(ObLogicalPlanHead) +
plan_items.count() * sizeof(ObLogicalPlanHead::PlanItemPos);
int64_t total_size = 0;
int64_t buf_pos = head_size;
total_size += head_size;
//calculate logical plan length
for (int64_t i = 0; OB_SUCC(ret) && i < plan_items.count(); ++i) {
if (OB_ISNULL(plan_items.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null plan item", K(ret));
} else {
record = new(buf)ObSqlPlanItemRecord();
record->allocator_ = allocator_;
record->data_ = plan_item;
total_size += sizeof(ObSqlPlanItem) + plan_items.at(i)->get_extra_size();
}
}
//alloc memory
if (OB_FAIL(ret)) {
//do nothing
} else if (NULL == (buf = (char*)allocator.alloc(total_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
if (REACH_TIME_INTERVAL(100 * 1000)) {
LOG_WARN("alloc mem failed", K(total_size), K(ret));
}
} else {
//init operator count
head = new(buf)ObLogicalPlanHead();
head->count_ = plan_items.count();
head->plan_item_pos_ = reinterpret_cast<ObLogicalPlanHead::PlanItemPos*>(
buf + sizeof(ObLogicalPlanHead));
}
//serialize each operator
for (int64_t i = 0; OB_SUCC(ret) && i < plan_items.count(); ++i) {
ObSqlPlanItem* plan_item = plan_items.at(i);
if (OB_ISNULL(plan_item)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null plan item", K(ret));
} else {
//init operator map info
ObLogicalPlanHead::PlanItemPos *plan_item_pos = head->plan_item_pos_ + i;
plan_item_pos->offset_ = buf_pos;
plan_item_pos->length_ = sizeof(ObSqlPlanItem) + plan_item->get_extra_size();
//init operator basic info
ObSqlPlanItem *new_plan_item = new(buf+buf_pos)ObSqlPlanItem();
*new_plan_item = *plan_item;
buf_pos += sizeof(ObSqlPlanItem);
#define DEEP_COPY_DATA(value) \
do { \
if (pos + plan_item.value##len_ > total_size) { \
if (OB_FAIL(ret)) { \
} else if (buf_pos + plan_item->value##len_ > total_size) { \
ret = OB_ERR_UNEXPECTED; \
LOG_WARN("unexpect record size", K(pos), K(plan_item.value##len_), \
K(total_size), K(ret)); \
} else if ((plan_item.value##len_ > 0) && (NULL != plan_item.value)) { \
MEMCPY(buf + pos, plan_item.value, plan_item.value##len_); \
record->data_.value = buf + pos; \
pos += plan_item.value##len_; \
LOG_WARN("unexpect record size", K(buf_pos), K(plan_item->value##len_), \
K(total_size), K(ret)); \
} else if ((plan_item->value##len_ > 0) && (NULL != plan_item->value)) { \
MEMCPY(buf + buf_pos, plan_item->value, plan_item->value##len_); \
new_plan_item->value = reinterpret_cast<char*>(buf_pos); \
buf_pos += plan_item->value##len_; \
} else { \
record->data_.value = buf + pos; \
new_plan_item->value = reinterpret_cast<char*>(buf_pos); \
} \
} while(0);
DEEP_COPY_DATA(sql_id_);
//copy buffer data and convert ptr to offset
DEEP_COPY_DATA(operation_);
DEEP_COPY_DATA(options_);
DEEP_COPY_DATA(object_node_);
@ -478,199 +254,145 @@ int ObPlanItemMgr::handle_plan_item(const ObSqlPlanItem &plan_item)
DEEP_COPY_DATA(remarks_);
DEEP_COPY_DATA(other_xml_);
}
//push into queue
if (OB_SUCC(ret)) {
int64_t req_id = 0;
if (OB_FAIL(queue_.push(record, req_id))) {
if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
SERVER_LOG(WARN, "push into queue failed", K(ret));
}
//step 2: compress data
common::ObCompressorType compressor_type = LZ4_COMPRESSOR;
common::ObCompressor *compressor = NULL;
char *compress_buf = NULL;
int64_t compress_size = total_size * 2;
if (OB_FAIL(ret)) {
//do nothing
} else if (OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type,
compressor))) {
LOG_WARN("fail to get compressor", K(compressor_type), K(ret));
} else if (OB_ISNULL(compressor)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null compressor", K(ret));
} else if (NULL == (compress_buf = (char*)allocator.alloc(compress_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
if (REACH_TIME_INTERVAL(100 * 1000)) {
LOG_WARN("alloc mem failed", K(compress_size), K(ret));
}
} else if (OB_FAIL(compressor->compress(buf,
total_size,
compress_buf,
compress_size,
compress_size))) {
LOG_WARN("failed to compress data", K(ret));
} else if (compress_size >= total_size) {
//will not use compress buffer
logical_plan_ = buf;
logical_plan_len_ = total_size;
uncompress_len_ = -1;
allocator.free(compress_buf);
compress_buf = NULL;
} else {
//use compress buffer
logical_plan_ = compress_buf;
logical_plan_len_ = compress_size;
uncompress_len_ = total_size;
allocator.free(buf);
buf = NULL;
}
return ret;
}
int ObLogicalPlanRawData::uncompress_logical_plan(ObIAllocator &allocator,
ObIArray<ObSqlPlanItem*> &plan_items)
{
int ret = OB_SUCCESS;
//step 1: uncompress data
common::ObCompressorType compressor_type = LZ4_COMPRESSOR;
common::ObCompressor *compressor = NULL;
char *uncompress_buf = NULL;
int64_t uncompress_size = uncompress_len_;
if (NULL == logical_plan_ || logical_plan_len_ <= 0) {
//do nothing
} else if (uncompress_len_ < 0) {
//do not need decompress
uncompress_buf = logical_plan_;
uncompress_size = logical_plan_len_;
} else if (OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type,
compressor))) {
LOG_WARN("fail to get compressor", K(compressor_type), K(ret));
} else if (OB_ISNULL(compressor)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null compressor", K(ret));
} else if (NULL == (uncompress_buf = (char*)allocator.alloc(uncompress_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
if (REACH_TIME_INTERVAL(100 * 1000)) {
LOG_WARN("alloc mem failed", K(uncompress_size), K(ret));
}
} else if (OB_FAIL(compressor->decompress(logical_plan_,
logical_plan_len_,
uncompress_buf,
uncompress_size,
uncompress_size))) {
LOG_WARN("failed to compress data", K(ret));
}
if (OB_FAIL(ret) || NULL == uncompress_buf) {
//do nothing
} else {
//step 2: deserialize logical plan
//get logical plan head info
ObLogicalPlanHead *head = NULL;
head = reinterpret_cast<ObLogicalPlanHead*>(uncompress_buf);
head->plan_item_pos_ = reinterpret_cast<ObLogicalPlanHead::PlanItemPos*>(
uncompress_buf + sizeof(ObLogicalPlanHead));
//deserialize each operator info
for (int64_t i = 0; OB_SUCC(ret) && i < head->count_; ++i) {
//get operator map info
ObLogicalPlanHead::PlanItemPos *plan_item_pos = head->plan_item_pos_ + i;
if (plan_item_pos->offset_ < 0 ||
plan_item_pos->offset_ + plan_item_pos->length_ > uncompress_size) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("broken compressed data", K(ret));
} else {
ObSqlPlanItem *plan_item = reinterpret_cast<ObSqlPlanItem*>(
uncompress_buf+plan_item_pos->offset_);
#define CONVERT_OFFSET_TO_PTR(value) \
do { \
int64_t offset = reinterpret_cast<int64_t>(plan_item->value); \
if (OB_FAIL(ret)) { \
} else if (offset + plan_item->value##len_ > uncompress_size) { \
ret = OB_ERR_UNEXPECTED; \
LOG_WARN("unexpect record size", K(offset), K(plan_item->value##len_), \
K(uncompress_size), K(ret)); \
} else { \
plan_item->value = uncompress_buf + offset; \
} \
} while(0);
//convert offset to ptr
CONVERT_OFFSET_TO_PTR(operation_);
CONVERT_OFFSET_TO_PTR(options_);
CONVERT_OFFSET_TO_PTR(object_node_);
CONVERT_OFFSET_TO_PTR(object_owner_);
CONVERT_OFFSET_TO_PTR(object_name_);
CONVERT_OFFSET_TO_PTR(object_alias_);
CONVERT_OFFSET_TO_PTR(object_type_);
CONVERT_OFFSET_TO_PTR(optimizer_);
CONVERT_OFFSET_TO_PTR(other_tag_);
CONVERT_OFFSET_TO_PTR(partition_start_);
CONVERT_OFFSET_TO_PTR(partition_stop_);
CONVERT_OFFSET_TO_PTR(other_);
CONVERT_OFFSET_TO_PTR(distribution_);
CONVERT_OFFSET_TO_PTR(access_predicates_);
CONVERT_OFFSET_TO_PTR(filter_predicates_);
CONVERT_OFFSET_TO_PTR(startup_predicates_);
CONVERT_OFFSET_TO_PTR(projection_);
CONVERT_OFFSET_TO_PTR(special_predicates_);
CONVERT_OFFSET_TO_PTR(qblock_name_);
CONVERT_OFFSET_TO_PTR(remarks_);
CONVERT_OFFSET_TO_PTR(other_xml_);
if (OB_SUCC(ret) &&
OB_FAIL(plan_items.push_back(plan_item))) {
LOG_WARN("failed to push back plan item", K(ret));
}
free(record);
record = NULL;
}
}
}
return ret;
}
int ObPlanItemMgr::get_plan(int64_t plan_id,
ObIArray<ObSqlPlanItem*> &plan)
{
int ret = OB_SUCCESS;
plan.reuse();
int64_t start_idx = get_start_idx();
int64_t end_idx = get_end_idx();
void *rec = NULL;
Ref ref;
for (int64_t cur_id=start_idx;
(OB_ENTRY_NOT_EXIST == ret || OB_SUCCESS == ret) && cur_id < end_idx;
++cur_id) {
ref.reset();
ret = get(cur_id, rec, &ref);
if (OB_SUCC(ret) && NULL != rec) {
ObSqlPlanItemRecord *record = static_cast<ObSqlPlanItemRecord*>(rec);
if (record->data_.plan_id_ != plan_id) {
//do nothing
} else if (OB_FAIL(plan.push_back(&record->data_))) {
LOG_WARN("failed to push back plan item", K(ret));
}
}
if (ref.idx_ != -1) {
revert(&ref);
}
}
return ret;
}
int ObPlanItemMgr::get_plan(const ObString &sql_id,
int64_t plan_id,
ObIArray<ObSqlPlanItem*> &plan)
{
int ret = OB_SUCCESS;
plan.reuse();
int64_t start_idx = get_start_idx();
int64_t end_idx = get_end_idx();
void *rec = NULL;
Ref ref;
for (int64_t cur_id=start_idx;
(OB_ENTRY_NOT_EXIST == ret || OB_SUCCESS == ret) && cur_id < end_idx;
++cur_id) {
ref.reset();
ret = get(cur_id, rec, &ref);
if (OB_SUCC(ret) && NULL != rec) {
ObSqlPlanItemRecord *record = static_cast<ObSqlPlanItemRecord*>(rec);
if (record->data_.plan_id_ != plan_id ||
sql_id.case_compare(ObString(record->data_.sql_id_len_,record->data_.sql_id_)) != 0) {
//do nothing
} else if (OB_FAIL(plan.push_back(&record->data_))) {
LOG_WARN("failed to push back plan item", K(ret));
}
}
if (ref.idx_ != -1) {
revert(&ref);
}
}
return ret;
}
int ObPlanItemMgr::get_plan_by_hash(const ObString &sql_id,
uint64_t plan_hash,
ObIArray<ObSqlPlanItem*> &plan)
{
int ret = OB_SUCCESS;
plan.reuse();
int64_t start_idx = get_start_idx();
int64_t end_idx = get_end_idx();
void *rec = NULL;
Ref ref;
for (int64_t cur_id=start_idx;
(OB_ENTRY_NOT_EXIST == ret || OB_SUCCESS == ret) && cur_id < end_idx;
++cur_id) {
ref.reset();
ret = get(cur_id, rec, &ref);
if (OB_SUCC(ret) && NULL != rec) {
ObSqlPlanItemRecord *record = static_cast<ObSqlPlanItemRecord*>(rec);
if (record->data_.plan_hash_ != plan_hash ||
sql_id.case_compare(ObString(record->data_.sql_id_len_,record->data_.sql_id_)) != 0) {
//do nothing
} else if (OB_FAIL(plan.push_back(&record->data_))) {
LOG_WARN("failed to push back plan item", K(ret));
}
}
if (ref.idx_ != -1) {
revert(&ref);
}
}
return ret;
}
ObConcurrentFIFOAllocator *ObPlanItemMgr::get_allocator()
{
return allocator_;
}
void* ObPlanItemMgr::alloc(const int64_t size)
{
void *ret = NULL;
if (allocator_ != NULL) {
ret = allocator_->alloc(size);
}
return ret;
}
void ObPlanItemMgr::free(void *ptr)
{
if (allocator_ != NULL) {
allocator_->free(ptr);
ptr = NULL;
}
}
int ObPlanItemMgr::get(const int64_t idx, void *&record, Ref* ref)
{
int ret = OB_SUCCESS;
if (NULL == (record = queue_.get(idx, ref))) {
ret = OB_ENTRY_NOT_EXIST;
}
return ret;
}
int ObPlanItemMgr::revert(Ref* ref)
{
queue_.revert(ref);
return OB_SUCCESS;
}
int64_t ObPlanItemMgr::release_old(int64_t limit)
{
void* req = NULL;
int64_t count = 0;
while(count < limit && NULL != (req = queue_.pop())) {
free(req);
++count;
}
return count;
}
void ObPlanItemMgr::clear_queue()
{
(void)release_old(INT64_MAX);
}
bool ObPlanItemMgr::is_valid() const
{
return inited_ && !destroyed_;
}
int64_t ObPlanItemMgr::get_start_idx() const
{
return (int64_t)queue_.get_pop_idx();
}
int64_t ObPlanItemMgr::get_end_idx() const
{
return (int64_t)queue_.get_push_idx();
}
int64_t ObPlanItemMgr::get_size_used()
{
return (int64_t)queue_.get_size();
}
int64_t ObPlanItemMgr::get_capacity()
{
return (int64_t)queue_.get_capacity();
}
int64_t ObPlanItemMgr::get_next_plan_id()
{
return ++plan_id_increment_;
}
int64_t ObPlanItemMgr::get_last_plan_id()
{
return plan_id_increment_;
}
} // end of namespace sql
} // end of namespace oceanbase

View File

@ -1,100 +1,17 @@
// Copyright 2010-2016 Alibaba Inc. All Rights Reserved.
// Author:
// zhenling.zzg
// this file defines interface of plan real info manager
// this file defines interface of plan info manager
#ifndef SRC_OBSERVER_PLAN_INFO_MGR_H_
#define SRC_OBSERVER_PLAN_INFO_MGR_H_
#include "lib/allocator/ob_concurrent_fifo_allocator.h"
#include "observer/mysql/ob_ra_queue.h"
#include "lib/task/ob_timer.h"
#include "lib/allocator/ob_allocator.h"
#include "lib/container/ob_iarray.h"
namespace oceanbase
{
namespace common
{
class ObConcurrentFIFOAllocator;
}
namespace sql
{
class ObMonitorNode;
struct ObPlanRealInfo {
ObPlanRealInfo();
virtual ~ObPlanRealInfo();
void reset();
int64_t get_extra_size() const;
int64_t plan_id_;
char* sql_id_;
int64_t sql_id_len_;
uint64_t plan_hash_;
int id_;
int64_t real_cost_;
int64_t real_cardinality_;
int64_t cpu_cost_;
int64_t io_cost_;
TO_STRING_KV(
K_(plan_id),
K_(real_cost),
K_(real_cardinality),
K_(cpu_cost),
K_(io_cost)
);
};
struct ObPlanRealInfoRecord
{
ObPlanRealInfoRecord();
virtual ~ObPlanRealInfoRecord();
virtual void destroy();
TO_STRING_KV(
K_(data)
);
ObPlanRealInfo data_;
common::ObConcurrentFIFOAllocator *allocator_;
};
class ObPlanRealInfoMgr
{
public:
typedef common::ObRaQueue::Ref Ref;
public:
ObPlanRealInfoMgr(common::ObConcurrentFIFOAllocator *allocator);
virtual ~ObPlanRealInfoMgr();
int init(uint64_t tenant_id,
const int64_t queue_size);
void destroy();
int handle_plan_info(int64_t id,
const ObString& sql_id,
uint64_t plan_id,
uint64_t plan_hash,
const ObMonitorNode &plan_info);
common::ObConcurrentFIFOAllocator *get_allocator();
void* alloc(const int64_t size);
void free(void *ptr);
int get(const int64_t idx, void *&record, Ref* ref);
int revert(Ref* ref);
int64_t release_old(int64_t limit);
void clear_queue();
int64_t get_start_idx() const;
int64_t get_end_idx() const;
int64_t get_size_used();
int64_t get_capacity();
bool is_valid() const;
private:
DISALLOW_COPY_AND_ASSIGN(ObPlanRealInfoMgr);
private:
common::ObConcurrentFIFOAllocator *allocator_;
common::ObRaQueue queue_;
bool destroyed_;
bool inited_;
};
struct ObSqlPlanItem {
ObSqlPlanItem();
@ -102,12 +19,6 @@ struct ObSqlPlanItem {
void reset();
int64_t get_extra_size() const;
int64_t plan_id_;
char* sql_id_;
int64_t sql_id_len_;
int64_t db_id_;
uint64_t plan_hash_;
int64_t gmt_create_;
char* operation_;
int64_t operation_len_;
char* options_;
@ -132,7 +43,9 @@ struct ObSqlPlanItem {
int search_columns_;
bool is_last_child_;
int64_t cost_;
int64_t real_cost_;
int64_t cardinality_;
int64_t real_cardinality_;
int64_t bytes_;
int64_t rowset_;
char* other_tag_;
@ -168,69 +81,40 @@ struct ObSqlPlanItem {
int64_t other_xml_len_;
TO_STRING_KV(
K_(plan_id)
K_(id)
);
};
struct ObSqlPlanItemRecord
struct ObLogicalPlanHead
{
ObSqlPlanItemRecord();
virtual ~ObSqlPlanItemRecord();
virtual void destroy();
TO_STRING_KV(
K_(data)
);
ObSqlPlanItem data_;
common::ObConcurrentFIFOAllocator *allocator_;
ObLogicalPlanHead();
virtual ~ObLogicalPlanHead();
void reset();
struct PlanItemPos
{
PlanItemPos();
virtual ~PlanItemPos();
void reset();
int64_t offset_;
int64_t length_;
};
int64_t count_; //operator count
PlanItemPos *plan_item_pos_; //operator data position in buffer
};
class ObPlanItemMgr
struct ObLogicalPlanRawData
{
public:
typedef common::ObRaQueue::Ref Ref;
public:
ObPlanItemMgr(common::ObConcurrentFIFOAllocator *allocator);
virtual ~ObPlanItemMgr();
int init(uint64_t tenant_id,
const int64_t queue_size);
void destroy();
int handle_plan_item(const ObSqlPlanItem &plan_item);
int get_plan(int64_t plan_id,
ObIArray<ObSqlPlanItem*> &plan);
int get_plan(const ObString &sql_id,
int64_t plan_id,
ObIArray<ObSqlPlanItem*> &plan);
int get_plan_by_hash(const ObString &sql_id,
uint64_t plan_hash,
ObIArray<ObSqlPlanItem*> &plan);
common::ObConcurrentFIFOAllocator *get_allocator();
void* alloc(const int64_t size);
void free(void *ptr);
int get(const int64_t idx, void *&record, Ref* ref);
int revert(Ref* ref);
int64_t release_old(int64_t limit);
void clear_queue();
int64_t get_start_idx() const;
int64_t get_end_idx() const;
int64_t get_size_used();
int64_t get_capacity();
int64_t get_next_plan_id();
int64_t get_last_plan_id();
ObLogicalPlanRawData();
virtual ~ObLogicalPlanRawData();
void reset();
bool is_valid() const;
TO_STRING_KV(
K_(plan_id_increment)
);
private:
DISALLOW_COPY_AND_ASSIGN(ObPlanItemMgr);
private:
common::ObConcurrentFIFOAllocator *allocator_;
common::ObRaQueue queue_;
int64_t plan_id_increment_;
bool destroyed_;
bool inited_;
int compress_logical_plan(ObIAllocator &allocator, ObIArray<ObSqlPlanItem*> &plan_items);
int uncompress_logical_plan(ObIAllocator &allocator, ObIArray<ObSqlPlanItem*> &plan_items);
char *logical_plan_; //serialize and compress data
int64_t logical_plan_len_; //compress data length
//uncompress data length, used for uncompress function
//if values is -1, logical plan not be compressed
int64_t uncompress_len_;
};
} // end of namespace sql

File diff suppressed because it is too large Load Diff

View File

@ -15,6 +15,8 @@ namespace sql
class ObLogPlan;
class ObSQLSessionInfo;
class ObLogicalOperator;
class ObPhysicalPlan;
class ObExecContext;
struct ObSqlPlanItem;
struct ObQueryCtx;
struct ObExplainDisplayOpt;
@ -116,13 +118,13 @@ private:
public:
ObSqlPlan(common::ObIAllocator &allocator);
virtual ~ObSqlPlan();
int store_sql_plan(ObLogPlan* plan,
int64_t plan_id,
uint64_t plan_hash,
ObString &sql_id);
int store_sql_plan(ObLogPlan* log_plan, ObPhysicalPlan* phy_plan);
int store_sql_plan_for_explain(ObLogPlan* plan,
int store_sql_plan_for_explain(ObExecContext *ctx,
ObLogPlan* plan,
ExplainType type,
const ObString& plan_table,
const ObString& statement_id,
const ObExplainDisplayOpt& option,
ObIArray<common::ObString> &plan_strs);
@ -131,25 +133,11 @@ public:
const ObExplainDisplayOpt& option,
ObIArray<common::ObString> &plan_strs);
int get_sql_plan(const ObString &sql_id,
int64_t plan_id,
ExplainType type,
const ObExplainDisplayOpt& option,
ObIArray<ObPlanRealInfo> &plan_infos,
PlanText &plan_text);
int get_sql_plan_by_hash(const ObString &sql_id,
uint64_t plan_hash,
ExplainType type,
const ObExplainDisplayOpt& option,
ObIArray<ObPlanRealInfo> &plan_infos,
PlanText &plan_text);
int get_last_explain_plan(ExplainType type,
const ObExplainDisplayOpt& option,
PlanText &plan_text);
void set_session_info(ObSQLSessionInfo *session_info);
int format_sql_plan(ObIArray<ObSqlPlanItem*> &sql_plan_infos,
ExplainType type,
const ObExplainDisplayOpt& option,
PlanText &plan_text);
static int get_plan_outline_info_one_line(PlanText &plan_text,
ObLogPlan* plan);
@ -161,13 +149,14 @@ public:
ObIArray<common::ObString> &plan_strs);
private:
int set_plan_id_for_explain(ObIArray<ObSqlPlanItem*> &sql_plan_infos);
int inner_store_sql_plan_for_explain(ObExecContext *ctx,
const ObString& plan_table,
const ObString& statement_id,
ObIArray<ObSqlPlanItem*> &sql_plan_infos);
int set_plan_id_for_excute(ObIArray<ObSqlPlanItem*> &sql_plan_infos,
int64_t plan_id,
uint64_t plan_hash,
ObString &sql_id,
PlanText &plan_text);
int escape_quotes(ObSqlPlanItem &plan_item);
int inner_escape_quotes(char* &ptr, int64_t &length);
int get_sql_plan_infos(PlanText &plan_text,
ObLogPlan* plan,
@ -224,21 +213,12 @@ private:
int64_t &pos,
const ObExprConstraint &info);
int inner_store_sql_plan(ObIArray<ObSqlPlanItem*> &sql_plan_infos, bool for_explain);
int format_sql_plan(ObIArray<ObSqlPlanItem*> &sql_plan_infos,
ExplainType type,
const ObExplainDisplayOpt& option,
ObIArray<ObPlanRealInfo> &plan_infos,
PlanText &plan_text);
int get_plan_table_formatter(ObIArray<ObSqlPlanItem*> &sql_plan_infos,
const ObExplainDisplayOpt& option,
PlanFormatHelper &format_helper);
int get_real_plan_table_formatter(ObIArray<ObSqlPlanItem*> &sql_plan_infos,
const ObExplainDisplayOpt& option,
ObIArray<ObPlanRealInfo> &plan_infos,
PlanFormatHelper &format_helper);
int get_operator_prefix(ObIArray<ObSqlPlanItem*> &sql_plan_infos,
@ -255,7 +235,6 @@ private:
int format_real_plan_table(ObIArray<ObSqlPlanItem*> &sql_plan_infos,
const ObExplainDisplayOpt& option,
ObIArray<ObPlanRealInfo> &plan_infos,
PlanText &plan_text);
int format_plan_output(ObIArray<ObSqlPlanItem*> &sql_plan_infos, PlanText &plan_text);
@ -276,17 +255,26 @@ private:
int64_t info_idx,
json::Value *&ret_val);
bool is_exchange_out_operator(ObSqlPlanItem *item);
int init_buffer(PlanText &plan_text);
void destroy_buffer(PlanText &plan_text);
int refine_buffer(PlanText &plan_text);
public:
static int format_one_output_expr(char *buf,
int64_t buf_len,
int64_t &pos,
int &line_begin_pos,
const char* expr_info,
int expr_len);
DISALLOW_COPY_AND_ASSIGN(ObSqlPlan);
private:
common::ObIAllocator &allocator_;
ObSQLSessionInfo *session_info_;
};
} // end of namespace sql

View File

@ -1,536 +0,0 @@
// Copyright 2010-2016 Alibaba Inc. All Rights Reserved.
// Author:
// zhenling.zzg
// this file defines implementation of sql plan manager
#define USING_LOG_PREFIX SQL
#include "lib/allocator/ob_concurrent_fifo_allocator.h"
#include "lib/compress/zlib/zlib_src/zlib.h"
#include "sql/session/ob_sql_session_info.h"
#include "common/object/ob_object.h"
#include "lib/thread/thread_mgr.h"
#include "ob_sql_plan_manager.h"
#include "lib/ob_running_mode.h"
#include "util/easy_time.h"
#include "lib/rc/ob_rc.h"
namespace oceanbase
{
namespace sql
{
ObSqlPlanMgr::ObSqlPlanMgr()
:allocator_(),
task_(),
plan_table_mgrs_(),
plan_item_mgr_(nullptr),
plan_real_info_mgr_(nullptr),
destroyed_(false),
inited_(false),
tenant_id_(OB_INVALID_TENANT_ID),
tg_id_(-1)
{
}
ObSqlPlanMgr::~ObSqlPlanMgr()
{
if (inited_) {
destroy();
}
}
int ObSqlPlanMgr::init(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
void *buf = NULL;
tenant_id_ = tenant_id;
int64_t queue_size = lib::is_mini_mode() ?
MINI_MODE_MAX_QUEUE_SIZE : MAX_QUEUE_SIZE;
if (inited_) {
ret = OB_INIT_TWICE;
} else if (OB_FAIL(allocator_.init(SQL_PLAN_PAGE_SIZE,
ObModIds::OB_SQL_PLAN,
tenant_id,
INT64_MAX))) {
SERVER_LOG(WARN, "failed to init allocator", K(ret));
} else if (OB_ISNULL(buf=alloc(sizeof(ObPlanItemMgr)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for ObPlanItemMgr", K(ret));
} else if (OB_FALSE_IT(plan_item_mgr_=new(buf)ObPlanItemMgr(get_allocator()))) {
} else if (OB_FAIL(plan_item_mgr_->init(tenant_id, queue_size))) {
LOG_WARN("failed to init plan item manager", K(ret));
} else if (OB_ISNULL(buf=alloc(sizeof(ObPlanRealInfoMgr)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for ObPlanRealInfoMgr", K(ret));
} else if (OB_FALSE_IT(plan_real_info_mgr_=new(buf)ObPlanRealInfoMgr(get_allocator()))) {
} else if (OB_FAIL(plan_real_info_mgr_->init(tenant_id, queue_size))) {
LOG_WARN("failed to init plan real info manager", K(ret));
// } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::ReqMemEvict,
// tg_id_))) {
// SERVER_LOG(WARN, "create failed", K(ret));
// } else if (OB_FAIL(TG_START(tg_id_))) {
// SERVER_LOG(WARN, "init timer fail", K(ret));
// } else if (OB_FAIL(task_.init(this))) {
// SERVER_LOG(WARN, "fail to init sql plan timer task", K(ret));
// } else if (OB_FAIL(TG_SCHEDULE(tg_id_, task_, EVICT_INTERVAL, true))) {
// SERVER_LOG(WARN, "start eliminate task failed", K(ret));
} else {
inited_ = true;
destroyed_ = false;
}
if ((OB_FAIL(ret)) && (!inited_)) {
destroy();
}
return ret;
}
void ObSqlPlanMgr::destroy()
{
if (!destroyed_) {
TG_DESTROY(tg_id_);
if (plan_real_info_mgr_ != NULL) {
plan_real_info_mgr_->destroy();
free(plan_real_info_mgr_);
}
if (plan_item_mgr_ != NULL) {
plan_item_mgr_->destroy();
free(plan_item_mgr_);
}
for (int i = 0; i < plan_table_mgrs_.count(); ++i) {
if (plan_table_mgrs_.at(i) != NULL) {
plan_table_mgrs_.at(i)->destroy();
free(plan_table_mgrs_.at(i));
}
}
allocator_.destroy();
inited_ = false;
destroyed_ = true;
}
}
ObConcurrentFIFOAllocator *ObSqlPlanMgr::get_allocator()
{
return &allocator_;
}
void* ObSqlPlanMgr::alloc(const int64_t size)
{
void * ret = allocator_.alloc(size);
return ret;
}
void ObSqlPlanMgr::free(void *ptr)
{
allocator_.free(ptr);
ptr = NULL;
}
uint64_t ObSqlPlanMgr::get_tenant_id() const
{
return tenant_id_;
}
bool ObSqlPlanMgr::is_valid() const
{
return inited_ && !destroyed_;
}
int ObSqlPlanMgr::get_mem_limit(int64_t &mem_limit)
{
int ret = OB_SUCCESS;
int64_t tenant_mem_limit = lib::get_tenant_memory_limit(tenant_id_);
// default mem limit
mem_limit = static_cast<int64_t>(SQL_PLAN_MEM_FACTOR * tenant_mem_limit);
// get mem_percentage from session info
// ObArenaAllocator alloc;
// ObObj obj_val;
// int64_t mem_pct = 0;
// if (OB_FAIL(ObBasicSessionInfo::get_global_sys_variable(tenant_id_,
// alloc,
// ObDataTypeCastParams(),
// ObString(share::OB_SV_SQL_PLAN_MEMORY_PERCENTAGE),
// obj_val))) {
// LOG_WARN("failed to get global sys variable", K(tenant_id_), K(ret));
// } else if (OB_FAIL(obj_val.get_int(mem_pct))) {
// LOG_WARN("failed to get int", K(ret), K(obj_val));
// } else if (mem_pct < 0 || mem_pct > 100) {
// ret = OB_INVALID_ARGUMENT;
// LOG_WARN("invalid value of sql plan mem percentage", K(ret), K(mem_pct));
// } else {
// mem_limit = static_cast<int64_t>(tenant_mem_limit * mem_pct / 100.0);
// LOG_DEBUG("tenant sql plan memory limit", K_(tenant_id),
// K(tenant_mem_limit), K(mem_pct), K(mem_limit));
// }
return ret;
}
int ObSqlPlanMgr::init_plan_table_manager(ObPlanItemMgr* &plan_table_mgr)
{
int ret = OB_SUCCESS;
void *buf = NULL;
plan_table_mgr = NULL;
int64_t evict_high_level = 0;
int64_t evict_low_level = 0;
if (1 == 1) {
//do nothing
} else if (OB_FAIL(task_.calc_evict_mem_level(evict_low_level, evict_high_level))) {
LOG_WARN("fail to get sql plan evict memory level", K(ret));
} else if (evict_high_level <= allocator_.allocated()) {
//do nothing
} else if (OB_ISNULL(buf=alloc(sizeof(ObPlanItemMgr)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for ObPlanRealInfoMgr", K(ret));
} else if (OB_FALSE_IT(plan_table_mgr=new(buf)ObPlanItemMgr(get_allocator()))) {
} else if (OB_FAIL(plan_table_mgr->init(tenant_id_, PLAN_TABLE_QUEUE_SIZE))) {
LOG_WARN("failed to init plan real info manager", K(ret));
} else {
ObLockGuard<ObSpinLock> guard(plan_table_mgr_lock_);
if (OB_FAIL(plan_table_mgrs_.push_back(plan_table_mgr))) {
LOG_WARN("failed to push back plan table", K(ret));
}
}
if (OB_FAIL(ret) && plan_table_mgr != NULL) {
// cleanup
free(plan_table_mgr);
plan_table_mgr = NULL;
}
return ret;
}
int ObSqlPlanMgr::destroy_plan_table_manager(ObPlanItemMgr* &plan_table_mgr)
{
int ret = OB_SUCCESS;
if (plan_table_mgr != NULL) {
bool find = false;
int idx = 0;
ObLockGuard<ObSpinLock> guard(plan_table_mgr_lock_);
for (int i = 0; !find && i < plan_table_mgrs_.count(); ++i) {
if (plan_table_mgrs_.at(i) == plan_table_mgr) {
idx = i;
find = true;
}
}
if (find) {
plan_table_mgrs_.remove(idx);
plan_table_mgr->destroy();
free(plan_table_mgr);
plan_table_mgr = NULL;
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("plan table not in this sql plan mgr", K(ret));
}
}
return ret;
}
int ObSqlPlanMgr::mtl_init(ObSqlPlanMgr* &sql_plan_mgr)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = lib::current_resource_owner_id();
sql_plan_mgr = OB_NEW(ObSqlPlanMgr, ObModIds::OB_SQL_PLAN);
if (nullptr == sql_plan_mgr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for ObSqlPlanMgr", K(ret));
} else if (OB_FAIL(sql_plan_mgr->init(tenant_id))) {
LOG_WARN("failed to init request manager", K(ret));
}
if (OB_FAIL(ret) && sql_plan_mgr != nullptr) {
// cleanup
ob_delete(sql_plan_mgr);
sql_plan_mgr = nullptr;
}
return ret;
}
void ObSqlPlanMgr::mtl_destroy(ObSqlPlanMgr* &sql_plan_mgr)
{
if (sql_plan_mgr != nullptr) {
ob_delete(sql_plan_mgr);
sql_plan_mgr = nullptr;
}
}
ObSqlPlanEliminateTask::ObSqlPlanEliminateTask()
:sql_plan_manager_(NULL),
config_mem_limit_(0)
{
}
ObSqlPlanEliminateTask::~ObSqlPlanEliminateTask()
{
}
int ObSqlPlanEliminateTask::init(const ObSqlPlanMgr *sql_plan_manager)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(sql_plan_manager)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(sql_plan_manager_), K(ret));
} else {
sql_plan_manager_ = const_cast<ObSqlPlanMgr*>(sql_plan_manager);
// can't call ObSqlPlanMgr::get_mem_limit for now, tenant not inited
// set config_mem_limit_ to 64M
config_mem_limit_ = 64 * 1024 * 1024; // 64M
disable_timeout_check();
}
return ret;
}
//mem_limit = tenant_mem_limit * ob_sql_plan_percentage
int ObSqlPlanEliminateTask::check_config_mem_limit(bool &is_change)
{
int ret = OB_SUCCESS;
is_change = false;
const int64_t MINIMUM_LIMIT = 64 * 1024 * 1024; // at lease 64M
const int64_t MAXIMUM_LIMIT = 1024 * 1024 * 1024; // 1G maximum
int64_t mem_limit = config_mem_limit_;
if (OB_ISNULL(sql_plan_manager_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(sql_plan_manager_), K(ret));
} else if (sql_plan_manager_->get_tenant_id() > OB_SYS_TENANT_ID &&
sql_plan_manager_->get_tenant_id() <= OB_MAX_RESERVED_TENANT_ID) {
// 50x租户在没有对应的tenant schema,查询配置一定失败
} else if (OB_FAIL(sql_plan_manager_->get_mem_limit(mem_limit))) {
LOG_WARN("failed to get mem limit", K(ret));
// if memory limit is not retrivable
// overwrite error code, set mem config to default value
// so that total memory use of sql plan can be limited
ret = OB_SUCCESS;
mem_limit = MAXIMUM_LIMIT;
}
if (config_mem_limit_ != mem_limit) {
LOG_TRACE("before change config mem", K(config_mem_limit_));
if (mem_limit < MINIMUM_LIMIT) {
if (lib::is_mini_mode()) {
config_mem_limit_ = mem_limit;
} else {
config_mem_limit_ = MINIMUM_LIMIT;
}
} else {
config_mem_limit_ = mem_limit;
}
is_change = true;
LOG_TRACE("after change config mem", K(config_mem_limit_));
}
return ret;
}
//剩余内存淘汰曲线图,当mem_limit在[64M, 100M]时, 内存剩余20M时淘汰;
// 当mem_limit在[100M, 5G]时, 内存剩余mem_limit*0.2时淘汰;
// 当mem_limit在[5G, +∞]时, 内存剩余1G时淘汰;
//高低水位线内存差曲线图,当mem_limit在[64M, 100M]时, 内存差为:20M;
// 当mem_limit在[100M, 5G]时,内存差:mem_limit*0.2;
// 当mem_limit在[5G, +∞]时, 内存差是:1G,
// ______
// /
// _____/
// 100M 5G
int ObSqlPlanEliminateTask::calc_evict_mem_level(int64_t &low, int64_t &high)
{
int ret = OB_SUCCESS;
const double HIGH_LEVEL_PRECENT = 0.80;
const double LOW_LEVEL_PRECENT = 0.60;
const double HALF_PRECENT = 0.50;
const int64_t BIG_MEMORY_LIMIT = 5368709120; //5G
const int64_t LOW_CONFIG = 10*1024*1024; //10M
if (OB_ISNULL(sql_plan_manager_) || config_mem_limit_ < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(sql_plan_manager_), K(config_mem_limit_), K(ret));
} else {
if (config_mem_limit_ > BIG_MEMORY_LIMIT) {
// mem_limit > 5G
high = config_mem_limit_ - static_cast<int64_t>(BIG_MEMORY_LIMIT * (1.0 - HIGH_LEVEL_PRECENT));
low = config_mem_limit_ - static_cast<int64_t>(BIG_MEMORY_LIMIT * (1.0 - LOW_LEVEL_PRECENT)) ;
} else if (config_mem_limit_ > LOW_CONFIG) {
//mem_limit between 10M and 5G
high = static_cast<int64_t>(static_cast<double>(config_mem_limit_) * HIGH_LEVEL_PRECENT);
low = static_cast<int64_t>(static_cast<double>(config_mem_limit_) * LOW_LEVEL_PRECENT);
} else {
//mem_limit < 10M
high = static_cast<int64_t>(static_cast<double>(config_mem_limit_) * HALF_PRECENT);
low = 0;
}
}
return ret;
}
void ObSqlPlanEliminateTask::runTimerTask()
{
int ret = OB_SUCCESS;
ObConcurrentFIFOAllocator *allocator = NULL;
int64_t evict_high_level = 0;
int64_t evict_low_level = 0;
bool is_change = false;
int64_t start_time = ObTimeUtility::current_time();
int64_t evict_batch_count = 0;
if (OB_ISNULL(sql_plan_manager_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(sql_plan_manager_), K(ret));
} else if (OB_FAIL(check_config_mem_limit(is_change))) {
LOG_WARN("fail to check mem limit stat", K(ret));
} else if (OB_FAIL(calc_evict_mem_level(evict_low_level, evict_high_level))) {
LOG_WARN("fail to get sql plan evict memory level", K(ret));
} else if (OB_ISNULL(allocator = sql_plan_manager_->get_allocator())) {
ret = OB_NOT_INIT;
LOG_WARN("fail to get sql plan evict memory level", K(ret));
} else if (OB_FAIL(evict_queue_use(evict_batch_count))) {
LOG_WARN("failed to evict queue use", K(ret));
} else if (evict_high_level < allocator->allocated() &&
OB_FAIL(evict_memory_use(evict_low_level,
evict_high_level,
evict_batch_count))) {
LOG_WARN("failed to evict mempry use", K(ret));
} else {
//如果sql_plan_memory_limit改变, 则需要将ObConcurrentFIFOAllocator中total_limit_更新;
if (true == is_change) {
allocator->set_total_limit(config_mem_limit_);
}
int64_t end_time = ObTimeUtility::current_time();
if (evict_batch_count > 0) {
LOG_INFO("sql plan evict task end",
K(sql_plan_manager_->get_tenant_id()),
K(evict_high_level),
K(evict_low_level),
K(evict_batch_count),
"elapse_time", end_time - start_time,
K_(config_mem_limit),
"mem_used", allocator->allocated());
}
}
}
int ObSqlPlanEliminateTask::evict_memory_use(int64_t evict_low_level,
int64_t evict_high_level,
int64_t &evict_batch_count)
{
int ret = OB_SUCCESS;
ObConcurrentFIFOAllocator *allocator = NULL;
if (OB_ISNULL(sql_plan_manager_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(sql_plan_manager_), K(ret));
} else if (OB_ISNULL(allocator = sql_plan_manager_->get_allocator())) {
ret = OB_NOT_INIT;
LOG_WARN("fail to get sql plan evict memory level", K(ret));
} else {
int64_t evict_count = 0;
//evict plan table mgr
{
ObLockGuard<ObSpinLock> guard(sql_plan_manager_->plan_table_mgr_lock_);
for (int i = 0; i < sql_plan_manager_->get_plan_table_mgrs().count(); ++i) {
if (sql_plan_manager_->get_plan_table_mgrs().at(i) != NULL) {
ObPlanItemMgr *plan_table_mgr = sql_plan_manager_->get_plan_table_mgrs().at(i);
LOG_INFO("plan table evict mem start", K(evict_high_level), "mem_used", allocator->allocated());
while (evict_low_level < allocator->allocated()) {
evict_count = plan_table_mgr->release_old(ObSqlPlanMgr::BATCH_RELEASE_COUNT);
evict_batch_count += evict_count;
if (evict_count < ObSqlPlanMgr::BATCH_RELEASE_COUNT) {
LOG_INFO("release old cannot free more memory");
break;
}
}
}
}
}
//evict plan real info mgr
if (sql_plan_manager_->get_plan_real_info_mgr() != NULL) {
ObPlanRealInfoMgr *plan_real_info_mgr = sql_plan_manager_->get_plan_real_info_mgr();
LOG_INFO("plan real info evict mem start", K(evict_high_level), "mem_used", allocator->allocated());
while (evict_low_level < allocator->allocated()) {
evict_count = plan_real_info_mgr->release_old(ObSqlPlanMgr::BATCH_RELEASE_COUNT);
evict_batch_count += evict_count;
if (evict_count < ObSqlPlanMgr::BATCH_RELEASE_COUNT) {
LOG_INFO("release old cannot free more memory");
break;
}
}
}
//evict plan item mgr
if (sql_plan_manager_->get_plan_item_mgr() != NULL) {
ObPlanItemMgr *plan_item_mgr = sql_plan_manager_->get_plan_item_mgr();
LOG_INFO("plan item evict mem start", K(evict_high_level), "mem_used", allocator->allocated());
while (evict_low_level < allocator->allocated()) {
evict_count = plan_item_mgr->release_old(ObSqlPlanMgr::BATCH_RELEASE_COUNT);
evict_batch_count += evict_count;
if (evict_count < ObSqlPlanMgr::BATCH_RELEASE_COUNT) {
LOG_INFO("release old cannot free more memory");
break;
}
}
}
}
return ret;
}
int ObSqlPlanEliminateTask::evict_queue_use(int64_t &evict_batch_count)
{
int ret = OB_SUCCESS;
ObConcurrentFIFOAllocator *allocator = NULL;
if (OB_ISNULL(sql_plan_manager_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(sql_plan_manager_), K(ret));
} else if (OB_ISNULL(allocator = sql_plan_manager_->get_allocator())) {
ret = OB_NOT_INIT;
LOG_WARN("fail to get sql plan evict memory level", K(ret));
} else {
//evict plan item mgr
if (sql_plan_manager_->get_plan_item_mgr() != NULL) {
ObPlanItemMgr *plan_item_mgr = sql_plan_manager_->get_plan_item_mgr();
int64_t max_queue_size = plan_item_mgr->get_capacity();
int64_t high_level_evict_size = max_queue_size * ObSqlPlanMgr::HIGH_LEVEL_EVICT_SIZE_PERCENT;
int64_t low_level_evict_size = max_queue_size * ObSqlPlanMgr::LOW_LEVEL_EVICT_SIZE_PERCENT;
if (plan_item_mgr->get_size_used() > high_level_evict_size) {
int64_t batch_count = (plan_item_mgr->get_size_used() - low_level_evict_size)
/ ObSqlPlanMgr::BATCH_RELEASE_COUNT;
LOG_INFO("plan item evict record start", "size_used",plan_item_mgr->get_size_used());
for (int i = 0; i < batch_count; i++) {
evict_batch_count += plan_item_mgr->release_old(ObSqlPlanMgr::BATCH_RELEASE_COUNT);
}
}
}
//evict plan real into mgr
if (sql_plan_manager_->get_plan_real_info_mgr() != NULL) {
ObPlanRealInfoMgr *plan_real_info_mgr = sql_plan_manager_->get_plan_real_info_mgr();
int64_t max_queue_size = plan_real_info_mgr->get_capacity();
int64_t high_level_evict_size = max_queue_size * ObSqlPlanMgr::HIGH_LEVEL_EVICT_SIZE_PERCENT;
int64_t low_level_evict_size = max_queue_size * ObSqlPlanMgr::LOW_LEVEL_EVICT_SIZE_PERCENT;
if (plan_real_info_mgr->get_size_used() > high_level_evict_size) {
int64_t batch_count = (plan_real_info_mgr->get_size_used() - low_level_evict_size)
/ ObSqlPlanMgr::BATCH_RELEASE_COUNT;
LOG_INFO("plan real info evict record start", "size_used",plan_real_info_mgr->get_size_used());
for (int i = 0; i < batch_count; i++) {
evict_batch_count += plan_real_info_mgr->release_old(ObSqlPlanMgr::BATCH_RELEASE_COUNT);
}
}
}
//evict plan table mgr
{
ObLockGuard<ObSpinLock> guard(sql_plan_manager_->plan_table_mgr_lock_);
for (int i = 0; i < sql_plan_manager_->get_plan_table_mgrs().count(); ++i) {
if (sql_plan_manager_->get_plan_table_mgrs().at(i) != NULL) {
ObPlanItemMgr *plan_table_mgr = sql_plan_manager_->get_plan_table_mgrs().at(i);
int64_t max_queue_size = plan_table_mgr->get_capacity();
int64_t high_level_evict_size = max_queue_size * ObSqlPlanMgr::HIGH_LEVEL_EVICT_SIZE_PERCENT;
int64_t low_level_evict_size = max_queue_size * ObSqlPlanMgr::LOW_LEVEL_EVICT_SIZE_PERCENT;
if (plan_table_mgr->get_size_used() > high_level_evict_size) {
int64_t batch_count = (plan_table_mgr->get_size_used() - low_level_evict_size)
/ ObSqlPlanMgr::BATCH_RELEASE_COUNT;
LOG_INFO("plan table evict record start", "size_used",plan_table_mgr->get_size_used());
for (int i = 0; i < batch_count; i++) {
evict_batch_count += plan_table_mgr->release_old(ObSqlPlanMgr::BATCH_RELEASE_COUNT);
}
}
}
}
}
}
return ret;
}
} // end of namespace sql
} // end of namespace oceanbase

View File

@ -1,100 +0,0 @@
// Copyright 2010-2016 Alibaba Inc. All Rights Reserved.
// Author:
// zhenling.zzg
// this file defines interface of sql plan manager
#ifndef SRC_OBSERVER_SQL_PLAN_MGR_H_
#define SRC_OBSERVER_SQL_PLAN_MGR_H_
#include "lib/allocator/ob_concurrent_fifo_allocator.h"
#include "observer/mysql/ob_ra_queue.h"
#include "ob_plan_info_manager.h"
#include "lib/lock/ob_spin_lock.h"
#include "lib/task/ob_timer.h"
namespace oceanbase
{
namespace sql
{
class ObSqlPlanMgr;
class ObSqlPlanEliminateTask : public common::ObTimerTask
{
public:
ObSqlPlanEliminateTask();
virtual ~ObSqlPlanEliminateTask();
void runTimerTask();
int init(const ObSqlPlanMgr *sql_plan_manager);
int check_config_mem_limit(bool &is_change);
int calc_evict_mem_level(int64_t &low, int64_t &high);
int evict_memory_use(int64_t evict_low_level,
int64_t evict_high_level,
int64_t &evict_batch_count);
int evict_queue_use(int64_t &evict_batch_count);
private:
ObSqlPlanMgr *sql_plan_manager_;
int64_t config_mem_limit_;
};
class ObSqlPlanMgr
{
public:
static const int64_t SQL_PLAN_PAGE_SIZE = (1LL << 17); // 128K
//进行一次release_old操作删除的记录数
static const int32_t BATCH_RELEASE_COUNT = 5000;
//初始化queue大小为100w
static const int64_t MAX_QUEUE_SIZE = 1; //100w
static const int64_t MINI_MODE_MAX_QUEUE_SIZE = 1; // 10w
//当sql_plan超过90w行记录时触发淘汰
static constexpr const double HIGH_LEVEL_EVICT_SIZE_PERCENT = 0.9;
//按行淘汰的低水位线
static constexpr const double LOW_LEVEL_EVICT_SIZE_PERCENT = 0.8;
//启动淘汰检查的时间间隔
static const int64_t EVICT_INTERVAL = 1000000; //1s
static const int64_t PLAN_TABLE_QUEUE_SIZE = 100000;
public:
ObSqlPlanMgr();
virtual ~ObSqlPlanMgr();
int init(uint64_t tenant_id);
void destroy();
common::ObConcurrentFIFOAllocator *get_allocator();
void* alloc(const int64_t size);
void free(void *ptr);
uint64_t get_tenant_id() const;
bool is_valid() const;
inline ObPlanItemMgr *get_plan_item_mgr() { return plan_item_mgr_; }
inline ObPlanRealInfoMgr *get_plan_real_info_mgr() { return plan_real_info_mgr_; }
inline ObIArray<ObPlanItemMgr*> &get_plan_table_mgrs() { return plan_table_mgrs_; }
int get_mem_limit(int64_t &mem_limit);
int init_plan_table_manager(ObPlanItemMgr* &plan_table_mgr);
int destroy_plan_table_manager(ObPlanItemMgr* &plan_table_mgr);
static int mtl_init(ObSqlPlanMgr* &sql_plan_mgr);
static void mtl_destroy(ObSqlPlanMgr* &sql_plan_mgr);
private:
DISALLOW_COPY_AND_ASSIGN(ObSqlPlanMgr);
friend class ObSqlPlanEliminateTask;
private:
common::ObConcurrentFIFOAllocator allocator_;
ObSqlPlanEliminateTask task_;
ObSEArray<ObPlanItemMgr*, 8> plan_table_mgrs_;
mutable ObSpinLock plan_table_mgr_lock_;
ObPlanItemMgr *plan_item_mgr_;
ObPlanRealInfoMgr *plan_real_info_mgr_;
bool destroyed_;
bool inited_;
// tenant id of this manager
uint64_t tenant_id_;
int tg_id_;
};
} // end of namespace sql
} // end of namespace oceanbase
#endif /* SRC_OBSERVER_SQL_PLAN_MGR_H_ */