Record request_id in sql audit before push into queue.
This commit is contained in:
@ -198,19 +198,16 @@ int ObMySQLRequestManager::record_request(const ObAuditRecordData &audit_record,
|
|||||||
|
|
||||||
//push into queue
|
//push into queue
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
int64_t req_id = 0;
|
|
||||||
if (is_sensitive) {
|
if (is_sensitive) {
|
||||||
free(record);
|
free(record);
|
||||||
record = NULL;
|
record = NULL;
|
||||||
} else if (OB_FAIL(queue_.push(record, req_id))) {
|
} else if (OB_FAIL(queue_.push_with_imme_seq(record, record->data_.request_id_))) {
|
||||||
//sql audit槽位已满时会push失败, 依赖后台线程进行淘汰获得可用槽位
|
//sql audit槽位已满时会push失败, 依赖后台线程进行淘汰获得可用槽位
|
||||||
if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
|
if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
|
||||||
SERVER_LOG(WARN, "push into queue failed", K(ret));
|
SERVER_LOG(WARN, "push into queue failed", K(ret));
|
||||||
}
|
}
|
||||||
free(record);
|
free(record);
|
||||||
record = NULL;
|
record = NULL;
|
||||||
} else {
|
|
||||||
record->data_.request_id_ = req_id;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -123,6 +123,26 @@ public:
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
int push_with_imme_seq(void* p, int64_t& seq) {
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (NULL == array_) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
} else if (NULL == p) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
} else {
|
||||||
|
uint64_t push_limit = ATOMIC_LOAD(&pop_) + capacity_;
|
||||||
|
uint64_t push_idx = faa_bounded(&push_, &push_limit, push_limit);
|
||||||
|
if (push_idx < push_limit) {
|
||||||
|
void** addr = get_addr(push_idx);
|
||||||
|
seq = push_idx;
|
||||||
|
while(!ATOMIC_BCAS(addr, NULL, p))
|
||||||
|
;
|
||||||
|
} else {
|
||||||
|
ret = OB_ENTRY_NOT_EXIST;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
void* get(uint64_t seq, Ref* ref) {
|
void* get(uint64_t seq, Ref* ref) {
|
||||||
void* ret = NULL;
|
void* ret = NULL;
|
||||||
if (NULL != array_) {
|
if (NULL != array_) {
|
||||||
|
@ -149,14 +149,14 @@ int ObMonitorInfoManager::add_monitor_info(ObPhyPlanMonitorInfo *info)
|
|||||||
int64_t retry_times = 3;
|
int64_t retry_times = 3;
|
||||||
while (retry_times > 0) {
|
while (retry_times > 0) {
|
||||||
retry_times --;
|
retry_times --;
|
||||||
int64_t req_id = 0;
|
int64_t &req_id = info->get_request_id();
|
||||||
if (OB_FAIL(slow_query_queue_.push((void*)info, req_id))) {
|
int64_t cur_operator_info_size = info->get_operator_info_memory_size();
|
||||||
|
if (OB_FAIL(slow_query_queue_.push_with_imme_seq((void*)info, req_id))) {
|
||||||
if (OB_SIZE_OVERFLOW == ret) {
|
if (OB_SIZE_OVERFLOW == ret) {
|
||||||
clear_queue(OB_BATCH_GC_COUNT);
|
clear_queue(OB_BATCH_GC_COUNT);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
info->set_request_id(req_id);
|
operator_info_size_ += cur_operator_info_size;
|
||||||
operator_info_size_ += info->get_operator_info_memory_size();
|
|
||||||
LOG_DEBUG("add monitor info", K(*info));
|
LOG_DEBUG("add monitor info", K(*info));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -72,6 +72,7 @@ public:
|
|||||||
}
|
}
|
||||||
void set_request_id(int64_t request_id) { request_id_ = request_id; }
|
void set_request_id(int64_t request_id) { request_id_ = request_id; }
|
||||||
void set_plan_id(int64_t plan_id) {plan_id_ = plan_id; }
|
void set_plan_id(int64_t plan_id) {plan_id_ = plan_id; }
|
||||||
|
int64_t &get_request_id() { return request_id_; }
|
||||||
int64_t get_request_id() const { return request_id_; }
|
int64_t get_request_id() const { return request_id_; }
|
||||||
int64_t get_plan_id() const { return plan_id_; }
|
int64_t get_plan_id() const { return plan_id_; }
|
||||||
int64_t get_execution_time() const { return execution_time_; }
|
int64_t get_execution_time() const { return execution_time_; }
|
||||||
|
Reference in New Issue
Block a user