handle error line in parse

This commit is contained in:
yongshige 2023-05-11 12:37:40 +00:00 committed by ob-robot
parent e930ec4c1c
commit 2325157f39
2 changed files with 120 additions and 77 deletions

View File

@ -96,7 +96,12 @@ const char *ObLoadDataDirectImpl::Logger::log_file_column_names =
const char *ObLoadDataDirectImpl::Logger::log_file_row_fmt = "%.*s\t%ld\t%d\t%s\t\n";
ObLoadDataDirectImpl::Logger::Logger()
: is_oracle_mode_(false), buf_(nullptr), is_create_log_succ_(false), is_inited_(false)
: is_oracle_mode_(false),
buf_(nullptr),
is_create_log_succ_(false),
err_cnt_(0),
max_error_rows_(0),
is_inited_(false)
{
}
@ -108,7 +113,7 @@ ObLoadDataDirectImpl::Logger::~Logger()
}
}
int ObLoadDataDirectImpl::Logger::init(const ObString &load_info)
int ObLoadDataDirectImpl::Logger::init(const ObString &load_info, int64_t max_error_rows)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -128,6 +133,7 @@ int ObLoadDataDirectImpl::Logger::init(const ObString &load_info)
is_create_log_succ_ = true;
}
is_oracle_mode_ = lib::is_oracle_mode();
max_error_rows_ = max_error_rows;
is_inited_ = true;
}
return ret;
@ -186,16 +192,25 @@ int ObLoadDataDirectImpl::Logger::log_error_line(const ObString &file_name, int6
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLoadDataDirectImpl::Logger not init", KR(ret), KP(this));
} else if (is_create_log_succ_) {
const char *err_msg = ob_errpkt_strerror(err_code, is_oracle_mode_);
const int err_no = ob_errpkt_errno(err_code, is_oracle_mode_);
int64_t pos = 0;
lib::ObMutexGuard guard(mutex_);
if (OB_FAIL(databuff_printf(buf_, DEFAULT_BUF_LENGTH, pos, log_file_row_fmt, file_name.length(),
file_name.ptr(), line_no, err_no, err_msg))) {
LOG_WARN("fail to databuff printf", KR(ret), K(line_no), K(err_no), K(err_msg));
} else if (OB_FAIL(file_appender_.append(buf_, pos, false))) {
LOG_WARN("fail to append log", KR(ret), K(pos), K(line_no), K(err_no), K(err_msg));
} else {
if (is_create_log_succ_) {
int tmp_ret = OB_SUCCESS;
const char *err_msg = ob_errpkt_strerror(err_code, is_oracle_mode_);
const int err_no = ob_errpkt_errno(err_code, is_oracle_mode_);
int64_t pos = 0;
lib::ObMutexGuard guard(mutex_);
if (OB_TMP_FAIL(databuff_printf(buf_, DEFAULT_BUF_LENGTH, pos, log_file_row_fmt,
file_name.length(), file_name.ptr(), line_no, err_no,
err_msg))) {
LOG_WARN("fail to databuff printf", KR(tmp_ret), K(line_no), K(err_no), K(err_msg));
} else if (OB_TMP_FAIL(file_appender_.append(buf_, pos, false))) {
LOG_WARN("fail to append log", KR(tmp_ret), K(pos), K(line_no), K(err_no), K(err_msg));
}
}
if (inc_error_count() > max_error_rows_) {
ret = OB_ERR_TOO_MANY_ROWS;
LOG_WARN("error row count reaches its maximum value", KR(ret), K(max_error_rows_),
K(err_cnt_));
}
}
return ret;
@ -711,7 +726,6 @@ int ObLoadDataDirectImpl::DataReader::get_next_buffer(ObLoadFileBuffer &file_buf
} else if (OB_FAIL(data_trimer_.backup_incomplate_data(file_buffer, complete_len))) {
LOG_WARN("fail to back up data", KR(ret));
} else {
data_trimer_.commit_line_cnt(complete_cnt);
line_count = complete_cnt;
LOG_DEBUG("LOAD DATA backup", "data", data_trimer_.get_incomplate_data_string());
}
@ -818,55 +832,59 @@ int ObLoadDataDirectImpl::DataParser::get_next_row(ObNewRow &row)
} else if (data_buffer_->empty()) {
ret = OB_ITER_END;
} else {
const char *str = data_buffer_->data();
const char *end = str + data_buffer_->get_data_length();
ObSEArray<ObCSVGeneralParser::LineErrRec, 1> err_records;
int64_t nrows = 1;
auto handle_one_line = [](ObIArray<ObCSVGeneralParser::FieldValue> &fields_per_line) -> int {
UNUSED(fields_per_line);
return OB_SUCCESS;
};
ret = csv_parser_.scan<decltype(handle_one_line), true>(
str, end, nrows, escape_buffer_.file_buffer_->begin_ptr(),
escape_buffer_.file_buffer_->begin_ptr() + escape_buffer_.file_buffer_->get_buffer_size(),
handle_one_line, err_records, data_buffer_->is_end_file_);
if (OB_FAIL(ret)) {
LOG_WARN("fail to scan", KR(ret));
} else if (OB_UNLIKELY(!err_records.empty())) {
ret = OB_ERR_WRONG_VALUE;
LOG_WARN("parse error", KR(ret));
} else if (0 == nrows) {
ret = OB_ITER_END;
} else {
++pos_;
const ObIArray<ObCSVGeneralParser::FieldValue> &field_values_in_file =
csv_parser_.get_fields_per_line();
for (int64_t i = 0; OB_SUCC(ret) && i < row.count_; ++i) {
const ObCSVGeneralParser::FieldValue &str_v = field_values_in_file.at(i);
ObObj &obj = row.cells_[i];
if (str_v.is_null_) {
obj.set_null();
while (OB_SUCC(ret)) {
const char *str = data_buffer_->data();
const char *end = str + data_buffer_->get_data_length();
ObSEArray<ObCSVGeneralParser::LineErrRec, 1> err_records;
int64_t nrows = 1;
ret = csv_parser_.scan<decltype(handle_one_line), true>(
str, end, nrows, escape_buffer_.file_buffer_->begin_ptr(),
escape_buffer_.file_buffer_->begin_ptr() + escape_buffer_.file_buffer_->get_buffer_size(),
handle_one_line, err_records, data_buffer_->is_end_file_);
if (OB_FAIL(ret)) {
LOG_WARN("fail to scan", KR(ret));
} else if (0 == nrows) {
ret = OB_ITER_END;
} else {
++pos_;
data_buffer_->advance(str - data_buffer_->data());
if (OB_UNLIKELY(!err_records.empty())) {
if (OB_FAIL(log_error_line(err_records.at(0).err_code, start_line_no_ + pos_))) {
LOG_WARN("fail to log error line", KR(ret));
}
} else {
obj.set_string(ObVarcharType, ObString(str_v.len_, str_v.ptr_));
obj.set_collation_type(
ObCharset::get_default_collation(csv_parser_.get_format().cs_type_));
const ObIArray<ObCSVGeneralParser::FieldValue> &field_values_in_file =
csv_parser_.get_fields_per_line();
for (int64_t i = 0; OB_SUCC(ret) && i < row.count_; ++i) {
const ObCSVGeneralParser::FieldValue &str_v = field_values_in_file.at(i);
ObObj &obj = row.cells_[i];
if (str_v.is_null_) {
obj.set_null();
} else {
obj.set_string(ObVarcharType, ObString(str_v.len_, str_v.ptr_));
obj.set_collation_type(
ObCharset::get_default_collation(csv_parser_.get_format().cs_type_));
}
}
break;
}
}
}
for (int64_t i = 0; i < err_records.count(); ++i) {
log_error_line(err_records.at(i).err_code, start_line_no_ + pos_ + 1);
}
data_buffer_->advance(str - data_buffer_->data());
}
return ret;
}
void ObLoadDataDirectImpl::DataParser::log_error_line(int err_ret, int64_t err_line_no)
int ObLoadDataDirectImpl::DataParser::log_error_line(int err_ret, int64_t err_line_no)
{
int ret = OB_SUCCESS;
if (OB_FAIL(logger_->log_error_line(file_name_, err_line_no, err_ret))) {
LOG_WARN("fail to log error line", KR(ret), K(err_ret), K(err_line_no));
}
return ret;
}
/**
@ -985,6 +1003,7 @@ ObLoadDataDirectImpl::FileLoadExecutor::FileLoadExecutor()
execute_ctx_(nullptr),
task_scheduler_(nullptr),
worker_ctx_array_(nullptr),
total_line_count_(0),
is_inited_(false)
{
}
@ -1222,6 +1241,8 @@ int ObLoadDataDirectImpl::FileLoadExecutor::handle_task_result(int64_t task_id,
int ret = OB_SUCCESS;
if (OB_FAIL(result.ret_)) {
LOG_WARN("task result is failed", KR(ret), K(task_id));
} else {
total_line_count_ += result.parsed_row_count_;
}
/*
if (0 != result.created_ts_) {
@ -1265,7 +1286,7 @@ void ObLoadDataDirectImpl::FileLoadExecutor::wait_all_task_finished()
int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(int64_t worker_idx,
TaskHandle *handle,
int64_t &total_processed_line_count)
int64_t &parsed_line_count)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -1281,7 +1302,8 @@ int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(int64_t worker_i
const int64_t data_buffer_length = handle->data_buffer_.get_data_length();
int64_t parsed_bytes = 0;
int64_t processed_line_count = 0;
total_processed_line_count = 0;
int64_t total_processed_line_count = 0;
parsed_line_count = 0;
ObNewRow row;
bool is_iter_end = false;
if (OB_FAIL(worker_ctx.data_parser_.parse(handle->data_desc_.filename_, handle->start_line_no_,
@ -1336,10 +1358,12 @@ int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(int64_t worker_i
}
}
} // end while()
parsed_line_count = worker_ctx.data_parser_.get_parsed_row_count();
parsed_bytes = data_buffer_length - handle->data_buffer_.get_data_length();
handle->result_.proccessed_row_count_ += total_processed_line_count;
handle->result_.parsed_row_count_ += parsed_line_count;
handle->result_.parsed_bytes_ += parsed_bytes;
ATOMIC_AAF(&execute_ctx_->job_stat_->parsed_rows_, total_processed_line_count);
ATOMIC_AAF(&execute_ctx_->job_stat_->parsed_rows_, parsed_line_count);
ATOMIC_AAF(&execute_ctx_->job_stat_->parsed_bytes_, parsed_bytes);
}
return ret;
@ -1408,7 +1432,7 @@ int ObLoadDataDirectImpl::LargeFileLoadTaskProcessor::process()
*/
ObLoadDataDirectImpl::LargeFileLoadExecutor::LargeFileLoadExecutor()
: next_session_id_(1), total_line_count_(0)
: next_session_id_(1)
{
}
@ -1484,7 +1508,6 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::get_next_task_handle(TaskHandle
handle->result_.created_ts_ = ObTimeUtil::current_time();
handle->data_buffer_.swap(expr_buffer_);
handle->data_buffer_.is_end_file_ = data_reader_.is_end_file();
total_line_count_ += current_line_count;
}
return ret;
}
@ -1515,20 +1538,30 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::skip_ignore_rows()
int ret = OB_SUCCESS;
const int64_t ignore_row_num = execute_param_->ignore_row_num_;
if (ignore_row_num > 0) {
int64_t skip_line_count = 0;
int64_t line_count = 0;
while (OB_SUCC(ret) && data_reader_.get_lines_count() < ignore_row_num) {
int64_t skip_bytes = 0;
while (OB_SUCC(ret) && skip_line_count < ignore_row_num) {
if (OB_FAIL(data_reader_.get_next_buffer(*expr_buffer_.file_buffer_, line_count,
ignore_row_num - data_reader_.get_lines_count()))) {
ignore_row_num - skip_line_count))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get next buffer", KR(ret));
} else {
ret = OB_SUCCESS;
break;
}
} else {
skip_line_count += line_count;
skip_bytes += expr_buffer_.file_buffer_->get_data_len();
}
}
if (OB_SUCC(ret)) {
total_line_count_ += skip_line_count;
ATOMIC_AAF(&execute_ctx_->job_stat_->parsed_rows_, skip_line_count);
ATOMIC_AAF(&execute_ctx_->job_stat_->parsed_bytes_, skip_bytes);
}
LOG_INFO("LOAD DATA skip ignore rows", KR(ret), K(ignore_row_num),
K(data_reader_.get_lines_count()));
K(skip_line_count), K(skip_bytes));
}
return ret;
}
@ -1569,26 +1602,24 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process()
{
int ret = OB_SUCCESS;
handle_->result_.start_process_ts_ = ObTimeUtil::current_time();
int64_t total_line_count = 0;
int64_t current_line_count = 0;
if (OB_FAIL(data_reader_.init(execute_param_->data_access_param_, *execute_ctx_,
handle_->data_desc_, true))) {
LOG_WARN("fail to init data reader", KR(ret));
} else if (0 == handle_->data_desc_.file_idx_ && 0 == handle_->data_desc_.start_) {
if (OB_FAIL(skip_ignore_rows(total_line_count))) {
if (OB_FAIL(skip_ignore_rows(current_line_count))) {
LOG_WARN("fail to skip ignore rows", KR(ret));
} else if (OB_UNLIKELY(total_line_count < execute_param_->ignore_row_num_)) {
} else if (OB_UNLIKELY(current_line_count < execute_param_->ignore_row_num_)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support ignore rows exceed the first file", KR(ret),
K(total_line_count), K(execute_param_->ignore_row_num_));
K(current_line_count), K(execute_param_->ignore_row_num_));
} else if (!handle_->data_buffer_.empty()) {
handle_->start_line_no_ = total_line_count + 1;
handle_->data_buffer_.is_end_file_ = data_reader_.is_end_file();
handle_->start_line_no_ = handle_->result_.parsed_row_count_ + 1;
current_line_count = 0;
if (OB_FAIL(
file_load_executor_->process_task_handle(worker_idx_, handle_, current_line_count))) {
LOG_WARN("fail to process task handle", KR(ret));
} else {
total_line_count += current_line_count;
current_line_count = 0;
}
}
}
@ -1600,15 +1631,12 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process()
LOG_WARN("fail to get next buffer", KR(ret));
} else {
ret = OB_SUCCESS;
if (OB_UNLIKELY(!handle_->data_buffer_.empty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("has incomplate data", KR(ret));
}
break;
}
} else {
handle_->data_buffer_.is_end_file_ = data_reader_.is_end_file();
handle_->start_line_no_ = total_line_count + 1;
handle_->start_line_no_ = handle_->result_.parsed_row_count_ + 1;
current_line_count = 0;
if (OB_FAIL(
file_load_executor_->process_task_handle(worker_idx_, handle_, current_line_count))) {
LOG_WARN("fail to process task handle", KR(ret));
@ -1616,9 +1644,6 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process()
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support big row", KR(ret), "size",
handle_->data_buffer_.get_data_length());
} else {
total_line_count += current_line_count;
current_line_count = 0;
}
}
}
@ -1630,6 +1655,7 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::skip_ignore_rows(int64_t
int ret = OB_SUCCESS;
const int64_t ignore_row_num = execute_param_->ignore_row_num_;
skip_line_count = 0;
int64_t skip_bytes = 0;
if (ignore_row_num > 0) {
DataBuffer &data_buffer = handle_->data_buffer_;
data_buffer.reuse();
@ -1660,10 +1686,17 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::skip_ignore_rows(int64_t
} else {
data_buffer.advance(complete_len);
skip_line_count += complete_cnt;
skip_bytes += complete_len;
}
}
}
LOG_INFO("LOAD DATA skip ignore rows", KR(ret), K(ignore_row_num), K(skip_line_count));
if (OB_SUCC(ret)) {
handle_->result_.parsed_row_count_ += skip_line_count;
handle_->result_.parsed_bytes_ += skip_bytes;
ATOMIC_AAF(&execute_ctx_->job_stat_->parsed_rows_, skip_line_count);
ATOMIC_AAF(&execute_ctx_->job_stat_->parsed_bytes_, skip_bytes);
}
LOG_INFO("LOAD DATA skip ignore rows", KR(ret), K(ignore_row_num), K(skip_line_count), K(skip_bytes));
}
return ret;
}
@ -1757,6 +1790,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
load_stmt_ = &load_stmt;
const ObLoadArgument &load_args = load_stmt_->get_load_arguments();
const int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
int64_t total_line_count = 0;
if (OB_SUCC(ret)) {
int64_t query_timeout = 0;
@ -1823,6 +1857,8 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
LOG_WARN("fail to init file load executor", KR(ret));
} else if (OB_FAIL(file_load_executor->execute())) {
LOG_WARN("fail to execute file load", KR(ret));
} else {
total_line_count = file_load_executor->get_total_line_count();
}
}
if (nullptr != file_load_executor) {
@ -1838,7 +1874,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
} else {
ObPhysicalPlanCtx *phy_plan_ctx = execute_ctx_.exec_ctx_->get_physical_plan_ctx();
phy_plan_ctx->set_affected_rows(result_info.rows_affected_);
phy_plan_ctx->set_row_matched_count(result_info.records_);
phy_plan_ctx->set_row_matched_count(total_line_count);
phy_plan_ctx->set_row_deleted_count(result_info.deleted_);
phy_plan_ctx->set_row_duplicated_count(result_info.skipped_);
}
@ -2104,7 +2140,7 @@ int ObLoadDataDirectImpl::init_logger()
OX(load_info.assign_ptr(buf, pos));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(logger_.init(load_info))) {
if (OB_FAIL(logger_.init(load_info, execute_param_.max_error_rows_))) {
LOG_WARN("fail to init logger", KR(ret));
}
}

