From 89c55191933c8658f1a0fd3aa779783f6919f822 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 9 Feb 2024 08:28:12 +0000 Subject: [PATCH] [CP] [CP] [Bugfix] cherry-pick bugfix from 421 for obkv --- src/libtable/test/ob_batch_execute_test.cpp | 21 ++- .../table/ob_htable_filter_operator.cpp | 1 + .../table/ob_htable_filter_parser.cpp | 1 + src/observer/table/ob_htable_filters.cpp | 1 + src/observer/table/ob_table_aggregation.cpp | 41 ++++ .../ob_table_batch_execute_processor.cpp | 36 ++-- src/observer/table/ob_table_cache.cpp | 5 +- src/observer/table/ob_table_cg_service.cpp | 72 ++++++- src/observer/table/ob_table_cg_service.h | 3 + src/observer/table/ob_table_context.cpp | 178 ++++++++++++------ src/observer/table/ob_table_context.h | 42 ++++- src/observer/table/ob_table_end_trans_cb.cpp | 4 +- .../table/ob_table_execute_processor.cpp | 31 +-- .../table/ob_table_execute_processor.h | 22 ++- src/observer/table/ob_table_filter.cpp | 1 + .../table/ob_table_modify_executor.cpp | 33 ++++ src/observer/table/ob_table_modify_executor.h | 1 + src/observer/table/ob_table_move_response.cpp | 35 ++-- src/observer/table/ob_table_op_wrapper.cpp | 78 +++++++- src/observer/table/ob_table_op_wrapper.h | 6 + .../ob_table_query_and_mutate_processor.cpp | 36 +--- .../ob_table_query_and_mutate_processor.h | 20 ++ src/observer/table/ob_table_query_common.cpp | 1 + .../table/ob_table_query_processor.cpp | 1 + .../table/ob_table_query_sync_processor.cpp | 1 + src/observer/table/ob_table_rpc_processor.cpp | 22 ++- src/observer/table/ob_table_session_pool.cpp | 49 +++-- src/observer/table/ob_table_session_pool.h | 13 +- src/share/object/ob_obj_cast.cpp | 13 +- src/share/table/ob_table.cpp | 78 +++++--- src/share/table/ob_table.h | 22 +++ .../observer/table/test_create_executor.cpp | 36 ++-- .../observer/table/test_table_sess_pool.cpp | 6 +- 33 files changed, 663 insertions(+), 247 deletions(-) diff --git a/src/libtable/test/ob_batch_execute_test.cpp b/src/libtable/test/ob_batch_execute_test.cpp index 2da78fb89..68509fdd5 100644 --- a/src/libtable/test/ob_batch_execute_test.cpp +++ b/src/libtable/test/ob_batch_execute_test.cpp @@ -714,7 +714,7 @@ TEST_F(TestBatchExecute, column_type_check) ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value)); ObTableOperation table_operation = ObTableOperation::insert(*entity); ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r)); - ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno()); + ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno()); } { // case: insert + rowkey + int @@ -728,7 +728,7 @@ TEST_F(TestBatchExecute, column_type_check) ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value)); ObTableOperation table_operation = ObTableOperation::insert(*entity); ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r)); - ASSERT_EQ(OB_OBJ_TYPE_ERROR, r.get_errno()); + ASSERT_EQ(OB_KV_COLUMN_TYPE_NOT_MATCH, r.get_errno()); } { // case: insert + rowkey + too long @@ -845,6 +845,7 @@ TEST_F(TestBatchExecute, column_type_check) ASSERT_EQ(OB_SUCCESS, entity->set_property(ObString::make_string("cblob"), value)); int64_t now = ObTimeUtility::current_time(); + ObTimeConverter::round_datetime(0, now); value.set_timestamp(now); ASSERT_EQ(OB_SUCCESS, entity->set_property(ObString::make_string("ctimestamp"), value)); value.set_datetime(now); @@ -877,7 +878,7 @@ TEST_F(TestBatchExecute, column_type_check) ASSERT_EQ(OB_SUCCESS, entity->add_rowkey_value(pk2)); ObTableOperation table_operation = ObTableOperation::retrieve(*entity); ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r)); - ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno()); + ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno()); } { // case: replace + rowkey + collation @@ -891,7 +892,7 @@ TEST_F(TestBatchExecute, column_type_check) ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value)); ObTableOperation table_operation = ObTableOperation::replace(*entity); ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r)); - ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno()); + ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno()); } { @@ -921,7 +922,7 @@ TEST_F(TestBatchExecute, column_type_check) ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value)); ObTableOperation table_operation = ObTableOperation::insert_or_update(*entity); ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r)); - ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno()); + ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno()); } { @@ -951,7 +952,7 @@ TEST_F(TestBatchExecute, column_type_check) ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value)); ObTableOperation table_operation = ObTableOperation::del(*entity); ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r)); - ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno()); + ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno()); } { // case: update + rowkey + collation @@ -965,7 +966,7 @@ TEST_F(TestBatchExecute, column_type_check) ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value)); ObTableOperation table_operation = ObTableOperation::update(*entity); ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r)); - ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno()); + ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno()); } { @@ -5096,7 +5097,7 @@ TEST_F(TestBatchExecute, check_scan_range) ASSERT_EQ(OB_SUCCESS, query.add_scan_range(range)); ASSERT_EQ(OB_SUCCESS, query.set_scan_index(ObString::make_string("primary"))); - ASSERT_EQ(OB_ERR_UNEXPECTED, the_table->execute_query(query, iter)); // wrong rowkey size + ASSERT_EQ(OB_KV_SCAN_RANGE_MISSING, the_table->execute_query(query, iter)); // wrong rowkey size } // case 2: scan by primary key, but key objs type is invalid @@ -5119,7 +5120,7 @@ TEST_F(TestBatchExecute, check_scan_range) ASSERT_EQ(OB_SUCCESS, query.add_scan_range(range)); ASSERT_EQ(OB_SUCCESS, query.set_scan_index(ObString::make_string("primary"))); - ASSERT_EQ(OB_OBJ_TYPE_ERROR, the_table->execute_query(query, iter)); // wrong rowkey type + ASSERT_EQ(OB_KV_COLUMN_TYPE_NOT_MATCH, the_table->execute_query(query, iter)); // wrong rowkey type } // case 3: scan by primary key, but collation type is invalid @@ -5143,7 +5144,7 @@ TEST_F(TestBatchExecute, check_scan_range) ASSERT_EQ(OB_SUCCESS, query.add_scan_range(range)); ASSERT_EQ(OB_SUCCESS, query.set_scan_index(ObString::make_string("primary"))); - ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, the_table->execute_query(query, iter)); // wrong collation type + ASSERT_EQ(OB_KV_COLLATION_MISMATCH, the_table->execute_query(query, iter)); // wrong collation type } // case 4: scan by primary key, but accuracy is invalid diff --git a/src/observer/table/ob_htable_filter_operator.cpp b/src/observer/table/ob_htable_filter_operator.cpp index 874fe915d..b88db0bbf 100644 --- a/src/observer/table/ob_htable_filter_operator.cpp +++ b/src/observer/table/ob_htable_filter_operator.cpp @@ -736,6 +736,7 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result) ObHTableMatchCode match_code = ObHTableMatchCode::DONE_SCAN; // initialize if (ObQueryFlag::Reverse == scan_order_ && (-1 != limit_per_row_per_cf_ || 0 != offset_per_row_per_cf_)) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "set limit_per_row_per_cf_ and offset_per_row_per_cf_ in reverse scan"); LOG_WARN("server don't support set limit_per_row_per_cf_ and offset_per_row_per_cf_ in reverse scan yet", K(ret), K(scan_order_), K(limit_per_row_per_cf_), K(offset_per_row_per_cf_)); } diff --git a/src/observer/table/ob_htable_filter_parser.cpp b/src/observer/table/ob_htable_filter_parser.cpp index 47b4386f5..f1325a0dd 100644 --- a/src/observer/table/ob_htable_filter_parser.cpp +++ b/src/observer/table/ob_htable_filter_parser.cpp @@ -120,6 +120,7 @@ int ObHTableFilterParser::create_comparator(const SimpleString &bytes, hfilter:: //comparator = OB_NEWx(hfilter::RegexStringComparator, allocator_, comparator_value); LOG_WARN("regexstring comparator not supported yet", K(ret)); ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "regexstring comparator"); } else if (comparator_type == SUB_STRING_TYPE) { comparator = OB_NEWx(hfilter::SubStringComparator, allocator_, comparator_value); } else { diff --git a/src/observer/table/ob_htable_filters.cpp b/src/observer/table/ob_htable_filters.cpp index 651e0a476..aa67a2074 100644 --- a/src/observer/table/ob_htable_filters.cpp +++ b/src/observer/table/ob_htable_filters.cpp @@ -167,6 +167,7 @@ int RegexStringComparator::compare_to(const ObString &b) { // @todo UNUSED(b); + LOG_USER_ERROR(OB_NOT_SUPPORTED, "regexstring comparator"); LOG_WARN_RET(OB_NOT_SUPPORTED, "regexstring comparator not supported yet"); return 0; } diff --git a/src/observer/table/ob_table_aggregation.cpp b/src/observer/table/ob_table_aggregation.cpp index f23a0bf04..91ba97435 100644 --- a/src/observer/table/ob_table_aggregation.cpp +++ b/src/observer/table/ob_table_aggregation.cpp @@ -171,6 +171,10 @@ int ObTableAggCalculator::aggregate_sum(uint64_t idx, const ObNewRow &row) int64_t res = 0; if (sql::ObExprAdd::is_add_out_of_range(sum_val.get_int(), cur_value.get_int(), res)) { ret = OB_DATA_OUT_OF_RANGE; + if (idx < agg_columns_.count()) { + int64_t row_num = 0; + LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num); + } LOG_WARN("data out of range", K(ret), K(sum_val), K(cur_value)); } else { sum_val.set_int(res); @@ -190,6 +194,10 @@ int ObTableAggCalculator::aggregate_sum(uint64_t idx, const ObNewRow &row) uint64_t res = 0; if (sql::ObExprAdd::is_add_out_of_range(sum_val.get_uint64(), cur_value.get_uint64(), res)) { ret = OB_DATA_OUT_OF_RANGE; + if (idx < agg_columns_.count()) { + int64_t row_num = 0; + LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num); + } LOG_WARN("data out of range", K(ret), K(sum_val), K(cur_value)); } else { sum_val.set_uint64(res); @@ -206,6 +214,10 @@ int ObTableAggCalculator::aggregate_sum(uint64_t idx, const ObNewRow &row) double res = sum_val.get_double() + (double)cur_value.get_float(); if (INFINITY == res) { ret = OB_DATA_OUT_OF_RANGE; + if (idx < agg_columns_.count()) { + int64_t row_num = 0; + LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num); + } LOG_WARN("data out of range", K(ret), K(sum_val), K(cur_value)); } else { sum_val.set_double(res); @@ -222,6 +234,10 @@ int ObTableAggCalculator::aggregate_sum(uint64_t idx, const ObNewRow &row) double res = sum_val.get_double() + cur_value.get_double(); if (INFINITY == res) { ret = OB_DATA_OUT_OF_RANGE; + if (idx < agg_columns_.count()) { + int64_t row_num = 0; + LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num); + } LOG_WARN("data out of range", K(ret), K(sum_val), K(cur_value)); } else { sum_val.set_double(res); @@ -231,6 +247,10 @@ int ObTableAggCalculator::aggregate_sum(uint64_t idx, const ObNewRow &row) } default: { ret = OB_NOT_SUPPORTED; + char err_msg[128]; + const char *type_str = ob_obj_type_str(sum_type); + (void)sprintf(err_msg, "%s type for sum aggregation", type_str); + LOG_USER_ERROR(OB_NOT_SUPPORTED, err_msg); LOG_WARN("this data type does not support aggregate sum operation", K(ret), K(sum_type)); } } @@ -257,6 +277,10 @@ int ObTableAggCalculator::aggregate_avg(uint64_t idx, const ObNewRow &row) double res = sum + (double)cur_value.get_int(); if (INFINITY == res) { ret = OB_DATA_OUT_OF_RANGE; + if (idx < agg_columns_.count()) { + int64_t row_num = 0; + LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num); + } LOG_WARN("data out of range", K(ret), K(sum), K(cur_value)); } else { sum = res; @@ -272,6 +296,10 @@ int ObTableAggCalculator::aggregate_avg(uint64_t idx, const ObNewRow &row) double res = sum + (double)cur_value.get_uint64(); if (INFINITY == res) { ret = OB_DATA_OUT_OF_RANGE; + if (idx < agg_columns_.count()) { + int64_t row_num = 0; + LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num); + } LOG_WARN("data out of range", K(ret), K(sum), K(cur_value)); } else { sum = res; @@ -284,6 +312,10 @@ int ObTableAggCalculator::aggregate_avg(uint64_t idx, const ObNewRow &row) double res = sum + (double)cur_value.get_float(); if (INFINITY == res) { ret = OB_DATA_OUT_OF_RANGE; + if (idx < agg_columns_.count()) { + int64_t row_num = 0; + LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num); + } LOG_WARN("data out of range", K(ret), K(sum), K(cur_value)); } else { sum = res; @@ -296,6 +328,10 @@ int ObTableAggCalculator::aggregate_avg(uint64_t idx, const ObNewRow &row) double res = sum + cur_value.get_double(); if (INFINITY == res) { ret = OB_DATA_OUT_OF_RANGE; + if (idx < agg_columns_.count()) { + int64_t row_num = 0; + LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num); + } LOG_WARN("data out of range", K(ret), K(sum), K(cur_value)); } else { sum = res; @@ -304,6 +340,10 @@ int ObTableAggCalculator::aggregate_avg(uint64_t idx, const ObNewRow &row) } default: { ret = OB_NOT_SUPPORTED; + char err_msg[128]; + const char *type_str = ob_obj_type_str(avg_type); + (void)sprintf(err_msg, "%s type for avg aggregation", type_str); + LOG_USER_ERROR(OB_NOT_SUPPORTED, err_msg); LOG_WARN("this data type does not support aggregate avg operation", K(ret), K(avg_type)); } } @@ -352,6 +392,7 @@ int ObTableAggCalculator::aggregate(const ObNewRow &row) { } default: { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "invalid aggregation type"); LOG_WARN("unexpected agg type", K(ret), K(agg_type)); break; } diff --git a/src/observer/table/ob_table_batch_execute_processor.cpp b/src/observer/table/ob_table_batch_execute_processor.cpp index 00b7f3be6..01c10f237 100644 --- a/src/observer/table/ob_table_batch_execute_processor.cpp +++ b/src/observer/table/ob_table_batch_execute_processor.cpp @@ -67,6 +67,7 @@ int ObTableBatchExecuteP::check_arg() if (!(arg_.consistency_level_ == ObTableConsistencyLevel::STRONG || arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL)) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "invalid consistency level"); LOG_WARN("some options not supported yet", K(ret), "consistency_level", arg_.consistency_level_); } @@ -79,6 +80,7 @@ int ObTableBatchExecuteP::check_arg2() const if (arg_.returning_rowkey() || arg_.returning_affected_entity()) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "returning rowkey or returning affected entity"); LOG_WARN("some options not supported yet", K(ret), "returning_rowkey", arg_.returning_rowkey(), "returning_affected_entity", arg_.returning_affected_entity()); @@ -182,6 +184,7 @@ int ObTableBatchExecuteP::try_process() LOG_WARN("fail to check index supported", K(ret), K(table_id_)); } else if (OB_UNLIKELY(!is_index_supported)) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "global index"); LOG_WARN("index type is not supported by table api", K(ret)); } else { if (batch_operation.is_readonly()) { @@ -345,10 +348,8 @@ int ObTableBatchExecuteP::htable_put() LOG_WARN("fail to get htable lock handle", K(ret)); } else if (OB_FAIL(ObHTableUtils::lock_htable_rows(table_id, batch_operation, *lock_handle, ObHTableLockMode::SHARED))) { LOG_WARN("fail to lock htable rows", K(ret), K(table_id), K(batch_operation)); - } else if (OB_FAIL(ObTableOpWrapper::get_or_create_spec(tb_ctx_, - cache_guard, - spec))) { - LOG_WARN("fail to get or create spec", K(ret)); + } else if (OB_FAIL(ObTableOpWrapper::get_insert_up_spec(tb_ctx_, cache_guard, spec))) { + LOG_WARN("fail to get insert up spec", K(ret)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < batch_operation.count(); ++i) { const ObTableOperation &table_operation = batch_operation.at(i); @@ -368,7 +369,7 @@ int ObTableBatchExecuteP::htable_put() ObTableOperationResult op_result; op_result.set_type(ObTableOperationType::INSERT_OR_UPDATE); op_result.set_entity(result_entity_); - op_result.set_errno(ret); + op_result.set_err(ret); op_result.set_affected_rows(affected_rows); result_.reset(); if (OB_FAIL(result_.push_back(op_result))) { @@ -440,7 +441,7 @@ int ObTableBatchExecuteP::multi_get() } } op_result.set_entity(*result_entity); - op_result.set_errno(ret); + op_result.set_err(ret); op_result.set_type(tb_ctx_.get_opertion_type()); if (OB_FAIL(result_.push_back(op_result))) { LOG_WARN("fail to push back op result", K(ret), K(i)); @@ -573,7 +574,7 @@ int ObTableBatchExecuteP::htable_delete() ObTableOperationResult single_op_result; single_op_result.set_entity(result_entity_); single_op_result.set_type(ObTableOperationType::DEL); - single_op_result.set_errno(ret); + single_op_result.set_err(ret); single_op_result.set_affected_rows(affected_rows); result_.reset(); if (OB_FAIL(result_.push_back(single_op_result))) { @@ -610,10 +611,8 @@ int ObTableBatchExecuteP::multi_insert() LOG_WARN("fail to start readonly transaction", K(ret)); } else if (OB_FAIL(tb_ctx_.init_trans(get_trans_desc(), get_tx_snapshot()))) { LOG_WARN("fail to init trans", K(ret), K(tb_ctx_)); - } else if (OB_FAIL(ObTableOpWrapper::get_or_create_spec(tb_ctx_, - cache_guard, - spec))) { - LOG_WARN("fail to get or create spec", K(ret)); + } else if (OB_FAIL(ObTableOpWrapper::get_insert_spec(tb_ctx_, cache_guard, spec))) { + LOG_WARN("fail to get or create insert spec", K(ret)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < batch_operation.count(); ++i) { const ObTableOperation &table_operation = batch_operation.at(i); @@ -766,7 +765,7 @@ int ObTableBatchExecuteP::batch_execute_internal(const ObTableBatchOperation &ba ret = process_get(op_tb_ctx, op_result); break; case ObTableOperationType::INSERT: - ret = ObTableOpWrapper::process_op(op_tb_ctx, op_result); + ret = ObTableOpWrapper::process_insert_op(op_tb_ctx, op_result); break; case ObTableOperationType::DEL: ret = ObTableOpWrapper::process_op(op_tb_ctx, op_result); @@ -774,15 +773,13 @@ int ObTableBatchExecuteP::batch_execute_internal(const ObTableBatchOperation &ba case ObTableOperationType::UPDATE: ret = ObTableOpWrapper::process_op(op_tb_ctx, op_result); break; - case ObTableOperationType::INSERT_OR_UPDATE: - ret = ObTableOpWrapper::process_op(op_tb_ctx, op_result); - break; case ObTableOperationType::REPLACE: ret = ObTableOpWrapper::process_op(op_tb_ctx, op_result); break; + case ObTableOperationType::INSERT_OR_UPDATE: case ObTableOperationType::APPEND: case ObTableOperationType::INCREMENT: - ret = ObTableOpWrapper::process_op(op_tb_ctx, op_result); + ret = ObTableOpWrapper::process_insert_up_op(op_tb_ctx, op_result); break; default: ret = OB_ERR_UNEXPECTED; @@ -914,7 +911,7 @@ int ObTableBatchExecuteP::process_get(table::ObTableCtx &op_tb_ctx, result_entity))) { LOG_WARN("fail to cosntruct result entity", K(ret)); } - result.set_errno(ret); + result.set_err(ret); result.set_type(op_tb_ctx.get_opertion_type()); return ret; } @@ -978,6 +975,7 @@ int ObTableBatchExecuteP::htable_mutate_row() default: { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "mutation type"); LOG_WARN("not supported mutation type", K(ret), K(table_operation)); break; } @@ -1040,7 +1038,7 @@ int ObTableBatchExecuteP::execute_htable_delete(const ObTableBatchOperation &bat ObTableOperationResult single_op_result; single_op_result.set_entity(result_entity_); single_op_result.set_type(ObTableOperationType::DEL); - single_op_result.set_errno(ret); + single_op_result.set_err(ret); single_op_result.set_affected_rows(affected_rows); result_.reset(); if (OB_FAIL(result_.push_back(single_op_result))) { @@ -1066,7 +1064,7 @@ int ObTableBatchExecuteP::execute_htable_put(const ObTableBatchOperation &batch_ LOG_WARN("fail to init table ctx", K(ret)); } else if (OB_FAIL(tb_ctx.init_trans(get_trans_desc(), get_tx_snapshot()))) { LOG_WARN("fail to init trans", K(ret), K(tb_ctx)); - } else if (OB_FAIL(ObTableOpWrapper::process_op(tb_ctx, single_op_result))) { + } else if (OB_FAIL(ObTableOpWrapper::process_insert_up_op(tb_ctx, single_op_result))) { LOG_WARN("fail to process insertup op", K(ret)); } else if (FALSE_IT(result_.reset())) { } else if (OB_FAIL(result_.push_back(single_op_result))) { diff --git a/src/observer/table/ob_table_cache.cpp b/src/observer/table/ob_table_cache.cpp index 7e4637b0b..921abef1d 100644 --- a/src/observer/table/ob_table_cache.cpp +++ b/src/observer/table/ob_table_cache.cpp @@ -229,7 +229,10 @@ int ObTableApiCacheGuard::append_column_ids(const ObITableEntity *entity, const ObColumnSchemaV2 *col_schema = nullptr; for (int64_t i = 0; i < properties_names.count(); i++) { if (OB_ISNULL(col_schema = table_schema->get_column_schema(properties_names.at(i)))) { - ret = OB_ERR_COLUMN_NOT_FOUND; + ret = OB_ERR_BAD_FIELD_ERROR; + const ObString &column = properties_names.at(i); + const ObString &table = table_schema->get_table_name_str(); + LOG_USER_ERROR(OB_ERR_BAD_FIELD_ERROR, column.length(), column.ptr(), table.length(), table.ptr()); LOG_WARN("fail to get column schema", K(ret), K(i), K(properties_names.at(i))); } else if (OB_FAIL(op_column_ids.push_back(col_schema->get_column_id()))) { LOG_WARN("fail to push back column id", K(ret), K(i), K(col_schema->get_column_id())); diff --git a/src/observer/table/ob_table_cg_service.cpp b/src/observer/table/ob_table_cg_service.cpp index 82eb9b60e..508b635f6 100644 --- a/src/observer/table/ob_table_cg_service.cpp +++ b/src/observer/table/ob_table_cg_service.cpp @@ -447,6 +447,7 @@ int ObTableExprCgService::generate_assign_expr(ObTableCtx &ctx, ObTableAssignmen } else if (item->is_generated_column_) { if (!item->is_stored_generated_column_) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "assign virtual generated column"); LOG_WARN("assign virtual generated column is not supported", K(ret)); } else { if (OB_FAIL(build_generated_column_expr(ctx, *item, item->generated_expr_str_, tmp_expr))) { @@ -1043,10 +1044,16 @@ int ObTableExprCgService::refresh_properties_exprs_frame(ObTableCtx &ctx, bool not_found = (OB_SEARCH_NOT_FOUND == entity.get_property(item.column_name_, prop_value)); if (not_found) { obj = &item.default_value_; + if (!item.is_nullable_ && !item.is_auto_increment_ && obj->is_null()) { + ret = OB_ERR_NO_DEFAULT_FOR_FIELD; + LOG_USER_ERROR(OB_ERR_NO_DEFAULT_FOR_FIELD, to_cstring(item.column_name_)); + LOG_WARN("column can not be null", K(ret), K(item)); + } } else { obj = &prop_value; } - if (T_FUN_SYS_AUTOINC_NEXTVAL == expr->type_) { + if (OB_FAIL(ret)) { + } else if (T_FUN_SYS_AUTOINC_NEXTVAL == expr->type_) { ObObj null_obj; null_obj.set_null(); obj = not_found ? &null_obj : &prop_value; @@ -1145,6 +1152,7 @@ int ObTableExprCgService::refresh_assign_exprs_frame(ObTableCtx &ctx, LOG_WARN("unexpected assign projector_index_", K(ret), K(new_row), K(assign.column_item_)); } else if (assign.column_item_->is_virtual_generated_column_) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "update virtual generated column"); LOG_WARN("virtual generated column not support to update", K(ret), K(assign)); } else { // on update current timestamp will not find value @@ -1214,6 +1222,56 @@ int ObTableDmlCgService::replace_exprs_with_dependant(ObTableCtx &ctx, return ret; } +/* + add column infos for check nullable before insert new_row to das. +*/ +int ObTableDmlCgService::add_all_column_infos(ObTableCtx &ctx, + ObIAllocator &allocator, + ColContentFixedArray &column_infos) +{ + int ret = OB_SUCCESS; + ObSEArray column_ids; + ObIArray &items = ctx.get_column_items(); + const ObTableSchema *table_schema = ctx.get_table_schema(); + + if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table schema is null", K(ret)); + } else if (OB_FAIL(table_schema->get_column_ids(column_ids))) { + LOG_WARN("fail to get column ids", K(ret)); + } else if (OB_FAIL(column_infos.init(items.count()))) { + LOG_WARN("fail to init column infos capacity", K(ret), K(items.count())); + } + + for (int64_t i= 0; OB_SUCC(ret) && i < items.count(); i++) { + const ObTableColumnItem &item = items.at(i); + ObColumnRefRawExpr *column_expr = item.expr_; + if (OB_ISNULL(column_expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("column ref expr is null", K(ret)); + } else { + ColumnContent column_content; + int64_t idx = 0; + column_content.auto_filled_timestamp_ = column_expr->get_result_type().has_result_flag(ON_UPDATE_NOW_FLAG); + column_content.is_nullable_ = !column_expr->get_result_type().is_not_null_for_write(); + column_content.is_predicate_column_ = false; + column_content.is_implicit_ = false; + if (OB_FAIL(ob_write_string(allocator, column_expr->get_column_name(), column_content.column_name_))) { + LOG_WARN("fail to copy column name", K(ret), K(column_expr->get_column_name())); + } else if (!has_exist_in_array(column_ids, column_expr->get_column_id(), &idx)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("column not exists in schema columns", K(ret), KPC(column_expr), K(column_ids)); + } else if (FALSE_IT(column_content.projector_index_ = static_cast(idx))) { + //do nothing + } else if (OB_FAIL(column_infos.push_back(column_content))) { + LOG_WARN("fail to store colum content to column infos", K(ret), K(column_content)); + } + } + } + + return ret; +} + /* genreate insert ctdef - replace exprs with depenedant expr if there are generated column. @@ -1238,6 +1296,8 @@ int ObTableDmlCgService::generate_insert_ctdef(ObTableCtx &ctx, LOG_WARN("fail to assign new row", K(ret)); } else if (OB_FAIL(generate_base_ctdef(ctx, ins_ctdef, old_row, new_row))) { LOG_WARN("fail to generate dml base ctdef", K(ret)); + } else if (OB_FAIL(add_all_column_infos(ctx, allocator, ins_ctdef.column_infos_))) { + LOG_WARN("fail to add all column infos", K(ret)); } else if (OB_FAIL(generate_das_ins_ctdef(ctx, ctx.get_ref_table_id(), ins_ctdef.das_ctdef_, @@ -2005,7 +2065,6 @@ int ObTableDmlCgService::generate_das_base_ctdef(uint64_t index_tid, base_ctdef.is_ignore_ = false; // insert ignore base_ctdef.is_batch_stmt_ = false; base_ctdef.is_table_api_ = true; - int64_t binlog_row_image = share::ObBinlogRowImage::FULL; ObSQLSessionInfo &session = ctx.get_session_info(); if (OB_FAIL(generate_column_info(index_tid, ctx, base_ctdef))) { @@ -2017,11 +2076,9 @@ int ObTableDmlCgService::generate_das_base_ctdef(uint64_t index_tid, LOG_WARN("fail to get table schema version", K(ret)); } else if (OB_FAIL(convert_table_param(ctx, base_ctdef))) { LOG_WARN("fail to convert table dml param", K(ret)); - } else if (OB_FAIL(session.get_binlog_row_image(binlog_row_image))) { - LOG_WARN("fail to get binlog row image", K(ret)); } else { base_ctdef.tz_info_ = *session.get_tz_info_wrap().get_time_zone_info(); - base_ctdef.is_total_quantity_log_ = (share::ObBinlogRowImage::FULL == binlog_row_image); + base_ctdef.is_total_quantity_log_ = ctx.is_total_quantity_log(); base_ctdef.encrypt_meta_.reset(); } @@ -2334,8 +2391,9 @@ int ObTableTscCgService::replace_gen_col_exprs(const ObTableCtx &ctx, for (int64_t i = 0; i < access_exprs.count() && OB_SUCC(ret); i++) { ObRawExpr *expr = access_exprs.at(i); if (!expr->is_column_ref_expr()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected expr type", K(ret), K(*expr)); + if (OB_FAIL(res_access_expr.push_back(expr))) { + LOG_WARN("fail to push back expr", K(ret)); + } } else if (FALSE_IT(ref_expr = static_cast(expr))) { } else if (!ref_expr->is_virtual_generated_column()) { if (OB_FAIL(res_access_expr.push_back(expr))) { diff --git a/src/observer/table/ob_table_cg_service.h b/src/observer/table/ob_table_cg_service.h index 46a2b2c90..0db5ea19c 100644 --- a/src/observer/table/ob_table_cg_service.h +++ b/src/observer/table/ob_table_cg_service.h @@ -235,6 +235,9 @@ private: sql::ObRowkeyCstCtdefArray &cst_ctdefs); static int replace_exprs_with_dependant(ObTableCtx &ctx, common::ObIArray &dst_exprs); + static int add_all_column_infos(ObTableCtx &ctx, + common::ObIAllocator &allocator, + sql::ColContentFixedArray &column_infos); private: DISALLOW_COPY_AND_ASSIGN(ObTableDmlCgService); }; diff --git a/src/observer/table/ob_table_context.cpp b/src/observer/table/ob_table_context.cpp index 56c38a1f3..440ada0a9 100644 --- a/src/observer/table/ob_table_context.cpp +++ b/src/observer/table/ob_table_context.cpp @@ -19,6 +19,8 @@ #include "sql/engine/expr/ob_expr_lob_utils.h" #include "ob_table_aggregation.h" +using namespace oceanbase::common; + namespace oceanbase { namespace table @@ -75,7 +77,7 @@ int ObTableCtx::init_sess_info(ObTableApiCredential &credential) } int ObTableCtx::init_common(ObTableApiCredential &credential, - const common::ObTabletID &arg_tablet_id, + const ObTabletID &arg_tablet_id, const uint64_t table_id, const int64_t &timeout_ts) { @@ -140,6 +142,7 @@ int ObTableCtx::construct_column_items() item.is_stored_generated_column_ = col_schema->is_stored_generated_column(); item.is_virtual_generated_column_ = col_schema->is_virtual_generated_column(); item.is_auto_increment_ = col_schema->is_autoincrement(); + item.is_nullable_ = col_schema->is_nullable(); item.generated_expr_str_ = item.default_value_.get_string(); item.auto_filled_timestamp_ = col_schema->is_on_update_current_timestamp(); item.rowkey_position_ = col_schema->get_rowkey_position(); @@ -301,8 +304,8 @@ int ObTableCtx::init_physical_plan_ctx(int64_t timeout_ts, int64_t tenant_schema - adjust entity */ int ObTableCtx::init_common(ObTableApiCredential &credential, - const common::ObTabletID &arg_tablet_id, - const common::ObString &arg_table_name, + const ObTabletID &arg_tablet_id, + const ObString &arg_table_name, const int64_t &timeout_ts) { int ret = OB_SUCCESS; @@ -317,6 +320,11 @@ int ObTableCtx::init_common(ObTableApiCredential &credential, false, /* is_index */ table_schema_))) { LOG_WARN("fail to get table schema", K(ret), K(tenant_id), K(database_id), K(arg_table_name)); + } else if (OB_ISNULL(table_schema_)) { + ret = OB_TABLE_NOT_EXIST; + ObString db(""); + LOG_USER_ERROR(OB_TABLE_NOT_EXIST, db.ptr(), arg_table_name.ptr()); + LOG_WARN("table not exist", K(ret), K(tenant_id), K(database_id), K(arg_table_name)); } else if (OB_FAIL(inner_init_common(credential, arg_tablet_id, arg_table_name, timeout_ts))) { LOG_WARN("fail to inner init common", KR(ret), K(credential), K(arg_tablet_id), K(arg_table_name), K(timeout_ts)); @@ -326,8 +334,8 @@ int ObTableCtx::init_common(ObTableApiCredential &credential, } int ObTableCtx::inner_init_common(ObTableApiCredential &credential, - const common::ObTabletID &arg_tablet_id, - const common::ObString &table_name, + const ObTabletID &arg_tablet_id, + const ObString &table_name, const int64_t &timeout_ts) { int ret = OB_SUCCESS; @@ -353,6 +361,7 @@ int ObTableCtx::inner_init_common(ObTableApiCredential &credential, if (is_scan_) { // 扫描场景使用table_schema上的tablet id,客户端已经做了路由分发 if (table_schema_->is_partitioned_table()) { // 不支持分区表 ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "invalid tablet id in partition table when do scan"); LOG_WARN("partitioned table not supported", K(ret), K(table_name)); } else { tablet_id = table_schema_->get_tablet_id(); @@ -380,6 +389,8 @@ int ObTableCtx::inner_init_common(ObTableApiCredential &credential, LOG_WARN("fail to construct column items", K(ret)); } else if (!is_scan_ && OB_FAIL(adjust_entity())) { LOG_WARN("fail to adjust entity", K(ret)); + } else if (OB_FAIL(sess_guard_.get_sess_info().get_binlog_row_image(binlog_row_image_type_))) { + LOG_WARN("fail to get binlog row image", K(ret)); } else { table_name_ = table_name; ref_table_id_ = table_schema_->get_table_id(); @@ -394,11 +405,11 @@ int ObTableCtx::inner_init_common(ObTableApiCredential &credential, return ret; } -// get columns type from index_schema or primary table schema -int ObTableCtx::generate_columns_type(ObIArray &columns_type) +// get columns info from index_schema or primary table schema +int ObTableCtx::generate_column_infos(ObIArray &columns_infos) { int ret = OB_SUCCESS; - ObExprResType tmp_column_type; + ObTableColumnInfo tmp_column_info; const ObColumnSchemaV2 *column_schema = nullptr; if (is_index_scan_) { @@ -407,10 +418,10 @@ int ObTableCtx::generate_columns_type(ObIArray &columns_type) if (OB_ISNULL(column_schema = index_schema_->get_column_schema(index_col_ids_.at(i)))) { ret = OB_SCHEMA_ERROR; LOG_WARN("fail to get column schema", K(ret), K(index_col_ids_.at(i))); - } else if (OB_FAIL(cons_column_type(*column_schema, tmp_column_type))) { - LOG_WARN("fail to cons column type", K(ret)); - } else if (OB_FAIL(columns_type.push_back(tmp_column_type))) { - LOG_WARN("fail to push back column type", K(ret), K(tmp_column_type)); + } else if (OB_FAIL(cons_column_info(*column_schema, tmp_column_info))) { + LOG_WARN("fail to cons column info", K(ret)); + } else if (OB_FAIL(columns_infos.push_back(tmp_column_info))) { + LOG_WARN("fail to push back column info", K(ret), K(tmp_column_info)); } } } else { // primary key @@ -423,10 +434,10 @@ int ObTableCtx::generate_columns_type(ObIArray &columns_type) } else if (OB_ISNULL(column_schema = table_schema_->get_column_schema(column_id))) { ret = OB_SCHEMA_ERROR; LOG_WARN("fail to get column schema", K(ret), K(column_id)); - } else if (OB_FAIL(cons_column_type(*column_schema, tmp_column_type))) { - LOG_WARN("fail to cons column type", K(ret)); - } else if (OB_FAIL(columns_type.push_back(tmp_column_type))) { - LOG_WARN("fail to push back column type", K(ret), K(tmp_column_type)); + } else if (OB_FAIL(cons_column_info(*column_schema, tmp_column_info))) { + LOG_WARN("fail to cons column info", K(ret)); + } else if (OB_FAIL(columns_infos.push_back(tmp_column_info))) { + LOG_WARN("fail to push back column info", K(ret), K(tmp_column_info)); } } } @@ -434,25 +445,29 @@ int ObTableCtx::generate_columns_type(ObIArray &columns_type) return ret; } -int ObTableCtx::cons_column_type(const ObColumnSchemaV2 &column_schema, - ObExprResType &column_type) +int ObTableCtx::cons_column_info(const ObColumnSchemaV2 &column_schema, + ObTableColumnInfo &column_info) { int ret = OB_SUCCESS; - column_type.set_type(column_schema.get_data_type()); - column_type.set_result_flag(ObRawExprUtils::calc_column_result_flag(column_schema)); + column_info.type_.set_type(column_schema.get_data_type()); + column_info.type_.set_result_flag(ObRawExprUtils::calc_column_result_flag(column_schema)); + column_info.column_name_ = column_schema.get_column_name_str(); + column_info.is_auto_inc_ = column_schema.is_autoincrement(); + column_info.is_nullable_ = column_schema.is_nullable(); if (ob_is_string_type(column_schema.get_data_type()) || ob_is_json(column_schema.get_data_type())) { - column_type.set_collation_type(column_schema.get_collation_type()); - column_type.set_collation_level(CS_LEVEL_IMPLICIT); + column_info.type_.set_collation_type(column_schema.get_collation_type()); + column_info.type_.set_collation_level(CS_LEVEL_IMPLICIT); } else { - column_type.set_collation_type(CS_TYPE_BINARY); - column_type.set_collation_level(CS_LEVEL_NUMERIC); + column_info.type_.set_collation_type(CS_TYPE_BINARY); + column_info.type_.set_collation_level(CS_LEVEL_NUMERIC); } const ObAccuracy &accuracy = column_schema.get_accuracy(); - column_type.set_accuracy(accuracy); - const bool is_zerofill = column_type.has_result_flag(ZEROFILL_FLAG); + column_info.type_.set_accuracy(accuracy); + const bool is_zerofill = column_info.type_.has_result_flag(ZEROFILL_FLAG); if (is_zerofill) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "modifing column with ZEROFILL flag"); LOG_WARN("modifing column with ZEROFILL flag is not supported", K(ret), K(column_schema)); } @@ -496,25 +511,28 @@ int ObTableCtx::read_real_lob(ObIAllocator &allocator, ObObj &obj) - check collation for string type and convert obj type to the column type (char, varchar or text) - check accuracy */ -int ObTableCtx::adjust_column_type(const ObExprResType &column_type, - ObObj &obj, - bool is_autoincrement/*=false*/) +int ObTableCtx::adjust_column_type(const ObTableColumnInfo &column_info, ObObj &obj) { int ret = OB_SUCCESS; - const bool is_not_nullable = column_type.is_not_null_for_read(); + const ObExprResType &column_type = column_info.type_; const ObCollationType cs_type = column_type.get_collation_type(); // 1. check nullable - if (is_not_nullable && obj.is_null()) { - if (!is_autoincrement) { + if (!column_info.is_nullable_ && obj.is_null()) { + if (!column_info.is_auto_inc_) { ret = OB_BAD_NULL_ERROR; + LOG_USER_ERROR(OB_BAD_NULL_ERROR, column_info.column_name_.length(), column_info.column_name_.ptr()); } } else if (obj.is_null() || is_inc()) { // continue } else if (column_type.get_type() != obj.get_type() && !(ob_is_string_type(column_type.get_type()) && ob_is_string_type(obj.get_type()))) { // 2. data type mismatch - ret = OB_OBJ_TYPE_ERROR; + ret = OB_KV_COLUMN_TYPE_NOT_MATCH; + const char *schema_type_str = ob_obj_type_str(column_type.get_type()); + const char *obj_type_str = ob_obj_type_str(obj.get_type()); + LOG_USER_ERROR(OB_KV_COLUMN_TYPE_NOT_MATCH, column_info.column_name_.length(), column_info.column_name_.ptr(), + static_cast(strlen(schema_type_str)), schema_type_str, static_cast(strlen(obj_type_str)), obj_type_str); LOG_WARN("object type mismatch with column type", K(ret), K(column_type), K(obj)); } else { // 3. check collation @@ -530,7 +548,11 @@ int ObTableCtx::adjust_column_type(const ObExprResType &column_type, // same charset, convert it obj.set_collation_type(cs_type); } else { - ret = OB_ERR_COLLATION_MISMATCH; + ret = OB_KV_COLLATION_MISMATCH; + const char *schema_coll_str = ObCharset::collation_name(cs_type); + const char *obj_coll_str = ObCharset::collation_name(obj.get_collation_type()); + LOG_USER_ERROR(OB_KV_COLLATION_MISMATCH, column_info.column_name_.length(), column_info.column_name_.ptr(), + static_cast(strlen(schema_coll_str)), schema_coll_str, static_cast(strlen(obj_coll_str)), obj_coll_str); LOG_WARN("collation type mismatch with column", K(ret), K(column_type), K(obj)); } if (OB_SUCC(ret)) { @@ -546,6 +568,16 @@ int ObTableCtx::adjust_column_type(const ObExprResType &column_type, // 4. check accuracy if (OB_SUCC(ret)) { if (OB_FAIL(ob_obj_accuracy_check_only(column_type.get_accuracy(), cs_type, obj))) { + if (ret == OB_DATA_OUT_OF_RANGE) { + int64_t row_num = 0; + LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, column_info.column_name_.length(), column_info.column_name_.ptr(), row_num); + } else if (ret == OB_OPERATE_OVERFLOW) { + const char *type_str = ob_obj_type_str(column_type.get_type()); + LOG_USER_ERROR(OB_OPERATE_OVERFLOW, type_str, column_info.column_name_.ptr()); + } else if (ret == OB_ERR_DATA_TOO_LONG) { + int64_t row_num = 0; + LOG_USER_ERROR(OB_ERR_DATA_TOO_LONG, column_info.column_name_.length(), column_info.column_name_.ptr(), row_num); + } LOG_WARN("accuracy check failed", K(ret), K(obj), K(column_type)); } } @@ -564,12 +596,12 @@ int ObTableCtx::adjust_column_type(const ObExprResType &column_type, int ObTableCtx::adjust_column(const ObColumnSchemaV2 &col_schema, ObObj &obj) { int ret = OB_SUCCESS; - ObExprResType column_type; + ObTableColumnInfo column_info; - if (OB_FAIL(cons_column_type(col_schema, column_type))) { - LOG_WARN("fail to construct column type", K(ret), K(col_schema)); - } else if (OB_FAIL(adjust_column_type(column_type, obj, col_schema.is_autoincrement()))) { - LOG_WARN("fail to adjust rowkey column type", K(ret), K(obj)); + if (OB_FAIL(cons_column_info(col_schema, column_info))) { + LOG_WARN("fail to construct column info", K(ret), K(col_schema)); + } else if (OB_FAIL(adjust_column_type(column_info, obj))) { + LOG_WARN("fail to adjust rowkey column type", K(ret), K(obj), K(column_info)); } return ret; @@ -608,6 +640,7 @@ int ObTableCtx::adjust_rowkey() has_auto_inc = true; if (col_schema->is_part_key_column()) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "auto increment column set to be partition column"); LOG_WARN("auto increment column could not be partition column", K(ret), K(*col_schema)); } else if (!is_full_filled) { // curr column is auto_increment and user not fill,no need to check need_check = false; @@ -629,7 +662,8 @@ int ObTableCtx::adjust_rowkey() if (OB_FAIL(ret)) { // do nothing } else if (!has_auto_inc && entity_rowkey_cnt != schema_rowkey_cnt) { - ret = OB_ERR_UNEXPECTED; + ret = OB_KV_ROWKEY_COUNT_NOT_MATCH; + LOG_USER_ERROR(OB_KV_ROWKEY_COUNT_NOT_MATCH, schema_rowkey_cnt, entity_rowkey_cnt); LOG_WARN("entity rowkey count mismatch table schema rowkey count", K(ret), K(entity_rowkey_cnt), K(schema_rowkey_cnt)); } @@ -660,12 +694,15 @@ int ObTableCtx::adjust_properties() const ObString &col_name = prop_names.at(i); ObObj &prop_obj = const_cast(prop_objs.at(i)); if (OB_ISNULL(col_schema = table_schema_->get_column_schema(col_name))) { - ret = OB_ERR_COLUMN_NOT_FOUND; + ret = OB_ERR_BAD_FIELD_ERROR; + const ObString &table = table_schema_->get_table_name_str(); + LOG_USER_ERROR(OB_ERR_BAD_FIELD_ERROR, col_name.length(), col_name.ptr(), table.length(), table.ptr()); LOG_WARN("fail to get column schema", K(ret), K(col_name)); } else if (is_get) { // do nothing } else if (col_schema->is_rowkey_column()) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "mutate rowkey column"); LOG_WARN("property should not be rowkey column", K(ret), K(prop_names), K(i)); } else if (OB_FAIL(adjust_column(*col_schema, prop_obj))) { LOG_WARN("fail to adjust column", K(ret), K(prop_obj)); @@ -722,11 +759,11 @@ int ObTableCtx::generate_key_range(const ObIArray &scan_ranges) { int ret = OB_SUCCESS; int64_t padding_num = -1; - ObArray columns_type; + ObSEArray columns_infos; int64_t N = scan_ranges.count(); - if (OB_FAIL(generate_columns_type(columns_type))) { - LOG_WARN("fail to generate columns type", K(ret)); + if (OB_FAIL(generate_column_infos(columns_infos))) { + LOG_WARN("fail to generate columns infos", K(ret)); } else if (is_index_scan_) { // 索引扫描场景下用户可能没有填写rowkey的key_range,需要加上 padding_num = index_schema_->get_rowkey_column_num() - index_col_ids_.count(); @@ -745,17 +782,18 @@ int ObTableCtx::generate_key_range(const ObIArray &scan_ranges) if (p_key->is_min_row() || p_key->is_max_row()) { // do nothing } else { - if (p_key->get_obj_cnt() != columns_type.count()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("wrong rowkey size", K(ret), K(i), K(j), K(*p_key), K(columns_type)); + if (p_key->get_obj_cnt() != columns_infos.count()) { + ret = OB_KV_SCAN_RANGE_MISSING; + LOG_USER_ERROR(OB_KV_SCAN_RANGE_MISSING, p_key->get_obj_cnt(), columns_infos.count()); + LOG_WARN("wrong rowkey size", K(ret), K(i), K(j), K(*p_key), K(columns_infos)); } else { const int64_t M = p_key->get_obj_cnt(); for (int64_t k = 0; OB_SUCCESS == ret && k < M; ++k) { ObObj &obj = const_cast(p_key->get_obj_ptr()[k]); if (obj.is_min_value() || obj.is_max_value()) { // do nothing - } else if (OB_FAIL(adjust_column_type(columns_type.at(k), obj))) { - LOG_WARN("fail to adjust column type", K(ret), K(columns_type.at(k)), K(obj)); + } else if (OB_FAIL(adjust_column_type(columns_infos.at(k), obj))) { + LOG_WARN("fail to adjust column type", K(ret), K(columns_infos.at(k)), K(obj)); } } } @@ -865,7 +903,7 @@ int ObTableCtx::init_scan(const ObTableQuery &query, } else { // init index_col_ids_ if (OB_SUCC(ret)) { - const common::ObIndexInfo &index_info = index_schema_->get_index_info(); + const ObIndexInfo &index_info = index_schema_->get_index_info(); if (OB_FAIL(index_info.get_column_ids(index_col_ids_))) { LOG_WARN("fail to get index column ids", K(ret), K(index_info)); } @@ -988,7 +1026,7 @@ int ObTableCtx::add_stored_generated_column_assignment(const ObTableAssignment & } } } - return init_dml_related_tid(); + return ret; } /* @@ -1015,6 +1053,7 @@ int ObTableCtx::init_assignments(const ObTableEntity &entity) if (OB_SUCCESS == entity.get_property(item.column_name_, prop_obj)) { if (item.rowkey_position_ > 0) { ret = OB_ERR_UPDATE_ROWKEY_COLUMN; + LOG_USER_ERROR(OB_ERR_UPDATE_ROWKEY_COLUMN); LOG_WARN("can not update rowkey column", K(ret)); } else { ObTableAssignment assign(&item); @@ -1248,10 +1287,15 @@ int ObTableCtx::init_append(bool return_affected_entity, bool return_rowkey) } else if (assign.column_item_->auto_filled_timestamp_) { // do nothing } else if (delta.is_null()) { - ret = OB_OBJ_TYPE_ERROR; + ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "append null"); LOG_WARN("append NULL is illegal", K(ret), K(delta)); } else if (OB_UNLIKELY(!ob_is_string_type(delta.get_type()))) { - ret = OB_OBJ_TYPE_ERROR; + ret = OB_KV_COLUMN_TYPE_NOT_MATCH; + const char *schema_type_str = "stringTc"; + const char *obj_type_str = ob_obj_type_str(delta.get_type()); + LOG_USER_ERROR(OB_KV_COLUMN_TYPE_NOT_MATCH, assign.column_item_->column_name_.length(), assign.column_item_->column_name_.ptr(), + static_cast(strlen(schema_type_str)), schema_type_str, static_cast(strlen(obj_type_str)), obj_type_str); LOG_WARN("can only append string type", K(ret), K(delta)); } else { const ObString &column_name = assign.column_item_->column_name_; @@ -1310,11 +1354,16 @@ int ObTableCtx::init_increment(bool return_affected_entity, bool return_rowkey) LOG_WARN("column item is null", K(ret), K(assign)); } else if (assign.column_item_->is_auto_increment_) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "increment auto increment column"); LOG_WARN("not support increment auto increment column", K(ret), K(assign)); } else if (assign.column_item_->auto_filled_timestamp_) { // do nothing } else if (!ob_is_int_tc(delta.get_type())) { - ret = OB_OBJ_TYPE_ERROR; + ret = OB_KV_COLUMN_TYPE_NOT_MATCH; + const char *schema_type_str = "intTc"; + const char *obj_type_str = ob_obj_type_str(delta.get_type()); + LOG_USER_ERROR(OB_KV_COLUMN_TYPE_NOT_MATCH, assign.column_item_->column_name_.length(), assign.column_item_->column_name_.ptr(), + static_cast(strlen(schema_type_str)), schema_type_str, static_cast(strlen(obj_type_str)), obj_type_str); LOG_WARN("delta should only be signed integer type", K(ret), K(delta)); } else { const ObString &column_name = assign.column_item_->column_name_; @@ -1575,7 +1624,7 @@ int ObTableCtx::init_agg_cell_proj(int64_t size) } int ObTableCtx::add_aggregate_proj(int64_t cell_idx, - const common::ObString &column_name, + const ObString &column_name, const ObIArray &aggregations) { int ret = OB_SUCCESS; @@ -1703,20 +1752,22 @@ int ObTableCtx::get_related_tablet_id(const share::schema::ObTableSchema &index_ int ObTableCtx::check_insert_up_can_use_put(bool &use_put) { int ret = OB_SUCCESS; - use_put = true; + use_put = false; + bool can_use_put = true; if (is_inc_or_append()) { // increment or append operarion need old value to calculate, can not use put - use_put = false; + can_use_put = false; } else if (ObTableOperationType::INSERT_OR_UPDATE != operation_type_) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid operation type", K(ret), K_(operation_type)); } else if (is_htable()) { // htable has no index and alway full filled. - use_put = true; + can_use_put = true; } else if (is_client_set_put_ && !related_index_ids_.empty()) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "table with index use put"); LOG_WARN("client set use_put flag, but has local index is not support", K(ret), K_(related_index_ids)); } else if (!related_index_ids_.empty()) { // has index, can not use put - use_put = false; + can_use_put = false; } else if (OB_ISNULL(table_schema_)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("table schema is null", K(ret)); @@ -1729,14 +1780,19 @@ int ObTableCtx::check_insert_up_can_use_put(bool &use_put) - table_schema_->get_rowkey_column_num() <= entity_->get_properties_count())) { // all columns are filled } else if (is_client_set_put_ && !is_all_columns_filled) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "all columns not filled but use put"); LOG_WARN("client set use_put flag, but not fill all columns is not support", K(ret), KPC_(table_schema), KPC_(entity)); } else if (is_client_set_put_ || is_all_columns_filled) { - use_put = true; + can_use_put = true; } else { // some columns are missing - use_put = false; + can_use_put = false; } } + if (OB_SUCC(ret) && can_use_put && !is_total_quantity_log()) { + use_put = true; + } + return ret; } diff --git a/src/observer/table/ob_table_context.h b/src/observer/table/ob_table_context.h index bcfa34959..5ef50f741 100644 --- a/src/observer/table/ob_table_context.h +++ b/src/observer/table/ob_table_context.h @@ -34,6 +34,7 @@ struct ObTableColumnItem : public sql::ColumnItem is_stored_generated_column_(false), is_virtual_generated_column_(false), is_auto_increment_(false), + is_nullable_(true), rowkey_position_(-1) {} TO_STRING_KV("ColumnItem", static_cast(*this), @@ -45,6 +46,7 @@ struct ObTableColumnItem : public sql::ColumnItem K_(generated_expr_str), K_(dependant_exprs), K_(is_auto_increment), + K_(is_nullable), K_(rowkey_position)); sql::ObRawExpr *raw_expr_; // column ref expr or calculate expr bool is_generated_column_; @@ -56,9 +58,35 @@ struct ObTableColumnItem : public sql::ColumnItem common::ObString generated_expr_str_; common::ObSEArray dependant_exprs_; bool is_auto_increment_; + bool is_nullable_; int64_t rowkey_position_; // greater than zero if this is rowkey column, 0 if this is common column }; +struct ObTableColumnInfo +{ + ObTableColumnInfo() + : type_(), + is_auto_inc_(false), + is_nullable_(true) + { + } + ObTableColumnInfo(sql::ObExprResType type, const common::ObString &column_name, bool is_auto_inc = false, bool is_nullable = true) + : type_(type), + column_name_(column_name), + is_auto_inc_(is_auto_inc), + is_nullable_(is_nullable) + { + } + sql::ObExprResType type_; + common::ObString column_name_; + bool is_auto_inc_; + bool is_nullable_; + TO_STRING_KV(K_(type), + K_(column_name), + K_(is_auto_inc), + K_(is_nullable)); +}; + struct ObTableAssignment : public sql::ObAssignment { ObTableAssignment() @@ -152,6 +180,7 @@ public: is_ttl_table_ = false; is_skip_scan_ = false; is_client_set_put_ = false; + binlog_row_image_type_ = ObBinlogRowImage::FULL; } virtual ~ObTableCtx() {} @@ -186,7 +215,8 @@ public: K_(cur_cluster_version), K_(is_ttl_table), K_(is_skip_scan), - K_(is_client_set_put)); + K_(is_client_set_put), + K_(binlog_row_image_type)); public: //////////////////////////////////////// getter //////////////////////////////////////////////// // for common @@ -242,6 +272,7 @@ public: OB_INLINE const common::ObIArray& get_select_col_ids() const { return select_col_ids_; } OB_INLINE const common::ObIArray& get_query_col_ids() const { return query_col_ids_; } OB_INLINE const common::ObIArray& get_query_col_names() const { return query_col_names_; } + OB_INLINE bool is_total_quantity_log() const { return binlog_row_image_type_ == ObBinlogRowImage::FULL; } // for update OB_INLINE bool is_for_update() const { return is_for_update_; } OB_INLINE bool is_inc_or_append() const @@ -373,7 +404,7 @@ private: int init_sess_info(ObTableApiCredential &credential); // for scan int init_index_info(const common::ObString &index_name); - int generate_columns_type(common::ObIArray &columns_type); + int generate_column_infos(common::ObIArray &columns_infos); int generate_key_range(const common::ObIArray &scan_ranges); // for dml int init_dml_related_tid(); @@ -396,9 +427,9 @@ private: private: int construct_column_items(); - int cons_column_type(const share::schema::ObColumnSchemaV2 &column_schema, - sql::ObExprResType &column_type); - int adjust_column_type(const ObExprResType &column_type, ObObj &obj, bool is_autoincrement = false); + int cons_column_info(const share::schema::ObColumnSchemaV2 &column_schema, + ObTableColumnInfo &column_info); + int adjust_column_type(const ObTableColumnInfo &column_info, ObObj &ob); int adjust_column(const ObColumnSchemaV2 &col_schema, ObObj &obj); int adjust_rowkey(); int adjust_properties(); @@ -483,6 +514,7 @@ private: bool is_skip_scan_; // for put bool is_client_set_put_; + int64_t binlog_row_image_type_; private: DISALLOW_COPY_AND_ASSIGN(ObTableCtx); }; diff --git a/src/observer/table/ob_table_end_trans_cb.cpp b/src/observer/table/ob_table_end_trans_cb.cpp index 3e2a844b7..49eabd1df 100644 --- a/src/observer/table/ob_table_end_trans_cb.cpp +++ b/src/observer/table/ob_table_end_trans_cb.cpp @@ -80,7 +80,7 @@ void ObTableExecuteEndTransCb::callback(int cb_param) CHECK_BALANCE("[table async callback]"); if (cb_param != OB_SUCCESS) { // commit failed - result_.set_errno(cb_param); + result_.set_err(cb_param); result_.set_affected_rows(0); result_entity_.reset(); } @@ -143,7 +143,7 @@ void ObTableBatchExecuteEndTransCb::callback(int cb_param) // same result for all ObTableOperationResult single_op_result; single_op_result.set_entity(result_entity_); - single_op_result.set_errno(cb_param); + single_op_result.set_err(cb_param); single_op_result.set_type(table_operation_type_); if (OB_FAIL(result_.push_back(single_op_result))) { LOG_WARN("failed to add result", K(ret)); // @todo reset the connection diff --git a/src/observer/table/ob_table_execute_processor.cpp b/src/observer/table/ob_table_execute_processor.cpp index 80b7ae294..304ef5f77 100644 --- a/src/observer/table/ob_table_execute_processor.cpp +++ b/src/observer/table/ob_table_execute_processor.cpp @@ -81,6 +81,7 @@ int ObTableApiExecuteP::check_arg() if (!(arg_.consistency_level_ == ObTableConsistencyLevel::STRONG || arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL)) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "consistency level"); LOG_WARN("some options not supported yet", K(ret), "consistency_level", arg_.consistency_level_, "operation_type", arg_.table_operation_.type()); @@ -97,6 +98,7 @@ int ObTableApiExecuteP::check_arg2() const ObTableOperationType::Type::INCREMENT != op_type) { if (arg_.returning_rowkey() || arg_.returning_affected_entity()) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "returning rowkey or affected entity"); LOG_WARN("some options not supported yet", K(ret), "returning_rowkey", arg_.returning_rowkey(), "returning_affected_entity", arg_.returning_affected_entity(), @@ -221,6 +223,7 @@ int ObTableApiExecuteP::try_process() // do nothing } else if (OB_UNLIKELY(!is_index_supported)) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "global index"); LOG_WARN("index type is not supported by table api", K(ret)); } else if (OB_FAIL(check_arg2())) { LOG_WARN("fail to check arg", K(ret)); @@ -230,11 +233,7 @@ int ObTableApiExecuteP::try_process() switch (table_operation.type()) { case ObTableOperationType::INSERT: stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INSERT; - if (tb_ctx_.is_ttl_table()) { - ret = process_dml_op(); - } else { - ret = process_dml_op(); - } + ret = process_insert(); break; case ObTableOperationType::GET: stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_GET; @@ -250,11 +249,7 @@ int ObTableApiExecuteP::try_process() break; case ObTableOperationType::INSERT_OR_UPDATE: stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INSERT_OR_UPDATE; - if (tb_ctx_.is_ttl_table()) { - ret = process_dml_op(); - } else { - ret = process_dml_op(); - } + ret = process_insert_up(); break; case ObTableOperationType::REPLACE: stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_REPLACE; @@ -262,19 +257,11 @@ int ObTableApiExecuteP::try_process() break; case ObTableOperationType::INCREMENT: stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INCREMENT; - if (tb_ctx_.is_ttl_table()) { - ret = process_dml_op(); - } else { - ret = process_dml_op(); - } + ret = process_insert_up(); break; case ObTableOperationType::APPEND: stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_APPEND; - if (tb_ctx_.is_ttl_table()) { - ret = process_dml_op(); - } else { - ret = process_dml_op(); - } + ret = process_insert_up(); break; default: ret = OB_INVALID_ARGUMENT; @@ -286,7 +273,7 @@ int ObTableApiExecuteP::try_process() if (OB_FAIL(ret)) { // init_tb_ctx will return some replaceable error code - result_.set_errno(ret); + result_.set_err(ret); table::ObTableApiUtil::replace_ret_code(ret); } @@ -420,7 +407,7 @@ int ObTableApiExecuteP::process_get() } release_read_trans(); - result_.set_errno(ret); + result_.set_err(ret); ObTableApiUtil::replace_ret_code(ret); result_.set_type(arg_.table_operation_.type()); diff --git a/src/observer/table/ob_table_execute_processor.h b/src/observer/table/ob_table_execute_processor.h index 8b1f6883e..da273b62e 100644 --- a/src/observer/table/ob_table_execute_processor.h +++ b/src/observer/table/ob_table_execute_processor.h @@ -68,7 +68,7 @@ private: SERVER_LOG(WARN, "fail to process op", K(ret)); } - result_.set_errno(ret); + result_.set_err(ret); table::ObTableApiUtil::replace_ret_code(ret); int tmp_ret = ret; if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) { @@ -79,6 +79,26 @@ private: return ret; } int process_get(); + int process_insert() + { + int ret = OB_SUCCESS; + if (!tb_ctx_.is_ttl_table()) { + ret = process_dml_op(); + } else { + ret = process_dml_op(); + } + return ret; + } + int process_insert_up() + { + int ret = OB_SUCCESS; + if (!tb_ctx_.is_ttl_table()) { + ret = process_dml_op(); + } else { + ret = process_dml_op(); + } + return ret; + } private: table::ObTableEntity request_entity_; table::ObTableEntity result_entity_; diff --git a/src/observer/table/ob_table_filter.cpp b/src/observer/table/ob_table_filter.cpp index 198c033c1..93c6b2d04 100644 --- a/src/observer/table/ob_table_filter.cpp +++ b/src/observer/table/ob_table_filter.cpp @@ -103,6 +103,7 @@ int ObTableComparator::compare_to(const ObIArray &select_columns, } else { // not support others ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "only for int and string, other column type"); LOG_WARN("do not support other column type, only for int, string", K(ret), K(column_type)); } } else { diff --git a/src/observer/table/ob_table_modify_executor.cpp b/src/observer/table/ob_table_modify_executor.cpp index f6e4a3526..88d3215c9 100644 --- a/src/observer/table/ob_table_modify_executor.cpp +++ b/src/observer/table/ob_table_modify_executor.cpp @@ -23,6 +23,33 @@ namespace oceanbase { namespace table { + +int ObTableApiModifyExecutor::check_row_null(const ObExprPtrIArray &row, const ColContentIArray &column_infos) +{ + int ret = OB_SUCCESS; + + if (row.count() < column_infos.count()) { // column_infos count less than row count when do update + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid row count", K(ret), K(row), K(column_infos)); + } + + for (int i = 0; OB_SUCC(ret) && i < column_infos.count(); i++) { + ObDatum *datum = NULL; + const bool is_nullable = column_infos.at(i).is_nullable_; + uint64_t col_idx = column_infos.at(i).projector_index_; + if (OB_FAIL(row.at(col_idx)->eval(eval_ctx_, datum))) { + LOG_WARN("fail to eval datum", K(ret), K(row), K(column_infos), K(col_idx)); + } else if (!is_nullable && datum->is_null()) { + const ObString &column_name = column_infos.at(i).column_name_; + ret = OB_BAD_NULL_ERROR; + LOG_USER_ERROR(OB_BAD_NULL_ERROR, column_name.length(), column_name.ptr()); + LOG_WARN("bad null error", K(ret), K(row), K(column_name)); + } + } + + return ret; +} + int ObTableApiModifyExecutor::open() { int ret = OB_SUCCESS; @@ -239,6 +266,8 @@ int ObTableApiModifyExecutor::insert_row_to_das(const ObTableInsCtDef &ins_ctdef ObChunkDatumStore::StoredRow* stored_row = nullptr; if (OB_FAIL(calc_tablet_loc(tablet_loc))) { LOG_WARN("fail to calc partition key", K(ret)); + } else if (OB_FAIL(check_row_null(ins_ctdef.new_row_, ins_ctdef.column_infos_))) { + LOG_WARN("fail to check row nullable", K(ret)); } else if (OB_FAIL(ObDMLService::insert_row(ins_ctdef.das_ctdef_, ins_rtdef.das_rtdef_, tablet_loc, @@ -428,6 +457,7 @@ int ObTableApiModifyExecutor::check_rowkey_change(const ObChunkDatumStore::Store // do nothing } else if (!ObDatum::binary_equal(upd_old_row.cells()[i], upd_new_row.cells()[i])) { ret = OB_ERR_UPDATE_ROWKEY_COLUMN; + LOG_USER_ERROR(OB_ERR_UPDATE_ROWKEY_COLUMN); LOG_WARN("can not update rowkey column", K(ret)); } } @@ -469,6 +499,7 @@ int ObTableApiModifyExecutor::to_expr_skip_old(const ObChunkDatumStore::StoredRo LOG_WARN("unexpected assign projector_index_", K(ret), K(new_row), K(assign.column_item_)); } else if (assign.column_item_->is_virtual_generated_column_) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "update virtual generated column"); LOG_WARN("virtual generated column not support to update", K(ret), K(assign)); } else { ObExpr *expr = new_row.at(assign.column_item_->col_idx_); @@ -607,6 +638,8 @@ int ObTableApiModifyExecutor::insert_upd_new_row_to_das(const ObTableUpdCtDef &u } else if (OB_ISNULL(upd_rtdef.dins_rtdef_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("dins_rtdef_ is null", K(ret)); + } else if (OB_FAIL(check_row_null(upd_ctdef.new_row_, upd_ctdef.assign_columns_))) { + LOG_WARN("fail to check row nullable", K(ret)); } else if (OB_FAIL(ObDMLService::insert_row(*upd_ctdef.dins_ctdef_, *upd_rtdef.dins_rtdef_, tablet_loc, diff --git a/src/observer/table/ob_table_modify_executor.h b/src/observer/table/ob_table_modify_executor.h index 8ed83aa14..8733ef77b 100644 --- a/src/observer/table/ob_table_modify_executor.h +++ b/src/observer/table/ob_table_modify_executor.h @@ -128,6 +128,7 @@ protected: int stored_row_to_exprs(const ObChunkDatumStore::StoredRow &row, const common::ObIArray &exprs, ObEvalCtx &ctx); + int check_row_null(const ObExprPtrIArray &row, const ColContentIArray &column_infos); protected: sql::ObDMLRtCtx dml_rtctx_; int64_t affected_rows_; diff --git a/src/observer/table/ob_table_move_response.cpp b/src/observer/table/ob_table_move_response.cpp index b114a5faa..9febae0c0 100644 --- a/src/observer/table/ob_table_move_response.cpp +++ b/src/observer/table/ob_table_move_response.cpp @@ -23,7 +23,7 @@ using namespace oceanbase::table; //////////////////////////////////////////////////////////////// int ObTableMoveResponseSender::get_replica(const uint64_t table_id, - const common::ObTabletID &tablet_id, + const ObTabletID &tablet_id, table::ObTableMoveReplicaInfo &replica) { int ret = OB_SUCCESS; @@ -50,28 +50,33 @@ int ObTableMoveResponseSender::get_replica(const uint64_t table_id, } int ObTableMoveResponseSender::init(const uint64_t table_id, - const common::ObTabletID &tablet_id, + const ObTabletID &tablet_id, share::schema::ObMultiVersionSchemaService &schema_service) { int ret = OB_SUCCESS; - ObTableMoveReplicaInfo &replica = result_.get_replica_info(); - if (OB_FAIL(get_replica(table_id, tablet_id, replica))) { - LOG_WARN("fail to get partition info", K(ret), K(table_id), K(tablet_id)); + share::schema::ObSchemaGetterGuard schema_guard; + const share::schema::ObTableSchema *table_schema = nullptr; + + if (OB_FAIL(schema_service.get_tenant_schema_guard(MTL_ID(), schema_guard))) { + LOG_WARN("fail to get schema guard", K(ret)); + } else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) { + LOG_WARN("fail to get table schema", K(table_id), K(ret)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("NULL ptr", K(ret), K(table_id)); } else { - share::schema::ObSchemaGetterGuard schema_guard; - const share::schema::ObTableSchema *table_schema = nullptr; - if (OB_FAIL(schema_service.get_tenant_schema_guard(MTL_ID(), schema_guard))) { - LOG_WARN("fail to get schema guard", K(ret)); - } else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) { - LOG_WARN("fail to get table schema", K(table_id), K(ret)); - } else if (OB_ISNULL(table_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("NULL ptr", K(ret), K(table_id)); + ObTabletID tmp_tablet_id = tablet_id; + if (!table_schema->is_partitioned_table()) { + tmp_tablet_id = table_schema->get_tablet_id(); + } + + if (OB_FAIL(get_replica(table_id, tmp_tablet_id, replica))) { + LOG_WARN("fail to get partition info", K(ret), K(table_id), K(tmp_tablet_id)); } else { replica.set_table_id(table_id); replica.set_schema_version(table_schema->get_schema_version()); - replica.set_tablet_id(tablet_id); + replica.set_tablet_id(tmp_tablet_id); // set move pcode response_sender_.set_pcode(obrpc::OB_TABLE_API_MOVE); diff --git a/src/observer/table/ob_table_op_wrapper.cpp b/src/observer/table/ob_table_op_wrapper.cpp index ef1760c23..aa7c3c7b1 100644 --- a/src/observer/table/ob_table_op_wrapper.cpp +++ b/src/observer/table/ob_table_op_wrapper.cpp @@ -62,7 +62,7 @@ int ObTableOpWrapper::process_op_with_spec(ObTableCtx &tb_ctx, ret = COVER_SUCC(tmp_ret); } - op_result.set_errno(ret); + op_result.set_err(ret); op_result.set_type(tb_ctx.get_opertion_type()); spec->destroy_executor(executor); return ret; @@ -201,6 +201,78 @@ int ObTableOpWrapper::process_get_with_spec(ObTableCtx &tb_ctx, return ret; } +int ObTableOpWrapper::get_insert_spec(ObTableCtx &tb_ctx, + ObTableApiCacheGuard &cache_guard, + ObTableApiSpec *&spec) +{ + int ret = OB_SUCCESS; + + if (!tb_ctx.is_ttl_table()) { + if (OB_FAIL(ObTableOpWrapper::get_or_create_spec(tb_ctx, cache_guard, spec))) { + LOG_WARN("fail to get or create insert spec", K(ret)); + } + } else { + if (OB_FAIL(ObTableOpWrapper::get_or_create_spec(tb_ctx, cache_guard, spec))) { + LOG_WARN("fail to get or create ttl spec", K(ret)); + } + } + + return ret; +} + +int ObTableOpWrapper::get_insert_up_spec(ObTableCtx &tb_ctx, + ObTableApiCacheGuard &cache_guard, + ObTableApiSpec *&spec) +{ + int ret = OB_SUCCESS; + + if (!tb_ctx.is_ttl_table()) { + if (OB_FAIL(ObTableOpWrapper::get_or_create_spec(tb_ctx, cache_guard, spec))) { + LOG_WARN("fail to get or create insert up spec", K(ret)); + } + } else { + if (OB_FAIL(ObTableOpWrapper::get_or_create_spec(tb_ctx, cache_guard, spec))) { + LOG_WARN("fail to get or create ttl spec", K(ret)); + } + } + + return ret; +} + +int ObTableOpWrapper::process_insert_op(ObTableCtx &tb_ctx, ObTableOperationResult &op_result) +{ + int ret = OB_SUCCESS; + + if (!tb_ctx.is_ttl_table()) { + if (OB_FAIL(ObTableOpWrapper::process_op(tb_ctx, op_result))) { + LOG_WARN("fail to process insert operation", K(ret)); + } + } else { + if (OB_FAIL(ObTableOpWrapper::process_op(tb_ctx, op_result))) { + LOG_WARN("fail to process ttl insert operation", K(ret)); + } + } + + return ret; +} + +int ObTableOpWrapper::process_insert_up_op(ObTableCtx &tb_ctx, ObTableOperationResult &op_result) +{ + int ret = OB_SUCCESS; + + if (!tb_ctx.is_ttl_table()) { + if (OB_FAIL(ObTableOpWrapper::process_op(tb_ctx, op_result))) { + LOG_WARN("fail to process insert up operation", K(ret)); + } + } else { + if (OB_FAIL(ObTableOpWrapper::process_op(tb_ctx, op_result))) { + LOG_WARN("fail to process ttl insert up operation", K(ret)); + } + } + + return ret; +} + int ObTableApiUtil::construct_entity_from_row(ObIAllocator &allocator, ObNewRow *row, const ObTableSchema *table_schema, @@ -223,7 +295,9 @@ int ObTableApiUtil::construct_entity_from_row(ObIAllocator &allocator, for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) { const ObString &name = arr_col->at(i); if (OB_ISNULL(column_schema = table_schema->get_column_schema(name))) { - ret = OB_ERR_COLUMN_NOT_FOUND; + ret = OB_ERR_BAD_FIELD_ERROR; + const ObString &table = table_schema->get_table_name_str(); + LOG_USER_ERROR(OB_ERR_BAD_FIELD_ERROR, name.length(), name.ptr(), table.length(), table.ptr()); LOG_WARN("column not exist", K(ret), K(name)); } else { int64_t column_idx = table_schema->get_column_idx(column_schema->get_column_id()); diff --git a/src/observer/table/ob_table_op_wrapper.h b/src/observer/table/ob_table_op_wrapper.h index 9d5858552..aca501fa5 100644 --- a/src/observer/table/ob_table_op_wrapper.h +++ b/src/observer/table/ob_table_op_wrapper.h @@ -68,6 +68,10 @@ public: // get特有的逻辑,单独处理 static int process_get(ObTableCtx &tb_ctx, ObNewRow *&row); static int process_get_with_spec(ObTableCtx &tb_ctx, ObTableApiSpec *spec, ObNewRow *&row); + static int get_insert_spec(ObTableCtx &tb_ctx, ObTableApiCacheGuard &cache_guard, ObTableApiSpec *&spec); + static int get_insert_up_spec(ObTableCtx &tb_ctx, ObTableApiCacheGuard &cache_guard, ObTableApiSpec *&spec); + static int process_insert_op(ObTableCtx &tb_ctx, ObTableOperationResult &op_result); + static int process_insert_up_op(ObTableCtx &tb_ctx, ObTableOperationResult &op_result); private: static int process_affected_entity(ObTableCtx &tb_ctx, const ObTableApiSpec &spec, @@ -91,7 +95,9 @@ public: if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret || OB_BAD_NULL_ERROR == ret || OB_OBJ_TYPE_ERROR == ret + || OB_KV_COLUMN_TYPE_NOT_MATCH == ret || OB_ERR_COLLATION_MISMATCH == ret + || OB_KV_COLLATION_MISMATCH == ret || OB_ERR_DATA_TOO_LONG == ret || OB_DATA_OUT_OF_RANGE == ret) { ret = OB_SUCCESS; diff --git a/src/observer/table/ob_table_query_and_mutate_processor.cpp b/src/observer/table/ob_table_query_and_mutate_processor.cpp index ec731a9dc..94fa0e8f0 100644 --- a/src/observer/table/ob_table_query_and_mutate_processor.cpp +++ b/src/observer/table/ob_table_query_and_mutate_processor.cpp @@ -75,6 +75,7 @@ int ObTableQueryAndMutateP::check_arg() LOG_WARN("invalid table query request", K(ret), K(query)); } else if ((ObTableEntityType::ET_HKV == arg_.entity_type_) && !hfilter.is_valid()) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "QueryAndMutate hbase model not set hfilter"); LOG_WARN("QueryAndMutate hbase model should set hfilter", K(ret)); } else if ((ObTableEntityType::ET_KV == arg_.entity_type_) && (1 != mutations.count())) { ret = OB_ERR_UNEXPECTED; @@ -320,7 +321,7 @@ int ObTableQueryAndMutateP::execute_htable_put(const ObITableEntity &new_entity) LOG_WARN("fail to init table ctx", K(ret)); } else if (OB_FAIL(tb_ctx.init_trans(get_trans_desc(), get_tx_snapshot()))) { LOG_WARN("fail to init trans", K(ret), K(tb_ctx)); - } else if (OB_FAIL(ObTableOpWrapper::process_op(tb_ctx, op_result))) { + } else if (OB_FAIL(ObTableOpWrapper::process_insert_up_op(tb_ctx, op_result))) { LOG_WARN("fail to process insert up op", K(ret)); } } @@ -644,7 +645,7 @@ int ObTableQueryAndMutateP::execute_htable_insert(const ObITableEntity &new_enti LOG_WARN("fail to init table ctx", K(ret)); } else if (OB_FAIL(tb_ctx.init_trans(get_trans_desc(), get_tx_snapshot()))) { LOG_WARN("fail to init trans", K(ret), K(tb_ctx)); - } else if (OB_FAIL(ObTableOpWrapper::process_op(tb_ctx, op_result))) { + } else if (OB_FAIL(ObTableOpWrapper::process_insert_op(tb_ctx, op_result))) { LOG_WARN("fail to process insert op", K(ret)); } } @@ -705,6 +706,7 @@ int ObTableQueryAndMutateP::execute_htable_mutation(ObTableQueryResultIterator * } default: { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "mutation type"); LOG_WARN("not supported mutation type", K(ret), "type", mutation.type()); break; } @@ -847,42 +849,24 @@ int ObTableQueryAndMutateP::execute_one_mutation(ObTableQueryResult &one_result, } break; } - case ObTableOperationType::INCREMENT: { - if (tb_ctx_.is_ttl_table()) { - ret = process_dml_op(*new_entity, tmp_affect_rows); - } else { - ret = process_dml_op(*new_entity, tmp_affect_rows); - } - if (OB_FAIL(ret)) { - LOG_WARN("fail to do increment", K(ret), K(tb_ctx_.is_ttl_table())); - } else { - affected_rows += tmp_affect_rows; - } - break; - } + case ObTableOperationType::INCREMENT: case ObTableOperationType::APPEND: { - if (tb_ctx_.is_ttl_table()) { - ret = process_dml_op(*new_entity, tmp_affect_rows); - } else { - ret = process_dml_op(*new_entity, tmp_affect_rows); - } - if (OB_FAIL(ret)) { - LOG_WARN("fail to do append", K(ret), K(tb_ctx_.is_ttl_table())); - } else { + ret = process_insert_up(*new_entity, tmp_affect_rows); + if (OB_SUCC(ret)) { affected_rows += tmp_affect_rows; } break; } case ObTableOperationType::INSERT: { // 使用mutation上的entity执行insert - if (OB_FAIL(process_dml_op(mutate_entity, tmp_affect_rows))) { - LOG_WARN("ail to execute table insert", K(ret)); - } else { + ret = process_insert(mutate_entity, tmp_affect_rows); + if (OB_SUCC(ret)) { affected_rows += tmp_affect_rows; } break; } default: { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "mutation type"); LOG_WARN("not supported mutation type", K(ret), "type", mutation.type()); break; } diff --git a/src/observer/table/ob_table_query_and_mutate_processor.h b/src/observer/table/ob_table_query_and_mutate_processor.h index f1783780e..9077b52a7 100644 --- a/src/observer/table/ob_table_query_and_mutate_processor.h +++ b/src/observer/table/ob_table_query_and_mutate_processor.h @@ -108,6 +108,26 @@ private: return ret; } + int process_insert(const table::ObITableEntity &new_entity, int64_t &affected_rows) + { + int ret = OB_SUCCESS; + if (!tb_ctx_.is_ttl_table()) { + ret = process_dml_op(new_entity, affected_rows); + } else { + ret = process_dml_op(new_entity, affected_rows); + } + return ret; + } + int process_insert_up(const table::ObITableEntity &new_entity, int64_t &affected_rows) + { + int ret = OB_SUCCESS; + if (!tb_ctx_.is_ttl_table()) { + ret = process_dml_op(new_entity, affected_rows); + } else { + ret = process_dml_op(new_entity, affected_rows); + } + return ret; + } private: common::ObArenaAllocator allocator_; table::ObTableCtx tb_ctx_; diff --git a/src/observer/table/ob_table_query_common.cpp b/src/observer/table/ob_table_query_common.cpp index 11710fddf..0ab78cb0e 100644 --- a/src/observer/table/ob_table_query_common.cpp +++ b/src/observer/table/ob_table_query_common.cpp @@ -50,6 +50,7 @@ int ObTableQueryUtils::check_htable_query_args(const ObTableQuery &query, LOG_WARN("htable scan should not set Offset and Limit", K(ret), K(query)); } else if (ObQueryFlag::Forward != query.get_scan_order() && ObQueryFlag::Reverse != query.get_scan_order()) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "scan order"); LOG_WARN("TableQuery with htable_filter only support forward and reverse scan yet", K(ret)); } } diff --git a/src/observer/table/ob_table_query_processor.cpp b/src/observer/table/ob_table_query_processor.cpp index 1c76ba53c..295420fba 100644 --- a/src/observer/table/ob_table_query_processor.cpp +++ b/src/observer/table/ob_table_query_processor.cpp @@ -55,6 +55,7 @@ int ObTableQueryP::check_arg() } else if (!(arg_.consistency_level_ == ObTableConsistencyLevel::STRONG || arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL)) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "consistency level"); LOG_WARN("some options not supported yet", K(ret), "consistency_level", arg_.consistency_level_); } diff --git a/src/observer/table/ob_table_query_sync_processor.cpp b/src/observer/table/ob_table_query_sync_processor.cpp index 4f2f980d3..d6f8507b7 100644 --- a/src/observer/table/ob_table_query_sync_processor.cpp +++ b/src/observer/table/ob_table_query_sync_processor.cpp @@ -296,6 +296,7 @@ int ObTableQuerySyncP::check_arg() } else if (!(arg_.consistency_level_ == ObTableConsistencyLevel::STRONG || arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL)) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "consistency level"); LOG_WARN("some options not supported yet", K(ret), "consistency_level", arg_.consistency_level_); } return ret; diff --git a/src/observer/table/ob_table_rpc_processor.cpp b/src/observer/table/ob_table_rpc_processor.cpp index f88904a39..e4c0dd863 100644 --- a/src/observer/table/ob_table_rpc_processor.cpp +++ b/src/observer/table/ob_table_rpc_processor.cpp @@ -132,6 +132,10 @@ int ObTableLoginP::get_ids() if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) { LOG_WARN("get_schema_guard failed", K(ret)); } else if (OB_FAIL(guard.get_tenant_id(arg_.tenant_name_, result_.tenant_id_))) { + if (ret == OB_ERR_INVALID_TENANT_NAME) { + ret = OB_TENANT_NOT_EXIST; + LOG_USER_ERROR(OB_TENANT_NOT_EXIST, arg_.tenant_name_.length(), arg_.tenant_name_.ptr()); + } LOG_WARN("get_tenant_id failed", K(ret), "tenant", arg_.tenant_name_); } else if (OB_INVALID_ID == result_.tenant_id_) { ret = OB_ERR_INVALID_TENANT_NAME; @@ -142,14 +146,19 @@ int ObTableLoginP::get_ids() result_.database_id_))) { LOG_WARN("failed to get database id", K(ret), "database", arg_.database_name_); } else if (OB_INVALID_ID == result_.database_id_) { - ret = OB_WRONG_DB_NAME; + ret = OB_ERR_BAD_DATABASE; + LOG_USER_ERROR(OB_ERR_BAD_DATABASE, arg_.database_name_.length(), arg_.database_name_.ptr()); LOG_WARN("failed to get database id", K(ret), "database", arg_.database_name_); } else if (OB_FAIL(guard.get_user_id(result_.tenant_id_, arg_.user_name_, ObString::make_string("%")/*assume there is no specific host*/, result_.user_id_))) { + if (ret == OB_ERR_USER_NOT_EXIST) { + LOG_USER_ERROR(OB_ERR_USER_NOT_EXIST); + } LOG_WARN("failed to get user id", K(ret), "user", arg_.user_name_); } else if (OB_INVALID_ID == result_.user_id_) { ret = OB_ERR_USER_NOT_EXIST; + LOG_USER_ERROR(OB_ERR_USER_NOT_EXIST); LOG_WARN("failed to get user id", K(ret), "user", arg_.user_name_); } } @@ -183,6 +192,9 @@ int ObTableLoginP::verify_password(const ObString &tenant, const ObString &user, } else if (gctx_.schema_service_->get_tenant_schema_guard(tenant_id, guard)) { LOG_WARN("fail to get tenant guard", KR(ret), K(tenant_id)); } else if (OB_FAIL(guard.check_user_access(login_info, session_priv, ssl_st, user_info))) { + if (ret == OB_PASSWORD_WRONG) { + LOG_USER_ERROR(OB_PASSWORD_WRONG, user.length(), user.ptr(), tenant.length(), tenant.ptr(), "YES"/*using password*/); + } LOG_WARN("User access denied", K(login_info), K(ret)); } else if (OB_ISNULL(user_info)) { ret = OB_ERR_UNEXPECTED; @@ -273,7 +285,12 @@ int ObTableApiProcessorBase::check_user_access(const ObString &credential_str) } else if (OB_FAIL(guard.get_credential(sess_credetial))) { LOG_WARN("fail to get credential", K(ret)); } else if (sess_credetial->hash_val_ != credential_.hash_val_) { - ret = OB_ERR_NO_PRIVILEGE; + ret = OB_KV_CREDENTIAL_NOT_MATCH; + char user_cred_info[128]; + char sess_cred_info[128]; + int user_len = credential_.to_string(user_cred_info, 128); + int sess_len = sess_credetial->to_string(sess_cred_info, 128); + LOG_USER_ERROR(OB_KV_CREDENTIAL_NOT_MATCH, user_len, user_cred_info, sess_len, sess_cred_info); LOG_WARN("invalid credential", K(ret), K_(credential), K(*sess_credetial)); } else if (sess_credetial->cluster_id_ != credential_.cluster_id_) { ret = OB_ERR_NO_PRIVILEGE; @@ -446,6 +463,7 @@ int ObTableApiProcessorBase::start_trans_(bool is_readonly, bool strong_read = ObTableConsistencyLevel::STRONG == consistency_level; if ((!is_readonly) && (ObTableConsistencyLevel::EVENTUAL == consistency_level)) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "consistency level"); LOG_WARN("some options not supported yet", K(ret), K(is_readonly), K(consistency_level)); } transaction::ObTransService *txs = MTL(transaction::ObTransService*); diff --git a/src/observer/table/ob_table_session_pool.cpp b/src/observer/table/ob_table_session_pool.cpp index 916fabe33..d24493e9d 100644 --- a/src/observer/table/ob_table_session_pool.cpp +++ b/src/observer/table/ob_table_session_pool.cpp @@ -97,8 +97,9 @@ void ObTableApiSessPoolMgr::destroy() pool_->destroy(); pool_ = nullptr; } - allocator_.reset(); + allocator_.reset(); // when mtl_destroy, all worker thread has beed existed, no need to lock allocator is_inited_ = false; + LOG_INFO("ObTableApiSessPoolMgr destroy successfully"); } } @@ -128,6 +129,7 @@ int ObTableApiSessPoolMgr::get_sess_info(ObTableApiCredential &credential, ObTab if (credential.tenant_id_ != MTL_ID()) { ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "access wrong tenant"); LOG_WARN("access wrong tenant", K(ret), K(credential.tenant_id_), K(MTL_ID())); } else if (OB_UNLIKELY(OB_ISNULL(pool_)) && OB_FAIL(create_session_pool_safe())) { LOG_WARN("fail to create session pool", K(ret), K(credential)); @@ -315,8 +317,9 @@ void ObTableApiSessPool::destroy() retired_nodes_.clear(); key_node_map_.destroy(); - allocator_.reset(); + allocator_.reset(); // when mtl_destroy, all worker thread has beed existed, no need to lock allocator is_inited_ = false; + LOG_INFO("ObTableApiSessPool destroy successfully", K(MTL_ID())); } /* @@ -357,7 +360,7 @@ int ObTableApiSessPool::move_node_to_retired_list(ObTableApiSessNode *node) { int ret = OB_SUCCESS; - ObLockGuard guard(lock_); + ObLockGuard guard(retired_nodes_lock_); // lock retired_nodes_ if (OB_ISNULL(node)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("session node is null", K(ret)); @@ -379,17 +382,22 @@ int ObTableApiSessPool::move_node_to_retired_list(ObTableApiSessNode *node) int ObTableApiSessPool::evict_retired_sess() { int ret = OB_SUCCESS; - int64 delete_count = 0; + int64_t delete_count = 0; + int64_t cur_time = ObTimeUtility::current_time(); + ObLockGuard guard(retired_nodes_lock_); // lock retired_nodes_ DLIST_FOREACH_REMOVESAFE_X(node, retired_nodes_, delete_count < BACKCROUND_TASK_DELETE_SESS_NUM) { - if (OB_FAIL(node->remove_unused_sess())) { + if (cur_time - node->get_last_active_ts() < SESS_RETIRE_TIME) { + // do nothing, this node maybe is from ObTableApiSessNodeReplaceOp, some threads maybe is using it. + // we remove it next retire task. + } else if (OB_FAIL(node->remove_unused_sess())) { LOG_WARN("fail to remove unused sess", K(ret), K(*node)); } else { - ObLockGuard guard(lock_); if (node->is_empty()) { ObTableApiSessNode *rm_node = retired_nodes_.remove(node); if (OB_NOT_NULL(rm_node)) { rm_node->~ObTableApiSessNode(); + ObLockGuard alloc_guard(allocator_lock_); // lock allocator_ allocator_.free(rm_node); rm_node = nullptr; delete_count++; @@ -398,6 +406,10 @@ int ObTableApiSessPool::evict_retired_sess() } } + if (delete_count != 0) { + LOG_INFO("evict retired session node", K(delete_count), K(retired_nodes_.get_size())); + } + return ret; } @@ -491,7 +503,7 @@ int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableA int ObTableApiSessPool::create_node_safe(ObTableApiCredential &credential, ObTableApiSessNode *&node) { int ret = OB_SUCCESS; - ObLockGuard guard(lock_); + ObLockGuard alloc_guard(allocator_lock_); // lock allocator_ ObTableApiSessNode *tmp_node = nullptr; void *buf = nullptr; @@ -521,7 +533,7 @@ int ObTableApiSessPool::create_and_add_node_safe(ObTableApiCredential &credentia ret = OB_SUCCESS; // replace error code } // this node has been set by other thread, free it - ObLockGuard guard(lock_); + ObLockGuard alloc_guard(allocator_lock_); // lock allocator_ node->~ObTableApiSessNode(); allocator_.free(node); node = nullptr; @@ -573,6 +585,7 @@ void ObTableApiSessNodeVal::destroy() sess_info_.~ObSQLSessionInfo(); is_inited_ = false; owner_node_ = nullptr; + tenant_id_ = OB_INVALID; } int ObTableApiSessNodeVal::init_sess_info() @@ -582,18 +595,18 @@ int ObTableApiSessNodeVal::init_sess_info() if (!is_inited_) { share::schema::ObSchemaGetterGuard schema_guard; const ObTenantSchema *tenant_schema = nullptr; - if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) { - LOG_WARN("fail to get schema guard", K(ret), K(MTL_ID())); - } else if (OB_FAIL(schema_guard.get_tenant_info(MTL_ID(), tenant_schema))) { - LOG_WARN("fail to get tenant schema", K(ret), K(MTL_ID())); + if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id_, schema_guard))) { + LOG_WARN("fail to get schema guard", K(ret), K_(tenant_id)); + } else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id_, tenant_schema))) { + LOG_WARN("fail to get tenant schema", K(ret), K_(tenant_id)); } else if (OB_ISNULL(tenant_schema)) { ret = OB_SCHEMA_ERROR; LOG_WARN("tenant schema is null", K(ret)); - } else if (OB_FAIL(ObTableApiSessUtil::init_sess_info(MTL_ID(), + } else if (OB_FAIL(ObTableApiSessUtil::init_sess_info(tenant_id_, tenant_schema->get_tenant_name_str(), schema_guard, sess_info_))) { - LOG_WARN("fail to init sess info", K(ret), K(MTL_ID())); + LOG_WARN("fail to init sess info", K(ret), K_(tenant_id)); } else { is_inited_ = true; } @@ -640,6 +653,7 @@ void ObTableApiSessNode::destroy() rm_sess->destroy(); } } + ObLockGuard alloc_guard(allocator_lock_); // lock allocator_ allocator_.reset(); } @@ -656,6 +670,7 @@ int ObTableApiSessNode::remove_unused_sess() ObTableApiSessNodeVal *rm_sess = free_list.remove(sess); if (OB_NOT_NULL(rm_sess)) { rm_sess->~ObTableApiSessNodeVal(); + ObLockGuard alloc_guard(allocator_lock_); // lock allocator_ allocator_.free(rm_sess); rm_sess = nullptr; } @@ -701,14 +716,14 @@ int ObTableApiSessNode::get_sess_node_val(ObTableApiSessNodeVal *&val) int ObTableApiSessNode::extend_and_get_sess_val(ObTableApiSessGuard &guard) { int ret = OB_SUCCESS; - - ObLockGuard alloc_guard(lock_); // avoid concurrent allocator_.alloc + ObLockGuard alloc_guard(allocator_lock_); // lock allocator_ void *buf = nullptr; + if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObTableApiSessNodeVal)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc mem for ObTableApiSessNodeVal", K(ret), K(sizeof(ObTableApiSessNodeVal))); } else { - ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(this); + ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(this, credential_.tenant_id_); if (OB_FAIL(val->init_sess_info())) { LOG_WARN("fail to init sess info", K(ret), K(*val)); } else { diff --git a/src/observer/table/ob_table_session_pool.h b/src/observer/table/ob_table_session_pool.h index 9c9ea19b8..4aaf32d37 100644 --- a/src/observer/table/ob_table_session_pool.h +++ b/src/observer/table/ob_table_session_pool.h @@ -75,7 +75,7 @@ private: common::ObArenaAllocator allocator_; ObTableApiSessPool *pool_; ObTableApiSessEliminationTask elimination_task_; - ObSpinLock lock_; // for get_or_create_sess_pool + ObSpinLock lock_; // for double check pool creating private: DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPoolMgr); }; @@ -106,18 +106,20 @@ public: int evict_retired_sess(); int create_node_safe(ObTableApiCredential &credential, ObTableApiSessNode *&node); int move_node_to_retired_list(ObTableApiSessNode *node); + common::ObIAllocator& get_allocator() { return allocator_; }; private: int replace_sess_node_safe(ObTableApiCredential &credential); int create_and_add_node_safe(ObTableApiCredential &credential); int get_sess_node(uint64_t key, ObTableApiSessNode *&node); private: common::ObArenaAllocator allocator_; + ObSpinLock allocator_lock_; // for lock allocator_ bool is_inited_; CacheKeyNodeMap key_node_map_; // 已经淘汰的node,等待被后台删除 // 前台login时、后台淘汰时都会操作retired_nodes_,因此需要加锁 common::ObDList retired_nodes_; - ObSpinLock lock_;; // for lock retired_nodes_/allocator_ + ObSpinLock retired_nodes_lock_; // for lock retired_nodes_ private: DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPool); }; @@ -127,8 +129,10 @@ class ObTableApiSessNodeVal : public common::ObDLinkBase friend class ObTableApiSessNode; friend class ObTableApiSessGuard; public: - explicit ObTableApiSessNodeVal(ObTableApiSessNode *owner) + explicit ObTableApiSessNodeVal(ObTableApiSessNode *owner, uint64_t tenant_id) : is_inited_(false), + tenant_id_(tenant_id), + sess_info_(tenant_id_), // sess_info_ use 500 tenant default, so we must set tenant_id owner_node_(owner) {} TO_STRING_KV(K_(is_inited), @@ -144,6 +148,7 @@ public: void give_back_to_free_list(); private: bool is_inited_; + uint64_t tenant_id_; sql::ObSQLSessionInfo sess_info_; ObTableApiSessNode *owner_node_; private: @@ -191,10 +196,10 @@ private: int extend_and_get_sess_val(ObTableApiSessGuard &guard); private: common::ObArenaAllocator allocator_; + ObSpinLock allocator_lock_; // for lock allocator_ SessList sess_lists_; int64_t last_active_ts_; ObTableApiCredential credential_; - ObSpinLock lock_;; // for lock allocator_ private: DISALLOW_COPY_AND_ASSIGN(ObTableApiSessNode); }; diff --git a/src/share/object/ob_obj_cast.cpp b/src/share/object/ob_obj_cast.cpp index 939e57062..fa655c39c 100644 --- a/src/share/object/ob_obj_cast.cpp +++ b/src/share/object/ob_obj_cast.cpp @@ -13271,11 +13271,16 @@ int datetime_scale_check_only(const ObAccuracy &accuracy, const ObObj &obj) if (OB_UNLIKELY(scale > MAX_SCALE_FOR_TEMPORAL)) { ret = OB_ERR_TOO_BIG_PRECISION; LOG_USER_ERROR(OB_ERR_TOO_BIG_PRECISION, scale, "CAST", static_cast(MAX_SCALE_FOR_TEMPORAL)); - } else if (OB_UNLIKELY(0 <= scale && scale < MAX_SCALE_FOR_TEMPORAL)) { + } else { int64_t value = obj.get_datetime(); - if (!ObTimeConverter::is_valid_datetime(value)) { - ret = OB_INVALID_DATA; - LOG_WARN("invalid datetime value", K(ret), K(value)); + ObCastMode cast_mode = CM_ERROR_ON_SCALE_OVER; + if (OB_FAIL(time_usec_scale_check(cast_mode, accuracy, value))) { + LOG_WARN("check usec scale fail.", K(ret), K(value), K(accuracy)); + } else if (OB_UNLIKELY(0 <= scale && scale < MAX_SCALE_FOR_TEMPORAL)) { + if (!ObTimeConverter::is_valid_datetime(value)) { + ret = OB_INVALID_DATA; + LOG_WARN("invalid datetime value", K(ret), K(value)); + } } } return ret; diff --git a/src/share/table/ob_table.cpp b/src/share/table/ob_table.cpp index ee046e9b4..b2f7783e5 100644 --- a/src/share/table/ob_table.cpp +++ b/src/share/table/ob_table.cpp @@ -14,7 +14,6 @@ #include "ob_table.h" #include "share/ob_errno.h" #include "lib/utility/ob_unify_serialize.h" -#include "common/row/ob_row.h" #include "rpc/obrpc/ob_rpc_packet.h" #include "sql/engine/expr/ob_expr_lob_utils.h" @@ -1471,44 +1470,65 @@ int ObTableQueryResult::alloc_buf_if_need(const int64_t need_size) int ObTableQueryResult::add_row(const ObNewRow &row) { int ret = OB_SUCCESS; - ret = alloc_buf_if_need(row.get_serialize_size()); + + // 1. check properties count and row count const int64_t N = row.get_count(); - if (OB_SUCC(ret)) { - if (0 != properties_names_.count() - && N != properties_names_.count()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("cell count not match with property count", K(ret), K(N), - "properties_count", properties_names_.count()); - } - } - for (int i = 0; OB_SUCCESS == ret && i < N; ++i) - { - // Output of TableApi does not have lob locator header, remove lob header before serialize. - // Functions defined by DEF_TEXT_SERIALIZE_FUNCS is called here, refer to ob_obj_funcs.h - ObObjType type = row.get_cell(i).get_type(); - if (is_lob_storage(type)) { - ObObj tmp_obj = row.get_cell(i); - ObString read_data; - if (tmp_obj.has_lob_header()) { - if (OB_FAIL(sql::ObTextStringHelper::read_real_string_data(&allocator_, tmp_obj, read_data))) { - LOG_WARN("failed to get obj", K(ret), K_(buf)); - } else { - tmp_obj.set_lob_value(type, read_data.ptr(), read_data.length()); + if (0 != properties_names_.count() + && N != properties_names_.count()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("cell count not match with property count", K(ret), K(N), + "properties_count", properties_names_.count()); + } else { + // 2. construct new row + ObNewRow new_row = row; + ObObj *lob_cells = nullptr; + const int64_t lob_storage_count = get_lob_storage_count(new_row); + if (lob_storage_count != 0) { + if (OB_ISNULL(lob_cells = static_cast(allocator_.alloc(sizeof(ObObj) * lob_storage_count)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc cells buffer", K(ret), K(lob_storage_count)); + } + for (int i = 0, lob_cell_idx = 0; OB_SUCC(ret) && i < N; ++i) { + const ObObj &cell = new_row.get_cell(i); + ObObjType type = cell.get_type(); + if (is_lob_storage(type)) { + ObString real_data; + if (cell.has_lob_header()) { + if (OB_FAIL(sql::ObTextStringHelper::read_real_string_data(&allocator_, cell, real_data))) { + LOG_WARN("fail to read real string date", K(ret), K(cell)); + } else if (lob_cell_idx >= lob_storage_count) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected index count", K(ret), K(lob_cell_idx), K(lob_storage_count)); + } else { + lob_cells[lob_cell_idx].set_lob_value(type, real_data.ptr(), real_data.length()); + lob_cells[lob_cell_idx].set_collation_type(cell.get_collation_type()); + new_row.get_cell(i) = lob_cells[lob_cell_idx]; // switch lob cell + lob_cell_idx++; + } + } } } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(tmp_obj.serialize(buf_.get_data(), buf_.get_capacity(), buf_.get_position()))) { - LOG_WARN("failed to serialize obj", K(ret), K_(buf)); - } - } else { + } + + // 3. alloc serialize size + int64_t size = new_row.get_serialize_size(); + if (OB_FAIL(ret)) { + } else if (OB_FAIL(alloc_buf_if_need(size))) { + LOG_WARN("failed to alloc buff", K(ret), K(size)); + } + + // 4. serialize + for (int i = 0; OB_SUCC(ret) && i < N; ++i) { if (OB_FAIL(row.get_cell(i).serialize(buf_.get_data(), buf_.get_capacity(), buf_.get_position()))) { LOG_WARN("failed to serialize obj", K(ret), K_(buf)); } } - } // end for + } + if (OB_SUCC(ret)) { ++row_count_; } + return ret; } diff --git a/src/share/table/ob_table.h b/src/share/table/ob_table.h index eddb4f4e4..27091ee73 100644 --- a/src/share/table/ob_table.h +++ b/src/share/table/ob_table.h @@ -29,6 +29,8 @@ #include "share/table/ob_table_ttl_common.h" #include "common/rowkey/ob_rowkey.h" #include "common/ob_role.h" +#include "common/row/ob_row.h" +#include "lib/oblog/ob_warning_buffer.h" namespace oceanbase { namespace common @@ -380,6 +382,16 @@ public: msg_[0] = '\0'; } ~ObTableResult() = default; + void set_err(int err) + { + errno_ = err; + if (err != common::OB_SUCCESS) { + common::ObWarningBuffer *wb = common::ob_get_tsi_warning_buffer(); + if (OB_NOT_NULL(wb)) { + (void)snprintf(msg_, common::OB_MAX_ERROR_MSG_LEN, "%s", wb->get_err_msg()); + } + } + } void set_errno(int err) { errno_ = err; } int get_errno() const { return errno_; } int assign(const ObTableResult &other); @@ -879,6 +891,16 @@ public: private: static const int64_t DEFAULT_BUF_BLOCK_SIZE = common::OB_MALLOC_BIG_BLOCK_SIZE - (1024*1024LL); int alloc_buf_if_need(const int64_t size); + OB_INLINE int64_t get_lob_storage_count(const common::ObNewRow &row) const + { + int64_t count = 0; + for (int64_t i = 0; i < row.get_count(); ++i) { + if (is_lob_storage(row.get_cell(i).get_type())) { + count++; + } + } + return count; + } private: common::ObSEArray properties_names_; // serialize int64_t row_count_; // serialize diff --git a/unittest/observer/table/test_create_executor.cpp b/unittest/observer/table/test_create_executor.cpp index 49867ed04..1e294f215 100644 --- a/unittest/observer/table/test_create_executor.cpp +++ b/unittest/observer/table/test_create_executor.cpp @@ -164,7 +164,7 @@ void TestCreateExecutor::TearDown() { } -ObTableApiSessNodeVal g_sess_node_val(NULL); +ObTableApiSessNodeVal g_sess_node_val(NULL, 500); void TestCreateExecutor::fake_ctx_init_common(ObTableCtx &fake_ctx, ObTableSchema *table_schema) { fake_ctx.table_schema_ = table_schema; @@ -459,21 +459,22 @@ TEST_F(TestCreateExecutor, cons_column_type) col_schema.set_accuracy(acc); uint32_t res_flag = ObRawExprUtils::calc_column_result_flag(col_schema); - ObExprResType column_type; + ObTableColumnInfo column_info; ObTableCtx fake_ctx(allocator_); schema_service_.get_schema_guard(fake_ctx.schema_guard_, 1); fake_ctx_init_common(fake_ctx, &table_schema_); - ASSERT_EQ(OB_SUCCESS, fake_ctx.cons_column_type(col_schema, column_type)); - ASSERT_EQ(ObVarcharType, column_type.get_type()); - ASSERT_EQ(res_flag, column_type.get_result_flag()); - ASSERT_EQ(CS_TYPE_UTF8MB4_GENERAL_CI, column_type.get_collation_type()); - ASSERT_EQ(CS_LEVEL_IMPLICIT, column_type.get_collation_level()); - ASSERT_EQ(1, column_type.get_accuracy().get_length()); + ASSERT_EQ(OB_SUCCESS, fake_ctx.cons_column_info(col_schema, column_info)); + ASSERT_EQ(ObVarcharType, column_info.type_.get_type()); + ASSERT_EQ(res_flag, column_info.type_.get_result_flag()); + ASSERT_EQ(CS_TYPE_UTF8MB4_GENERAL_CI, column_info.type_.get_collation_type()); + ASSERT_EQ(CS_LEVEL_IMPLICIT, column_info.type_.get_collation_level()); + ASSERT_EQ(1, column_info.type_.get_accuracy().get_length()); } TEST_F(TestCreateExecutor, check_column_type) { - ObExprResType column_type; + ObTableColumnInfo column_info; + column_info.type_.set_result_flag(NOT_NULL_WRITE_FLAG); ObObj obj; uint32_t res_flag = 0; ObTableCtx fake_ctx(allocator_); @@ -483,22 +484,23 @@ TEST_F(TestCreateExecutor, check_column_type) // check nullable obj.set_null(); res_flag |= NOT_NULL_FLAG; - column_type.set_result_flag(res_flag); - ASSERT_EQ(OB_BAD_NULL_ERROR, fake_ctx.adjust_column_type(column_type, obj)); + column_info.type_.set_result_flag(res_flag); + column_info.is_nullable_ = false; + ASSERT_EQ(OB_BAD_NULL_ERROR, fake_ctx.adjust_column_type(column_info, obj)); // check data type mismatch res_flag = 0; obj.set_int(1); - column_type.set_result_flag(res_flag); - column_type.set_type(ObVarcharType); - ASSERT_EQ(OB_OBJ_TYPE_ERROR, fake_ctx.adjust_column_type(column_type, obj)); + column_info.type_.set_result_flag(res_flag); + column_info.type_.set_type(ObVarcharType); + ASSERT_EQ(OB_KV_COLUMN_TYPE_NOT_MATCH, fake_ctx.adjust_column_type(column_info, obj)); // check collation obj.set_binary("ttt"); - column_type.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI); - ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, fake_ctx.adjust_column_type(column_type, obj)); + column_info.type_.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI); + ASSERT_EQ(OB_KV_COLLATION_MISMATCH, fake_ctx.adjust_column_type(column_info, obj)); // collation convert obj.set_varchar("test"); obj.set_collation_type(CS_TYPE_UTF8MB4_BIN); - ASSERT_EQ(OB_SUCCESS, fake_ctx.adjust_column_type(column_type, obj)); + ASSERT_EQ(OB_SUCCESS, fake_ctx.adjust_column_type(column_info, obj)); ASSERT_EQ(CS_TYPE_UTF8MB4_GENERAL_CI, obj.get_collation_type()); } diff --git a/unittest/observer/table/test_table_sess_pool.cpp b/unittest/observer/table/test_table_sess_pool.cpp index 4af3784a4..1f4190ecd 100644 --- a/unittest/observer/table/test_table_sess_pool.cpp +++ b/unittest/observer/table/test_table_sess_pool.cpp @@ -102,7 +102,7 @@ TEST_F(TestTableSessPool, test_node_init) TEST_F(TestTableSessPool, test_node_val_init) { ObTableApiSessNode node(*mock_cred_); - ObTableApiSessNodeVal val(&node); + ObTableApiSessNodeVal val(&node, MTL_ID()); ASSERT_FALSE(val.is_inited_); ASSERT_EQ(&node, val.owner_node_); } @@ -132,7 +132,7 @@ TEST_F(TestTableSessPool, mgr_get_session) ASSERT_NE(0, node->last_active_ts_); // add mock val to node - ObTableApiSessNodeVal val(node); + ObTableApiSessNodeVal val(node, MTL_ID()); val.is_inited_ = true; ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(&val)); @@ -217,7 +217,7 @@ TEST_F(TestTableSessPool, mgr_sess_recycle) // add mock val to node ObTableApiSessNode *node; ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->hash_val_, node)); - ObTableApiSessNodeVal val(node); + ObTableApiSessNodeVal val(node, MTL_ID()); val.is_inited_ = true; ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(&val));