Report error when any load data task failed
This commit is contained in:
@ -1691,6 +1691,7 @@ int ObLoadDataSPImpl::handle_returned_insert_task(ObExecContext &ctx,
|
||||
"task_id", insert_task.task_id_,
|
||||
"ret", result.exec_ret_,
|
||||
"row_count", insert_task.row_count_);
|
||||
ret = result.exec_ret_;
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
||||
@ -193,6 +193,19 @@ int ObTableInsertOp::write_row_to_das_buffer()
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableInsertOp::record_err_for_load_data(int err_ret, int row_num)
|
||||
{
|
||||
UNUSED(err_ret);
|
||||
if (OB_NOT_NULL(ctx_.get_my_session()) && ctx_.get_my_session()->is_load_data_exec_session()) {
|
||||
//record failed line num in warning buffer for load data
|
||||
ObWarningBuffer *buffer = ob_get_tsi_warning_buffer();
|
||||
if (OB_NOT_NULL(buffer) && 0 == buffer->get_error_line()) {
|
||||
buffer->set_error_line_column(row_num, 0);
|
||||
}
|
||||
LOG_DEBUG("load data exec log error line", K(err_ret), K(row_num));
|
||||
}
|
||||
}
|
||||
|
||||
OB_INLINE int ObTableInsertOp::insert_row_to_das()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -243,16 +256,8 @@ OB_INLINE int ObTableInsertOp::insert_row_to_das()
|
||||
ObTriggerEvents::get_insert_event()))) {
|
||||
LOG_WARN("failed to handle before trigger", K(ret));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret) && OB_NOT_NULL(ctx_.get_my_session())
|
||||
&& ctx_.get_my_session()->is_load_data_exec_session()) {
|
||||
//record failed line num in warning buffer for load data
|
||||
ObWarningBuffer *buffer = ob_get_tsi_warning_buffer();
|
||||
if (OB_NOT_NULL(buffer) && 0 == buffer->get_error_line()) {
|
||||
buffer->set_error_line_column(ins_rtdef.cur_row_num_, 0);
|
||||
}
|
||||
LOG_DEBUG("load data exec log error line", K(ret), K(ins_rtdef.cur_row_num_),
|
||||
K(buffer->get_error_line()));
|
||||
if (OB_FAIL(ret)) {
|
||||
record_err_for_load_data(ret, ins_rtdef.cur_row_num_);
|
||||
}
|
||||
} // end for global index ctdef loop
|
||||
|
||||
|
||||
@ -91,6 +91,7 @@ protected:
|
||||
int close_table_for_each();
|
||||
|
||||
int check_insert_affected_row();
|
||||
virtual void record_err_for_load_data(int err_ret, int row_num) override;
|
||||
protected:
|
||||
InsRtDef2DArray ins_rtdefs_; //see the comment of InsCtDef2DArray
|
||||
private:
|
||||
|
||||
@ -1104,6 +1104,7 @@ int ObTableModifyOp::inner_get_next_row()
|
||||
LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_));
|
||||
ret = OB_ITER_END;
|
||||
} else {
|
||||
int64_t row_count = 0;
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(try_check_status())) {
|
||||
LOG_WARN("check status failed", K(ret));
|
||||
@ -1127,6 +1128,11 @@ int ObTableModifyOp::inner_get_next_row()
|
||||
} else if (MY_SPEC.is_returning_) {
|
||||
break;
|
||||
}
|
||||
row_count ++;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
record_err_for_load_data(ret, row_count);
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && iter_end_ && dml_rtctx_.das_ref_.has_task()) {
|
||||
|
||||
@ -238,6 +238,7 @@ protected:
|
||||
int64_t delete_rows,
|
||||
int64_t found_rows);
|
||||
int discharge_das_write_buffer();
|
||||
virtual void record_err_for_load_data(int err_ret, int row_num) { UNUSED(err_ret); UNUSED(row_num); }
|
||||
public:
|
||||
common::ObMySQLProxy *sql_proxy_;
|
||||
observer::ObInnerSQLConnection *inner_conn_;
|
||||
|
||||
Reference in New Issue
Block a user