[CP] [to #52990298]add is_packed for ps cursor
This commit is contained in:
@ -508,7 +508,8 @@ int ObMPBase::check_and_refresh_schema(uint64_t login_tenant_id,
|
||||
|
||||
int ObMPBase::response_row(ObSQLSessionInfo &session,
|
||||
common::ObNewRow &row,
|
||||
const ColumnsFieldIArray *fields)
|
||||
const ColumnsFieldIArray *fields,
|
||||
bool is_packed)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArenaAllocator allocator;
|
||||
@ -525,7 +526,7 @@ int ObMPBase::response_row(ObSQLSessionInfo &session,
|
||||
ObCharsetType charset_type = CHARSET_INVALID;
|
||||
ObCharsetType ncharset_type = CHARSET_INVALID;
|
||||
// need at ps mode
|
||||
if (value.get_type() != fields->at(i).type_.get_type()) {
|
||||
if (!is_packed && value.get_type() != fields->at(i).type_.get_type()) {
|
||||
ObCastCtx cast_ctx(&allocator, NULL, CM_WARN_ON_FAIL, fields->at(i).type_.get_collation_type());
|
||||
if (OB_FAIL(common::ObObjCaster::to_type(fields->at(i).type_.get_type(),
|
||||
cast_ctx,
|
||||
@ -536,6 +537,8 @@ int ObMPBase::response_row(ObSQLSessionInfo &session,
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (is_packed) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(session.get_character_set_results(charset_type))) {
|
||||
LOG_WARN("fail to get result charset", K(ret));
|
||||
} else if (OB_FAIL(session.get_ncharacter_set_connection(ncharset_type))) {
|
||||
@ -579,7 +582,9 @@ int ObMPBase::response_row(ObSQLSessionInfo &session,
|
||||
if (OB_SUCC(ret)) {
|
||||
const ObDataTypeCastParams dtc_params = ObBasicSessionInfo::create_dtc_params(&session);
|
||||
ObSMRow sm_row(obmysql::BINARY, tmp_row, dtc_params, fields);
|
||||
sm_row.set_packed(is_packed);
|
||||
obmysql::OMPKRow rp(sm_row);
|
||||
rp.set_is_packed(is_packed);
|
||||
if (OB_FAIL(response_packet(rp, &session))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("response packet fail", K(ret));
|
||||
|
||||
@ -122,7 +122,8 @@ protected:
|
||||
sql::ObSQLSessionInfo *session);
|
||||
int response_row(sql::ObSQLSessionInfo &session,
|
||||
common::ObNewRow &row,
|
||||
const ColumnsFieldIArray *fields);
|
||||
const ColumnsFieldIArray *fields,
|
||||
bool is_packed);
|
||||
int process_extra_info(sql::ObSQLSessionInfo &session, const obmysql::ObMySQLRawPacket &pkt,
|
||||
bool &need_response_error);
|
||||
protected:
|
||||
|
||||
@ -462,9 +462,9 @@ int ObMPStmtFetch::response_result(pl::ObPLCursorInfo &cursor,
|
||||
if (OB_SUCC(ret) && !need_fetch && NULL != row) {
|
||||
if (has_long_data()) {
|
||||
OZ (response_row(session, *(const_cast<common::ObNewRow*>(row)),
|
||||
fields, column_flag_, cursor_id_, true));
|
||||
fields, column_flag_, cursor_id_, true, cursor.is_packed()));
|
||||
} else {
|
||||
OZ (response_row(session, *(const_cast<common::ObNewRow*>(row)), fields));
|
||||
OZ (response_row(session, *(const_cast<common::ObNewRow*>(row)), fields, cursor.is_packed()));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("response row fail.", K(ret));
|
||||
@ -489,9 +489,9 @@ int ObMPStmtFetch::response_result(pl::ObPLCursorInfo &cursor,
|
||||
cursor.set_current_position(cur);
|
||||
if (has_long_data()) {
|
||||
OZ (response_row(session, row, fields, column_flag_, cursor_id_,
|
||||
0 == row_num ? true : false));
|
||||
0 == row_num ? true : false, cursor.is_packed()));
|
||||
} else {
|
||||
OZ (response_row(session, row, fields));
|
||||
OZ (response_row(session, row, fields, cursor.is_packed()));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
++row_num;
|
||||
@ -776,7 +776,8 @@ int ObMPStmtFetch::response_row(ObSQLSessionInfo &session,
|
||||
const ColumnsFieldArray *fields,
|
||||
char *column_map,
|
||||
int32_t stmt_id,
|
||||
bool first_time)
|
||||
bool first_time,
|
||||
bool is_packed)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObNewRow row;
|
||||
@ -874,7 +875,7 @@ int ObMPStmtFetch::response_row(ObSQLSessionInfo &session,
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(response_row(session, row, fields))) {
|
||||
} else if (OB_FAIL(response_row(session, row, fields, is_packed))) {
|
||||
LOG_WARN("response row fail.", K(ret), K(stmt_id));
|
||||
} else {
|
||||
LOG_DEBUG("response row success.", K(stmt_id));
|
||||
|
||||
@ -60,11 +60,13 @@ public:
|
||||
const ColumnsFieldArray *fields,
|
||||
char *column_map,
|
||||
int32_t stmt_id,
|
||||
bool first_time);
|
||||
bool first_time,
|
||||
bool is_packed);
|
||||
int response_row(sql::ObSQLSessionInfo &session,
|
||||
common::ObNewRow &row,
|
||||
const ColumnsFieldArray *fields) {
|
||||
return ObMPBase::response_row(session, row, fields);
|
||||
const ColumnsFieldArray *fields,
|
||||
bool is_packed) {
|
||||
return ObMPBase::response_row(session, row, fields, is_packed);
|
||||
}
|
||||
bool need_close_cursor() { return need_close_cursor_; }
|
||||
void set_close_cursor() { need_close_cursor_ = true; }
|
||||
|
||||
@ -510,7 +510,7 @@ int ObMPStmtPrexecute::execute_response(ObSQLSessionInfo &session,
|
||||
++cur;
|
||||
cursor->set_current_position(cur);
|
||||
is_fetched = true;
|
||||
if (OB_FAIL(response_row(session, row, &cursor->get_field_columns()))) {
|
||||
if (OB_FAIL(response_row(session, row, &cursor->get_field_columns(), cursor->is_packed()))) {
|
||||
LOG_WARN("response row fail at line: ", K(ret), K(row_num));
|
||||
} else {
|
||||
++row_num;
|
||||
@ -965,7 +965,7 @@ int ObMPStmtPrexecute::response_arraybinding_rows(sql::ObSQLSessionInfo &session
|
||||
arraybinding_row_->get_cell(0).set_int(is_pl ? curr_sql_idx_ : affect_rows);
|
||||
arraybinding_row_->get_cell(1).set_int(0);
|
||||
arraybinding_row_->get_cell(2).set_null();
|
||||
if (OB_FAIL(response_row(session, *arraybinding_row_, arraybinding_columns_))) {
|
||||
if (OB_FAIL(response_row(session, *arraybinding_row_, arraybinding_columns_, false))) {
|
||||
LOG_WARN("fail to response fail row to client", K(ret));
|
||||
}
|
||||
}
|
||||
@ -982,7 +982,7 @@ int ObMPStmtPrexecute::response_fail_result(sql::ObSQLSessionInfo &session, int
|
||||
arraybinding_row_->get_cell(i).set_null();
|
||||
}
|
||||
arraybinding_row_->get_cell(arraybinding_row_->get_count() - 1).set_varchar(ob_oracle_strerror(err_ret));
|
||||
if (OB_FAIL(response_row(session, *arraybinding_row_, arraybinding_columns_))) {
|
||||
if (OB_FAIL(response_row(session, *arraybinding_row_, arraybinding_columns_, false))) {
|
||||
LOG_WARN("fail to response fail row to client", K(ret));
|
||||
}
|
||||
return ret;
|
||||
@ -996,6 +996,7 @@ int ObMPStmtPrexecute::response_returning_rows(ObSQLSessionInfo &session,
|
||||
MYSQL_PROTOCOL_TYPE protocol_type = BINARY;
|
||||
const ObNewRow *result_row = NULL;
|
||||
const common::ColumnsFieldIArray *fields = result.get_field_columns();
|
||||
bool is_packed = result.get_physical_plan() ? result.get_physical_plan()->is_packed() : false;
|
||||
if (OB_ISNULL(fields) || NULL == arraybinding_row_ || arraybinding_row_->get_count() < 3) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("fields is null", K(ret), KP(fields));
|
||||
@ -1012,7 +1013,7 @@ int ObMPStmtPrexecute::response_returning_rows(ObSQLSessionInfo &session,
|
||||
arraybinding_row_->get_cell(i+2) = value;
|
||||
}
|
||||
arraybinding_row_->get_cell(arraybinding_row_->get_count() - 1).set_null();
|
||||
if (OB_SUCCESS != (response_ret = response_row(session, *arraybinding_row_, arraybinding_columns_))) {
|
||||
if (OB_SUCCESS != (response_ret = response_row(session, *arraybinding_row_, arraybinding_columns_, is_packed))) {
|
||||
LOG_WARN("fail to response row to client", K(response_ret));
|
||||
}
|
||||
}
|
||||
@ -1031,7 +1032,7 @@ int ObMPStmtPrexecute::response_returning_rows(ObSQLSessionInfo &session,
|
||||
}
|
||||
arraybinding_row_->get_cell(arraybinding_row_->get_count() - 1).set_varchar(ob_oracle_strerror(ret));
|
||||
LOG_DEBUG("error occured before send arraybinding_row_", KPC(arraybinding_row_));
|
||||
if (OB_SUCCESS != (response_ret = response_row(session, *arraybinding_row_, arraybinding_columns_))) {
|
||||
if (OB_SUCCESS != (response_ret = response_row(session, *arraybinding_row_, arraybinding_columns_, false))) {
|
||||
LOG_WARN("fail to response row to client", K(response_ret));
|
||||
}
|
||||
if (is_save_exception_) {
|
||||
|
||||
@ -857,6 +857,7 @@ public:
|
||||
first_row_.reset();
|
||||
last_row_.reset();
|
||||
is_need_check_snapshot_ = false;
|
||||
is_packed_ = false;
|
||||
}
|
||||
|
||||
void reset()
|
||||
@ -1002,6 +1003,9 @@ public:
|
||||
uint64_t mem_limit,
|
||||
bool is_local_for_update = false);
|
||||
|
||||
inline void set_packed(bool is_packed) { is_packed_ = is_packed; }
|
||||
inline bool is_packed() { return is_packed_; }
|
||||
|
||||
TO_STRING_KV(K_(id),
|
||||
K_(is_explicit),
|
||||
K_(for_update),
|
||||
@ -1026,7 +1030,8 @@ public:
|
||||
K_(is_scrollable),
|
||||
K_(snapshot),
|
||||
K_(is_need_check_snapshot),
|
||||
K_(last_execute_time));
|
||||
K_(last_execute_time),
|
||||
K_(is_packed));
|
||||
|
||||
protected:
|
||||
int64_t id_; // Cursor ID
|
||||
@ -1058,6 +1063,7 @@ protected:
|
||||
bool is_need_check_snapshot_;
|
||||
int64_t last_execute_time_; // 记录上一次cursor操作的时间点
|
||||
bool last_stream_cursor_; // cursor复用场景下,记录上一次是否是流式cursor
|
||||
bool is_packed_;
|
||||
};
|
||||
|
||||
class ObPLGetCursorAttrInfo
|
||||
|
||||
@ -3938,6 +3938,12 @@ int ObSPIService::dbms_cursor_open(ObPLExecCtx *ctx,
|
||||
cursor.get_dbms_entity()->get_arena_allocator(),
|
||||
spi_result.get_result_set()->get_field_columns(),
|
||||
cursor.get_field_columns()));
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_NOT_NULL(spi_result.get_result_set()) && OB_NOT_NULL(spi_result.get_result_set()->get_physical_plan())) {
|
||||
ObPhysicalPlan *plan = spi_result.get_result_set()->get_physical_plan();
|
||||
cursor.set_packed(plan->is_packed());
|
||||
}
|
||||
}
|
||||
OZ (fill_cursor(*spi_result.get_result_set(), spi_cursor));
|
||||
if (OB_FAIL(ret)) {
|
||||
int cli_ret = OB_SUCCESS;
|
||||
|
||||
Reference in New Issue
Block a user