fix last_trace_id expr bug in remote execution
This commit is contained in:
@ -45,12 +45,12 @@ int ObExprLastTraceId::eval_last_trace_id(const ObExpr &expr, ObEvalCtx &ctx,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
UNUSED(expr);
|
||||
const ObSQLSessionInfo *session_info = NULL;
|
||||
if (OB_ISNULL(session_info = ctx.exec_ctx_.get_my_session())) {
|
||||
const ObPhysicalPlanCtx *phy_plan_ctx = NULL;
|
||||
if (OB_ISNULL(phy_plan_ctx = ctx.exec_ctx_.get_physical_plan_ctx())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SQL_ENG_LOG(WARN, "session info is null", K(ret));
|
||||
} else {
|
||||
const ObCurTraceId::TraceId &trace_id = session_info->get_last_trace_id();
|
||||
const ObCurTraceId::TraceId &trace_id = phy_plan_ctx->get_last_trace_id();
|
||||
if (trace_id.is_invalid()) {
|
||||
expr_datum.set_null();
|
||||
} else {
|
||||
|
||||
@ -681,6 +681,7 @@ OB_DEF_SERIALIZE(ObPhysicalPlanCtx)
|
||||
OB_UNIS_ENCODE(tenant_schema_version_);
|
||||
OB_UNIS_ENCODE(cursor_count);
|
||||
OB_UNIS_ENCODE(plan_start_time_);
|
||||
OB_UNIS_ENCODE(last_trace_id_);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -762,6 +763,7 @@ OB_DEF_SERIALIZE_SIZE(ObPhysicalPlanCtx)
|
||||
OB_UNIS_ADD_LEN(tenant_schema_version_);
|
||||
OB_UNIS_ADD_LEN(cursor_count);
|
||||
OB_UNIS_ADD_LEN(plan_start_time_);
|
||||
OB_UNIS_ADD_LEN(last_trace_id_);
|
||||
return len;
|
||||
}
|
||||
|
||||
@ -842,6 +844,7 @@ OB_DEF_DESERIALIZE(ObPhysicalPlanCtx)
|
||||
(void)ObSQLUtils::adjust_time_by_ntp_offset(plan_start_time_);
|
||||
(void)ObSQLUtils::adjust_time_by_ntp_offset(ts_timeout_us_);
|
||||
}
|
||||
OB_UNIS_DECODE(last_trace_id_);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -419,6 +419,12 @@ public:
|
||||
void set_plan_start_time(int64_t t) { plan_start_time_ = t; }
|
||||
int64_t get_plan_start_time() const { return plan_start_time_; }
|
||||
int replace_batch_param_datum(int64_t cur_group_id);
|
||||
void set_last_trace_id(const common::ObCurTraceId::TraceId &trace_id)
|
||||
{
|
||||
last_trace_id_ = trace_id;
|
||||
}
|
||||
const common::ObCurTraceId::TraceId &get_last_trace_id() const { return last_trace_id_; }
|
||||
common::ObCurTraceId::TraceId &get_last_trace_id() { return last_trace_id_; }
|
||||
|
||||
private:
|
||||
void reset_datum_frame(char *frame, int64_t expr_cnt);
|
||||
@ -481,6 +487,7 @@ private:
|
||||
//在存储层,如果table_id是系统表,则会跳过对tenant_schema_version的检查,还是使用原来的办法获取table schema version(见ObRelativeTables::check_schema_version)
|
||||
int64_t tenant_schema_version_;
|
||||
int64_t orig_question_mark_cnt_;
|
||||
common::ObCurTraceId::TraceId last_trace_id_;
|
||||
|
||||
private:
|
||||
/**
|
||||
|
||||
@ -37,7 +37,7 @@ using namespace oceanbase::sql;
|
||||
using namespace oceanbase::sql::dtl;
|
||||
using namespace oceanbase::share;
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObExprExtraSerializeInfo, *current_time_);
|
||||
OB_SERIALIZE_MEMBER(ObExprExtraSerializeInfo, *current_time_, *last_trace_id_);
|
||||
|
||||
// 物理分布策略:对于叶子节点,dfo 分布一般直接按照数据分布来
|
||||
// Note:如果 dfo 中有两个及以上的 scan,仅仅考虑第一个。并且,要求其余 scan
|
||||
@ -1347,6 +1347,7 @@ int ObPxTreeSerializer::serialize_expr_frame_info(char *buf,
|
||||
OB_UNIS_ENCODE(expr_frame_info.need_ctx_cnt_);
|
||||
ObPhysicalPlanCtx *plan_ctx = ctx.get_physical_plan_ctx();
|
||||
expr_info.current_time_ = &plan_ctx->get_cur_time();
|
||||
expr_info.last_trace_id_ = &plan_ctx->get_last_trace_id();
|
||||
// rt exprs
|
||||
ObExpr::get_serialize_array() = &exprs;
|
||||
|
||||
@ -1507,6 +1508,7 @@ int ObPxTreeSerializer::deserialize_expr_frame_info(const char *buf,
|
||||
ObExprExtraSerializeInfo expr_info;
|
||||
ObPhysicalPlanCtx *plan_ctx = ctx.get_physical_plan_ctx();
|
||||
expr_info.current_time_ = &plan_ctx->get_cur_time();
|
||||
expr_info.last_trace_id_ = &plan_ctx->get_last_trace_id();
|
||||
if (OB_FAIL(expr_info.deserialize(buf, data_len, pos))) {
|
||||
LOG_WARN("fail to deserialize expr extra info", K(ret));
|
||||
} else if (OB_FAIL(serialization::decode_i32(buf, data_len, pos, &expr_cnt))) {
|
||||
@ -1622,6 +1624,7 @@ int64_t ObPxTreeSerializer::get_serialize_expr_frame_info_size(
|
||||
ObExprExtraSerializeInfo expr_info;
|
||||
ObPhysicalPlanCtx *plan_ctx = ctx.get_physical_plan_ctx();
|
||||
expr_info.current_time_ = &plan_ctx->get_cur_time();
|
||||
expr_info.last_trace_id_ = &plan_ctx->get_last_trace_id();
|
||||
ObIArray<ObExpr> &exprs = expr_frame_info.rt_exprs_;
|
||||
int32_t expr_cnt = expr_frame_info.is_mark_serialize()
|
||||
? expr_frame_info.ser_expr_marks_.count()
|
||||
|
||||
@ -46,8 +46,9 @@ struct ObExprExtraSerializeInfo
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObExprExtraSerializeInfo() : current_time_(nullptr) { }
|
||||
ObExprExtraSerializeInfo() : current_time_(nullptr), last_trace_id_(nullptr) { }
|
||||
common::ObObj *current_time_;
|
||||
common::ObCurTraceId::TraceId *last_trace_id_;
|
||||
};
|
||||
|
||||
class ObPxSqcUtil
|
||||
|
||||
Reference in New Issue
Block a user