View File

@ -111,8 +111,9 @@ private:
public:
Logger();
~Logger();
int init(const common::ObString &load_info);
int init(const common::ObString &load_info, int64_t max_error_rows);
int log_error_line(const common::ObString &file_name, int64_t line_no, int err_code);
int64_t inc_error_count() { return ATOMIC_AAF(&err_cnt_, 1); }
private:
int create_log_file(const common::ObString &load_info);
static int generate_log_file_name(char *buf, int64_t size, common::ObString &file_name);
@ -122,6 +123,8 @@ private:
bool is_oracle_mode_;
char *buf_;
bool is_create_log_succ_;
int64_t err_cnt_;
int64_t max_error_rows_;
bool is_inited_;
DISALLOW_COPY_AND_ASSIGN(Logger);
};
@ -247,7 +250,6 @@ private:
int get_next_buffer(ObLoadFileBuffer &file_buffer, int64_t &line_count,
int64_t limit = INT64_MAX);
int get_next_raw_buffer(DataBuffer &data_buffer);
int64_t get_lines_count() const { return data_trimer_.get_lines_count(); }
bool has_incomplate_data() const { return data_trimer_.has_incomplate_data(); }
bool is_end_file() const { return io_accessor_.get_offset() >= end_offset_; }
ObCSVGeneralParser &get_csv_parser() { return csv_parser_; }
@ -272,8 +274,9 @@ private:
int init(const DataAccessParam &data_access_param, Logger &logger);
int parse(const common::ObString &file_name, int64_t start_line_no, DataBuffer &data_buffer);
int get_next_row(common::ObNewRow &row);
int64_t get_parsed_row_count() { return pos_; }
private:
void log_error_line(int err_ret, int64_t err_line_no);
int log_error_line(int err_ret, int64_t err_line_no);
private:
ObCSVGeneralParser csv_parser_;
DataBuffer escape_buffer_;
@ -304,6 +307,7 @@ private:
start_process_ts_(0),
finished_ts_(0),
proccessed_row_count_(0),
parsed_row_count_(0),
parsed_bytes_(0)
{
}
@ -314,6 +318,7 @@ private:
start_process_ts_ = 0;
finished_ts_ = 0;
proccessed_row_count_ = 0;
parsed_row_count_ = 0;
parsed_bytes_ = 0;
}
int ret_;
@ -321,9 +326,10 @@ private:
int64_t start_process_ts_;
int64_t finished_ts_;
int64_t proccessed_row_count_;
int64_t parsed_row_count_;
int64_t parsed_bytes_;
TO_STRING_KV(K_(ret), K_(created_ts), K_(start_process_ts), K_(finished_ts),
K_(proccessed_row_count), K_(parsed_bytes));
K_(proccessed_row_count), K_(parsed_row_count), K_(parsed_bytes));
};
struct TaskHandle
@ -353,6 +359,7 @@ private:
void free_task(observer::ObTableLoadTask *task);
void task_finished(TaskHandle *handle);
int process_task_handle(int64_t worker_idx, TaskHandle *handle, int64_t &line_count);
int64_t get_total_line_count() const {return total_line_count_; }
protected:
virtual int prepare_execute() = 0;
virtual int get_next_task_handle(TaskHandle *&handle) = 0;
@ -382,6 +389,7 @@ private:
ObParallelTaskController task_controller_;
ObConcurrentFixedCircularArray<TaskHandle *> handle_reserve_queue_;
common::ObSEArray<TaskHandle *, 64> handle_resource_; // 用于释放资源
int64_t total_line_count_;
bool is_inited_;
private:
DISALLOW_COPY_AND_ASSIGN(FileLoadExecutor);
@ -413,7 +421,6 @@ private:
DataBuffer expr_buffer_;
DataReader data_reader_;
int32_t next_session_id_;
int64_t total_line_count_;
DISALLOW_COPY_AND_ASSIGN(LargeFileLoadExecutor);
};