[CP] [CP] [Bugfix] cherry-pick bugfix from 421 for obkv

This commit is contained in:
WeiXinChan 2024-01-11 12:14:28 +00:00 committed by ob-robot
parent bc445d6721
commit 6716958634
33 changed files with 663 additions and 247 deletions

View File

@ -714,7 +714,7 @@ TEST_F(TestBatchExecute, column_type_check)
ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value));
ObTableOperation table_operation = ObTableOperation::insert(*entity);
ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r));
ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno());
ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno());
}
{
// case: insert + rowkey + int
@ -728,7 +728,7 @@ TEST_F(TestBatchExecute, column_type_check)
ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value));
ObTableOperation table_operation = ObTableOperation::insert(*entity);
ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r));
ASSERT_EQ(OB_OBJ_TYPE_ERROR, r.get_errno());
ASSERT_EQ(OB_KV_COLUMN_TYPE_NOT_MATCH, r.get_errno());
}
{
// case: insert + rowkey + too long
@ -845,6 +845,7 @@ TEST_F(TestBatchExecute, column_type_check)
ASSERT_EQ(OB_SUCCESS, entity->set_property(ObString::make_string("cblob"), value));
int64_t now = ObTimeUtility::current_time();
ObTimeConverter::round_datetime(0, now);
value.set_timestamp(now);
ASSERT_EQ(OB_SUCCESS, entity->set_property(ObString::make_string("ctimestamp"), value));
value.set_datetime(now);
@ -877,7 +878,7 @@ TEST_F(TestBatchExecute, column_type_check)
ASSERT_EQ(OB_SUCCESS, entity->add_rowkey_value(pk2));
ObTableOperation table_operation = ObTableOperation::retrieve(*entity);
ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r));
ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno());
ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno());
}
{
// case: replace + rowkey + collation
@ -891,7 +892,7 @@ TEST_F(TestBatchExecute, column_type_check)
ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value));
ObTableOperation table_operation = ObTableOperation::replace(*entity);
ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r));
ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno());
ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno());
}
{
@ -921,7 +922,7 @@ TEST_F(TestBatchExecute, column_type_check)
ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value));
ObTableOperation table_operation = ObTableOperation::insert_or_update(*entity);
ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r));
ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno());
ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno());
}
{
@ -951,7 +952,7 @@ TEST_F(TestBatchExecute, column_type_check)
ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value));
ObTableOperation table_operation = ObTableOperation::del(*entity);
ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r));
ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno());
ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno());
}
{
// case: update + rowkey + collation
@ -965,7 +966,7 @@ TEST_F(TestBatchExecute, column_type_check)
ASSERT_EQ(OB_SUCCESS, entity->set_property(CTINYINT, value));
ObTableOperation table_operation = ObTableOperation::update(*entity);
ASSERT_EQ(OB_SUCCESS, the_table->execute(table_operation, r));
ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, r.get_errno());
ASSERT_EQ(OB_KV_COLLATION_MISMATCH, r.get_errno());
}
{
@ -5096,7 +5097,7 @@ TEST_F(TestBatchExecute, check_scan_range)
ASSERT_EQ(OB_SUCCESS, query.add_scan_range(range));
ASSERT_EQ(OB_SUCCESS, query.set_scan_index(ObString::make_string("primary")));
ASSERT_EQ(OB_ERR_UNEXPECTED, the_table->execute_query(query, iter)); // wrong rowkey size
ASSERT_EQ(OB_KV_SCAN_RANGE_MISSING, the_table->execute_query(query, iter)); // wrong rowkey size
}
// case 2: scan by primary key, but key objs type is invalid
@ -5119,7 +5120,7 @@ TEST_F(TestBatchExecute, check_scan_range)
ASSERT_EQ(OB_SUCCESS, query.add_scan_range(range));
ASSERT_EQ(OB_SUCCESS, query.set_scan_index(ObString::make_string("primary")));
ASSERT_EQ(OB_OBJ_TYPE_ERROR, the_table->execute_query(query, iter)); // wrong rowkey type
ASSERT_EQ(OB_KV_COLUMN_TYPE_NOT_MATCH, the_table->execute_query(query, iter)); // wrong rowkey type
}
// case 3: scan by primary key, but collation type is invalid
@ -5143,7 +5144,7 @@ TEST_F(TestBatchExecute, check_scan_range)
ASSERT_EQ(OB_SUCCESS, query.add_scan_range(range));
ASSERT_EQ(OB_SUCCESS, query.set_scan_index(ObString::make_string("primary")));
ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, the_table->execute_query(query, iter)); // wrong collation type
ASSERT_EQ(OB_KV_COLLATION_MISMATCH, the_table->execute_query(query, iter)); // wrong collation type
}
// case 4: scan by primary key, but accuracy is invalid

View File

@ -736,6 +736,7 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
ObHTableMatchCode match_code = ObHTableMatchCode::DONE_SCAN; // initialize
if (ObQueryFlag::Reverse == scan_order_ && (-1 != limit_per_row_per_cf_ || 0 != offset_per_row_per_cf_)) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "set limit_per_row_per_cf_ and offset_per_row_per_cf_ in reverse scan");
LOG_WARN("server don't support set limit_per_row_per_cf_ and offset_per_row_per_cf_ in reverse scan yet",
K(ret), K(scan_order_), K(limit_per_row_per_cf_), K(offset_per_row_per_cf_));
}

View File

@ -120,6 +120,7 @@ int ObHTableFilterParser::create_comparator(const SimpleString &bytes, hfilter::
//comparator = OB_NEWx(hfilter::RegexStringComparator, allocator_, comparator_value);
LOG_WARN("regexstring comparator not supported yet", K(ret));
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "regexstring comparator");
} else if (comparator_type == SUB_STRING_TYPE) {
comparator = OB_NEWx(hfilter::SubStringComparator, allocator_, comparator_value);
} else {

View File

@ -167,6 +167,7 @@ int RegexStringComparator::compare_to(const ObString &b)
{
// @todo
UNUSED(b);
LOG_USER_ERROR(OB_NOT_SUPPORTED, "regexstring comparator");
LOG_WARN_RET(OB_NOT_SUPPORTED, "regexstring comparator not supported yet");
return 0;
}

View File

@ -171,6 +171,10 @@ int ObTableAggCalculator::aggregate_sum(uint64_t idx, const ObNewRow &row)
int64_t res = 0;
if (sql::ObExprAdd::is_add_out_of_range(sum_val.get_int(), cur_value.get_int(), res)) {
ret = OB_DATA_OUT_OF_RANGE;
if (idx < agg_columns_.count()) {
int64_t row_num = 0;
LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num);
}
LOG_WARN("data out of range", K(ret), K(sum_val), K(cur_value));
} else {
sum_val.set_int(res);
@ -190,6 +194,10 @@ int ObTableAggCalculator::aggregate_sum(uint64_t idx, const ObNewRow &row)
uint64_t res = 0;
if (sql::ObExprAdd::is_add_out_of_range(sum_val.get_uint64(), cur_value.get_uint64(), res)) {
ret = OB_DATA_OUT_OF_RANGE;
if (idx < agg_columns_.count()) {
int64_t row_num = 0;
LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num);
}
LOG_WARN("data out of range", K(ret), K(sum_val), K(cur_value));
} else {
sum_val.set_uint64(res);
@ -206,6 +214,10 @@ int ObTableAggCalculator::aggregate_sum(uint64_t idx, const ObNewRow &row)
double res = sum_val.get_double() + (double)cur_value.get_float();
if (INFINITY == res) {
ret = OB_DATA_OUT_OF_RANGE;
if (idx < agg_columns_.count()) {
int64_t row_num = 0;
LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num);
}
LOG_WARN("data out of range", K(ret), K(sum_val), K(cur_value));
} else {
sum_val.set_double(res);
@ -222,6 +234,10 @@ int ObTableAggCalculator::aggregate_sum(uint64_t idx, const ObNewRow &row)
double res = sum_val.get_double() + cur_value.get_double();
if (INFINITY == res) {
ret = OB_DATA_OUT_OF_RANGE;
if (idx < agg_columns_.count()) {
int64_t row_num = 0;
LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num);
}
LOG_WARN("data out of range", K(ret), K(sum_val), K(cur_value));
} else {
sum_val.set_double(res);
@ -231,6 +247,10 @@ int ObTableAggCalculator::aggregate_sum(uint64_t idx, const ObNewRow &row)
}
default: {
ret = OB_NOT_SUPPORTED;
char err_msg[128];
const char *type_str = ob_obj_type_str(sum_type);
(void)sprintf(err_msg, "%s type for sum aggregation", type_str);
LOG_USER_ERROR(OB_NOT_SUPPORTED, err_msg);
LOG_WARN("this data type does not support aggregate sum operation", K(ret), K(sum_type));
}
}
@ -257,6 +277,10 @@ int ObTableAggCalculator::aggregate_avg(uint64_t idx, const ObNewRow &row)
double res = sum + (double)cur_value.get_int();
if (INFINITY == res) {
ret = OB_DATA_OUT_OF_RANGE;
if (idx < agg_columns_.count()) {
int64_t row_num = 0;
LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num);
}
LOG_WARN("data out of range", K(ret), K(sum), K(cur_value));
} else {
sum = res;
@ -272,6 +296,10 @@ int ObTableAggCalculator::aggregate_avg(uint64_t idx, const ObNewRow &row)
double res = sum + (double)cur_value.get_uint64();
if (INFINITY == res) {
ret = OB_DATA_OUT_OF_RANGE;
if (idx < agg_columns_.count()) {
int64_t row_num = 0;
LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num);
}
LOG_WARN("data out of range", K(ret), K(sum), K(cur_value));
} else {
sum = res;
@ -284,6 +312,10 @@ int ObTableAggCalculator::aggregate_avg(uint64_t idx, const ObNewRow &row)
double res = sum + (double)cur_value.get_float();
if (INFINITY == res) {
ret = OB_DATA_OUT_OF_RANGE;
if (idx < agg_columns_.count()) {
int64_t row_num = 0;
LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num);
}
LOG_WARN("data out of range", K(ret), K(sum), K(cur_value));
} else {
sum = res;
@ -296,6 +328,10 @@ int ObTableAggCalculator::aggregate_avg(uint64_t idx, const ObNewRow &row)
double res = sum + cur_value.get_double();
if (INFINITY == res) {
ret = OB_DATA_OUT_OF_RANGE;
if (idx < agg_columns_.count()) {
int64_t row_num = 0;
LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, agg_columns_.at(idx).length(), agg_columns_.at(idx).ptr(), row_num);
}
LOG_WARN("data out of range", K(ret), K(sum), K(cur_value));
} else {
sum = res;
@ -304,6 +340,10 @@ int ObTableAggCalculator::aggregate_avg(uint64_t idx, const ObNewRow &row)
}
default: {
ret = OB_NOT_SUPPORTED;
char err_msg[128];
const char *type_str = ob_obj_type_str(avg_type);
(void)sprintf(err_msg, "%s type for avg aggregation", type_str);
LOG_USER_ERROR(OB_NOT_SUPPORTED, err_msg);
LOG_WARN("this data type does not support aggregate avg operation", K(ret), K(avg_type));
}
}
@ -352,6 +392,7 @@ int ObTableAggCalculator::aggregate(const ObNewRow &row) {
}
default: {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "invalid aggregation type");
LOG_WARN("unexpected agg type", K(ret), K(agg_type));
break;
}

View File

@ -67,6 +67,7 @@ int ObTableBatchExecuteP::check_arg()
if (!(arg_.consistency_level_ == ObTableConsistencyLevel::STRONG ||
arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL)) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "invalid consistency level");
LOG_WARN("some options not supported yet", K(ret),
"consistency_level", arg_.consistency_level_);
}
@ -79,6 +80,7 @@ int ObTableBatchExecuteP::check_arg2() const
if (arg_.returning_rowkey()
|| arg_.returning_affected_entity()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "returning rowkey or returning affected entity");
LOG_WARN("some options not supported yet", K(ret),
"returning_rowkey", arg_.returning_rowkey(),
"returning_affected_entity", arg_.returning_affected_entity());
@ -182,6 +184,7 @@ int ObTableBatchExecuteP::try_process()
LOG_WARN("fail to check index supported", K(ret), K(table_id_));
} else if (OB_UNLIKELY(!is_index_supported)) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "global index");
LOG_WARN("index type is not supported by table api", K(ret));
} else {
if (batch_operation.is_readonly()) {
@ -345,10 +348,8 @@ int ObTableBatchExecuteP::htable_put()
LOG_WARN("fail to get htable lock handle", K(ret));
} else if (OB_FAIL(ObHTableUtils::lock_htable_rows(table_id, batch_operation, *lock_handle, ObHTableLockMode::SHARED))) {
LOG_WARN("fail to lock htable rows", K(ret), K(table_id), K(batch_operation));
} else if (OB_FAIL(ObTableOpWrapper::get_or_create_spec<TABLE_API_EXEC_INSERT_UP>(tb_ctx_,
cache_guard,
spec))) {
LOG_WARN("fail to get or create spec", K(ret));
} else if (OB_FAIL(ObTableOpWrapper::get_insert_up_spec(tb_ctx_, cache_guard, spec))) {
LOG_WARN("fail to get insert up spec", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < batch_operation.count(); ++i) {
const ObTableOperation &table_operation = batch_operation.at(i);
@ -368,7 +369,7 @@ int ObTableBatchExecuteP::htable_put()
ObTableOperationResult op_result;
op_result.set_type(ObTableOperationType::INSERT_OR_UPDATE);
op_result.set_entity(result_entity_);
op_result.set_errno(ret);
op_result.set_err(ret);
op_result.set_affected_rows(affected_rows);
result_.reset();
if (OB_FAIL(result_.push_back(op_result))) {
@ -440,7 +441,7 @@ int ObTableBatchExecuteP::multi_get()
}
}
op_result.set_entity(*result_entity);
op_result.set_errno(ret);
op_result.set_err(ret);
op_result.set_type(tb_ctx_.get_opertion_type());
if (OB_FAIL(result_.push_back(op_result))) {
LOG_WARN("fail to push back op result", K(ret), K(i));
@ -573,7 +574,7 @@ int ObTableBatchExecuteP::htable_delete()
ObTableOperationResult single_op_result;
single_op_result.set_entity(result_entity_);
single_op_result.set_type(ObTableOperationType::DEL);
single_op_result.set_errno(ret);
single_op_result.set_err(ret);
single_op_result.set_affected_rows(affected_rows);
result_.reset();
if (OB_FAIL(result_.push_back(single_op_result))) {
@ -610,10 +611,8 @@ int ObTableBatchExecuteP::multi_insert()
LOG_WARN("fail to start readonly transaction", K(ret));
} else if (OB_FAIL(tb_ctx_.init_trans(get_trans_desc(), get_tx_snapshot()))) {
LOG_WARN("fail to init trans", K(ret), K(tb_ctx_));
} else if (OB_FAIL(ObTableOpWrapper::get_or_create_spec<TABLE_API_EXEC_INSERT>(tb_ctx_,
cache_guard,
spec))) {
LOG_WARN("fail to get or create spec", K(ret));
} else if (OB_FAIL(ObTableOpWrapper::get_insert_spec(tb_ctx_, cache_guard, spec))) {
LOG_WARN("fail to get or create insert spec", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < batch_operation.count(); ++i) {
const ObTableOperation &table_operation = batch_operation.at(i);
@ -766,7 +765,7 @@ int ObTableBatchExecuteP::batch_execute_internal(const ObTableBatchOperation &ba
ret = process_get(op_tb_ctx, op_result);
break;
case ObTableOperationType::INSERT:
ret = ObTableOpWrapper::process_op<TABLE_API_EXEC_INSERT>(op_tb_ctx, op_result);
ret = ObTableOpWrapper::process_insert_op(op_tb_ctx, op_result);
break;
case ObTableOperationType::DEL:
ret = ObTableOpWrapper::process_op<TABLE_API_EXEC_DELETE>(op_tb_ctx, op_result);
@ -774,15 +773,13 @@ int ObTableBatchExecuteP::batch_execute_internal(const ObTableBatchOperation &ba
case ObTableOperationType::UPDATE:
ret = ObTableOpWrapper::process_op<TABLE_API_EXEC_UPDATE>(op_tb_ctx, op_result);
break;
case ObTableOperationType::INSERT_OR_UPDATE:
ret = ObTableOpWrapper::process_op<TABLE_API_EXEC_INSERT_UP>(op_tb_ctx, op_result);
break;
case ObTableOperationType::REPLACE:
ret = ObTableOpWrapper::process_op<TABLE_API_EXEC_REPLACE>(op_tb_ctx, op_result);
break;
case ObTableOperationType::INSERT_OR_UPDATE:
case ObTableOperationType::APPEND:
case ObTableOperationType::INCREMENT:
ret = ObTableOpWrapper::process_op<TABLE_API_EXEC_INSERT_UP>(op_tb_ctx, op_result);
ret = ObTableOpWrapper::process_insert_up_op(op_tb_ctx, op_result);
break;
default:
ret = OB_ERR_UNEXPECTED;
@ -914,7 +911,7 @@ int ObTableBatchExecuteP::process_get(table::ObTableCtx &op_tb_ctx,
result_entity))) {
LOG_WARN("fail to cosntruct result entity", K(ret));
}
result.set_errno(ret);
result.set_err(ret);
result.set_type(op_tb_ctx.get_opertion_type());
return ret;
}
@ -978,6 +975,7 @@ int ObTableBatchExecuteP::htable_mutate_row()
default: {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "mutation type");
LOG_WARN("not supported mutation type", K(ret), K(table_operation));
break;
}
@ -1040,7 +1038,7 @@ int ObTableBatchExecuteP::execute_htable_delete(const ObTableBatchOperation &bat
ObTableOperationResult single_op_result;
single_op_result.set_entity(result_entity_);
single_op_result.set_type(ObTableOperationType::DEL);
single_op_result.set_errno(ret);
single_op_result.set_err(ret);
single_op_result.set_affected_rows(affected_rows);
result_.reset();
if (OB_FAIL(result_.push_back(single_op_result))) {
@ -1066,7 +1064,7 @@ int ObTableBatchExecuteP::execute_htable_put(const ObTableBatchOperation &batch_
LOG_WARN("fail to init table ctx", K(ret));
} else if (OB_FAIL(tb_ctx.init_trans(get_trans_desc(), get_tx_snapshot()))) {
LOG_WARN("fail to init trans", K(ret), K(tb_ctx));
} else if (OB_FAIL(ObTableOpWrapper::process_op<TABLE_API_EXEC_INSERT_UP>(tb_ctx, single_op_result))) {
} else if (OB_FAIL(ObTableOpWrapper::process_insert_up_op(tb_ctx, single_op_result))) {
LOG_WARN("fail to process insertup op", K(ret));
} else if (FALSE_IT(result_.reset())) {
} else if (OB_FAIL(result_.push_back(single_op_result))) {

View File

@ -229,7 +229,10 @@ int ObTableApiCacheGuard::append_column_ids(const ObITableEntity *entity,
const ObColumnSchemaV2 *col_schema = nullptr;
for (int64_t i = 0; i < properties_names.count(); i++) {
if (OB_ISNULL(col_schema = table_schema->get_column_schema(properties_names.at(i)))) {
ret = OB_ERR_COLUMN_NOT_FOUND;
ret = OB_ERR_BAD_FIELD_ERROR;
const ObString &column = properties_names.at(i);
const ObString &table = table_schema->get_table_name_str();
LOG_USER_ERROR(OB_ERR_BAD_FIELD_ERROR, column.length(), column.ptr(), table.length(), table.ptr());
LOG_WARN("fail to get column schema", K(ret), K(i), K(properties_names.at(i)));
} else if (OB_FAIL(op_column_ids.push_back(col_schema->get_column_id()))) {
LOG_WARN("fail to push back column id", K(ret), K(i), K(col_schema->get_column_id()));

View File

@ -447,6 +447,7 @@ int ObTableExprCgService::generate_assign_expr(ObTableCtx &ctx, ObTableAssignmen
} else if (item->is_generated_column_) {
if (!item->is_stored_generated_column_) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "assign virtual generated column");
LOG_WARN("assign virtual generated column is not supported", K(ret));
} else {
if (OB_FAIL(build_generated_column_expr(ctx, *item, item->generated_expr_str_, tmp_expr))) {
@ -1043,10 +1044,16 @@ int ObTableExprCgService::refresh_properties_exprs_frame(ObTableCtx &ctx,
bool not_found = (OB_SEARCH_NOT_FOUND == entity.get_property(item.column_name_, prop_value));
if (not_found) {
obj = &item.default_value_;
if (!item.is_nullable_ && !item.is_auto_increment_ && obj->is_null()) {
ret = OB_ERR_NO_DEFAULT_FOR_FIELD;
LOG_USER_ERROR(OB_ERR_NO_DEFAULT_FOR_FIELD, to_cstring(item.column_name_));
LOG_WARN("column can not be null", K(ret), K(item));
}
} else {
obj = &prop_value;
}
if (T_FUN_SYS_AUTOINC_NEXTVAL == expr->type_) {
if (OB_FAIL(ret)) {
} else if (T_FUN_SYS_AUTOINC_NEXTVAL == expr->type_) {
ObObj null_obj;
null_obj.set_null();
obj = not_found ? &null_obj : &prop_value;
@ -1145,6 +1152,7 @@ int ObTableExprCgService::refresh_assign_exprs_frame(ObTableCtx &ctx,
LOG_WARN("unexpected assign projector_index_", K(ret), K(new_row), K(assign.column_item_));
} else if (assign.column_item_->is_virtual_generated_column_) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "update virtual generated column");
LOG_WARN("virtual generated column not support to update", K(ret), K(assign));
} else {
// on update current timestamp will not find value
@ -1214,6 +1222,56 @@ int ObTableDmlCgService::replace_exprs_with_dependant(ObTableCtx &ctx,
return ret;
}
/*
add column infos for check nullable before insert new_row to das.
*/
int ObTableDmlCgService::add_all_column_infos(ObTableCtx &ctx,
ObIAllocator &allocator,
ColContentFixedArray &column_infos)
{
int ret = OB_SUCCESS;
ObSEArray<uint64_t, 64> column_ids;
ObIArray<ObTableColumnItem> &items = ctx.get_column_items();
const ObTableSchema *table_schema = ctx.get_table_schema();
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table schema is null", K(ret));
} else if (OB_FAIL(table_schema->get_column_ids(column_ids))) {
LOG_WARN("fail to get column ids", K(ret));
} else if (OB_FAIL(column_infos.init(items.count()))) {
LOG_WARN("fail to init column infos capacity", K(ret), K(items.count()));
}
for (int64_t i= 0; OB_SUCC(ret) && i < items.count(); i++) {
const ObTableColumnItem &item = items.at(i);
ObColumnRefRawExpr *column_expr = item.expr_;
if (OB_ISNULL(column_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("column ref expr is null", K(ret));
} else {
ColumnContent column_content;
int64_t idx = 0;
column_content.auto_filled_timestamp_ = column_expr->get_result_type().has_result_flag(ON_UPDATE_NOW_FLAG);
column_content.is_nullable_ = !column_expr->get_result_type().is_not_null_for_write();
column_content.is_predicate_column_ = false;
column_content.is_implicit_ = false;
if (OB_FAIL(ob_write_string(allocator, column_expr->get_column_name(), column_content.column_name_))) {
LOG_WARN("fail to copy column name", K(ret), K(column_expr->get_column_name()));
} else if (!has_exist_in_array(column_ids, column_expr->get_column_id(), &idx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("column not exists in schema columns", K(ret), KPC(column_expr), K(column_ids));
} else if (FALSE_IT(column_content.projector_index_ = static_cast<uint64_t>(idx))) {
//do nothing
} else if (OB_FAIL(column_infos.push_back(column_content))) {
LOG_WARN("fail to store colum content to column infos", K(ret), K(column_content));
}
}
}
return ret;
}
/*
genreate insert ctdef
- replace exprs with depenedant expr if there are generated column.
@ -1238,6 +1296,8 @@ int ObTableDmlCgService::generate_insert_ctdef(ObTableCtx &ctx,
LOG_WARN("fail to assign new row", K(ret));
} else if (OB_FAIL(generate_base_ctdef(ctx, ins_ctdef, old_row, new_row))) {
LOG_WARN("fail to generate dml base ctdef", K(ret));
} else if (OB_FAIL(add_all_column_infos(ctx, allocator, ins_ctdef.column_infos_))) {
LOG_WARN("fail to add all column infos", K(ret));
} else if (OB_FAIL(generate_das_ins_ctdef(ctx,
ctx.get_ref_table_id(),
ins_ctdef.das_ctdef_,
@ -2005,7 +2065,6 @@ int ObTableDmlCgService::generate_das_base_ctdef(uint64_t index_tid,
base_ctdef.is_ignore_ = false; // insert ignore
base_ctdef.is_batch_stmt_ = false;
base_ctdef.is_table_api_ = true;
int64_t binlog_row_image = share::ObBinlogRowImage::FULL;
ObSQLSessionInfo &session = ctx.get_session_info();
if (OB_FAIL(generate_column_info(index_tid, ctx, base_ctdef))) {
@ -2017,11 +2076,9 @@ int ObTableDmlCgService::generate_das_base_ctdef(uint64_t index_tid,
LOG_WARN("fail to get table schema version", K(ret));
} else if (OB_FAIL(convert_table_param(ctx, base_ctdef))) {
LOG_WARN("fail to convert table dml param", K(ret));
} else if (OB_FAIL(session.get_binlog_row_image(binlog_row_image))) {
LOG_WARN("fail to get binlog row image", K(ret));
} else {
base_ctdef.tz_info_ = *session.get_tz_info_wrap().get_time_zone_info();
base_ctdef.is_total_quantity_log_ = (share::ObBinlogRowImage::FULL == binlog_row_image);
base_ctdef.is_total_quantity_log_ = ctx.is_total_quantity_log();
base_ctdef.encrypt_meta_.reset();
}
@ -2334,8 +2391,9 @@ int ObTableTscCgService::replace_gen_col_exprs(const ObTableCtx &ctx,
for (int64_t i = 0; i < access_exprs.count() && OB_SUCC(ret); i++) {
ObRawExpr *expr = access_exprs.at(i);
if (!expr->is_column_ref_expr()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected expr type", K(ret), K(*expr));
if (OB_FAIL(res_access_expr.push_back(expr))) {
LOG_WARN("fail to push back expr", K(ret));
}
} else if (FALSE_IT(ref_expr = static_cast<ObColumnRefRawExpr*>(expr))) {
} else if (!ref_expr->is_virtual_generated_column()) {
if (OB_FAIL(res_access_expr.push_back(expr))) {

View File

@ -235,6 +235,9 @@ private:
sql::ObRowkeyCstCtdefArray &cst_ctdefs);
static int replace_exprs_with_dependant(ObTableCtx &ctx,
common::ObIArray<sql::ObRawExpr *> &dst_exprs);
static int add_all_column_infos(ObTableCtx &ctx,
common::ObIAllocator &allocator,
sql::ColContentFixedArray &column_infos);
private:
DISALLOW_COPY_AND_ASSIGN(ObTableDmlCgService);
};

View File

@ -19,6 +19,8 @@
#include "sql/engine/expr/ob_expr_lob_utils.h"
#include "ob_table_aggregation.h"
using namespace oceanbase::common;
namespace oceanbase
{
namespace table
@ -75,7 +77,7 @@ int ObTableCtx::init_sess_info(ObTableApiCredential &credential)
}
int ObTableCtx::init_common(ObTableApiCredential &credential,
const common::ObTabletID &arg_tablet_id,
const ObTabletID &arg_tablet_id,
const uint64_t table_id,
const int64_t &timeout_ts)
{
@ -140,6 +142,7 @@ int ObTableCtx::construct_column_items()
item.is_stored_generated_column_ = col_schema->is_stored_generated_column();
item.is_virtual_generated_column_ = col_schema->is_virtual_generated_column();
item.is_auto_increment_ = col_schema->is_autoincrement();
item.is_nullable_ = col_schema->is_nullable();
item.generated_expr_str_ = item.default_value_.get_string();
item.auto_filled_timestamp_ = col_schema->is_on_update_current_timestamp();
item.rowkey_position_ = col_schema->get_rowkey_position();
@ -301,8 +304,8 @@ int ObTableCtx::init_physical_plan_ctx(int64_t timeout_ts, int64_t tenant_schema
- adjust entity
*/
int ObTableCtx::init_common(ObTableApiCredential &credential,
const common::ObTabletID &arg_tablet_id,
const common::ObString &arg_table_name,
const ObTabletID &arg_tablet_id,
const ObString &arg_table_name,
const int64_t &timeout_ts)
{
int ret = OB_SUCCESS;
@ -317,6 +320,11 @@ int ObTableCtx::init_common(ObTableApiCredential &credential,
false, /* is_index */
table_schema_))) {
LOG_WARN("fail to get table schema", K(ret), K(tenant_id), K(database_id), K(arg_table_name));
} else if (OB_ISNULL(table_schema_)) {
ret = OB_TABLE_NOT_EXIST;
ObString db("");
LOG_USER_ERROR(OB_TABLE_NOT_EXIST, db.ptr(), arg_table_name.ptr());
LOG_WARN("table not exist", K(ret), K(tenant_id), K(database_id), K(arg_table_name));
} else if (OB_FAIL(inner_init_common(credential, arg_tablet_id, arg_table_name, timeout_ts))) {
LOG_WARN("fail to inner init common", KR(ret), K(credential), K(arg_tablet_id),
K(arg_table_name), K(timeout_ts));
@ -326,8 +334,8 @@ int ObTableCtx::init_common(ObTableApiCredential &credential,
}
int ObTableCtx::inner_init_common(ObTableApiCredential &credential,
const common::ObTabletID &arg_tablet_id,
const common::ObString &table_name,
const ObTabletID &arg_tablet_id,
const ObString &table_name,
const int64_t &timeout_ts)
{
int ret = OB_SUCCESS;
@ -353,6 +361,7 @@ int ObTableCtx::inner_init_common(ObTableApiCredential &credential,
if (is_scan_) { // 扫描场景使用table_schema上的tablet id,客户端已经做了路由分发
if (table_schema_->is_partitioned_table()) { // 不支持分区表
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "invalid tablet id in partition table when do scan");
LOG_WARN("partitioned table not supported", K(ret), K(table_name));
} else {
tablet_id = table_schema_->get_tablet_id();
@ -380,6 +389,8 @@ int ObTableCtx::inner_init_common(ObTableApiCredential &credential,
LOG_WARN("fail to construct column items", K(ret));
} else if (!is_scan_ && OB_FAIL(adjust_entity())) {
LOG_WARN("fail to adjust entity", K(ret));
} else if (OB_FAIL(sess_guard_.get_sess_info().get_binlog_row_image(binlog_row_image_type_))) {
LOG_WARN("fail to get binlog row image", K(ret));
} else {
table_name_ = table_name;
ref_table_id_ = table_schema_->get_table_id();
@ -394,11 +405,11 @@ int ObTableCtx::inner_init_common(ObTableApiCredential &credential,
return ret;
}
// get columns type from index_schema or primary table schema
int ObTableCtx::generate_columns_type(ObIArray<ObExprResType> &columns_type)
// get columns info from index_schema or primary table schema
int ObTableCtx::generate_column_infos(ObIArray<ObTableColumnInfo> &columns_infos)
{
int ret = OB_SUCCESS;
ObExprResType tmp_column_type;
ObTableColumnInfo tmp_column_info;
const ObColumnSchemaV2 *column_schema = nullptr;
if (is_index_scan_) {
@ -407,10 +418,10 @@ int ObTableCtx::generate_columns_type(ObIArray<ObExprResType> &columns_type)
if (OB_ISNULL(column_schema = index_schema_->get_column_schema(index_col_ids_.at(i)))) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("fail to get column schema", K(ret), K(index_col_ids_.at(i)));
} else if (OB_FAIL(cons_column_type(*column_schema, tmp_column_type))) {
LOG_WARN("fail to cons column type", K(ret));
} else if (OB_FAIL(columns_type.push_back(tmp_column_type))) {
LOG_WARN("fail to push back column type", K(ret), K(tmp_column_type));
} else if (OB_FAIL(cons_column_info(*column_schema, tmp_column_info))) {
LOG_WARN("fail to cons column info", K(ret));
} else if (OB_FAIL(columns_infos.push_back(tmp_column_info))) {
LOG_WARN("fail to push back column info", K(ret), K(tmp_column_info));
}
}
} else { // primary key
@ -423,10 +434,10 @@ int ObTableCtx::generate_columns_type(ObIArray<ObExprResType> &columns_type)
} else if (OB_ISNULL(column_schema = table_schema_->get_column_schema(column_id))) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("fail to get column schema", K(ret), K(column_id));
} else if (OB_FAIL(cons_column_type(*column_schema, tmp_column_type))) {
LOG_WARN("fail to cons column type", K(ret));
} else if (OB_FAIL(columns_type.push_back(tmp_column_type))) {
LOG_WARN("fail to push back column type", K(ret), K(tmp_column_type));
} else if (OB_FAIL(cons_column_info(*column_schema, tmp_column_info))) {
LOG_WARN("fail to cons column info", K(ret));
} else if (OB_FAIL(columns_infos.push_back(tmp_column_info))) {
LOG_WARN("fail to push back column info", K(ret), K(tmp_column_info));
}
}
}
@ -434,25 +445,29 @@ int ObTableCtx::generate_columns_type(ObIArray<ObExprResType> &columns_type)
return ret;
}
int ObTableCtx::cons_column_type(const ObColumnSchemaV2 &column_schema,
ObExprResType &column_type)
int ObTableCtx::cons_column_info(const ObColumnSchemaV2 &column_schema,
ObTableColumnInfo &column_info)
{
int ret = OB_SUCCESS;
column_type.set_type(column_schema.get_data_type());
column_type.set_result_flag(ObRawExprUtils::calc_column_result_flag(column_schema));
column_info.type_.set_type(column_schema.get_data_type());
column_info.type_.set_result_flag(ObRawExprUtils::calc_column_result_flag(column_schema));
column_info.column_name_ = column_schema.get_column_name_str();
column_info.is_auto_inc_ = column_schema.is_autoincrement();
column_info.is_nullable_ = column_schema.is_nullable();
if (ob_is_string_type(column_schema.get_data_type()) || ob_is_json(column_schema.get_data_type())) {
column_type.set_collation_type(column_schema.get_collation_type());
column_type.set_collation_level(CS_LEVEL_IMPLICIT);
column_info.type_.set_collation_type(column_schema.get_collation_type());
column_info.type_.set_collation_level(CS_LEVEL_IMPLICIT);
} else {
column_type.set_collation_type(CS_TYPE_BINARY);
column_type.set_collation_level(CS_LEVEL_NUMERIC);
column_info.type_.set_collation_type(CS_TYPE_BINARY);
column_info.type_.set_collation_level(CS_LEVEL_NUMERIC);
}
const ObAccuracy &accuracy = column_schema.get_accuracy();
column_type.set_accuracy(accuracy);
const bool is_zerofill = column_type.has_result_flag(ZEROFILL_FLAG);
column_info.type_.set_accuracy(accuracy);
const bool is_zerofill = column_info.type_.has_result_flag(ZEROFILL_FLAG);
if (is_zerofill) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "modifing column with ZEROFILL flag");
LOG_WARN("modifing column with ZEROFILL flag is not supported", K(ret), K(column_schema));
}
@ -496,25 +511,28 @@ int ObTableCtx::read_real_lob(ObIAllocator &allocator, ObObj &obj)
- check collation for string type and convert obj type to the column type (char, varchar or text)
- check accuracy
*/
int ObTableCtx::adjust_column_type(const ObExprResType &column_type,
ObObj &obj,
bool is_autoincrement/*=false*/)
int ObTableCtx::adjust_column_type(const ObTableColumnInfo &column_info, ObObj &obj)
{
int ret = OB_SUCCESS;
const bool is_not_nullable = column_type.is_not_null_for_read();
const ObExprResType &column_type = column_info.type_;
const ObCollationType cs_type = column_type.get_collation_type();
// 1. check nullable
if (is_not_nullable && obj.is_null()) {
if (!is_autoincrement) {
if (!column_info.is_nullable_ && obj.is_null()) {
if (!column_info.is_auto_inc_) {
ret = OB_BAD_NULL_ERROR;
LOG_USER_ERROR(OB_BAD_NULL_ERROR, column_info.column_name_.length(), column_info.column_name_.ptr());
}
} else if (obj.is_null() || is_inc()) {
// continue
} else if (column_type.get_type() != obj.get_type()
&& !(ob_is_string_type(column_type.get_type()) && ob_is_string_type(obj.get_type()))) {
// 2. data type mismatch
ret = OB_OBJ_TYPE_ERROR;
ret = OB_KV_COLUMN_TYPE_NOT_MATCH;
const char *schema_type_str = ob_obj_type_str(column_type.get_type());
const char *obj_type_str = ob_obj_type_str(obj.get_type());
LOG_USER_ERROR(OB_KV_COLUMN_TYPE_NOT_MATCH, column_info.column_name_.length(), column_info.column_name_.ptr(),
static_cast<int>(strlen(schema_type_str)), schema_type_str, static_cast<int>(strlen(obj_type_str)), obj_type_str);
LOG_WARN("object type mismatch with column type", K(ret), K(column_type), K(obj));
} else {
// 3. check collation
@ -530,7 +548,11 @@ int ObTableCtx::adjust_column_type(const ObExprResType &column_type,
// same charset, convert it
obj.set_collation_type(cs_type);
} else {
ret = OB_ERR_COLLATION_MISMATCH;
ret = OB_KV_COLLATION_MISMATCH;
const char *schema_coll_str = ObCharset::collation_name(cs_type);
const char *obj_coll_str = ObCharset::collation_name(obj.get_collation_type());
LOG_USER_ERROR(OB_KV_COLLATION_MISMATCH, column_info.column_name_.length(), column_info.column_name_.ptr(),
static_cast<int>(strlen(schema_coll_str)), schema_coll_str, static_cast<int>(strlen(obj_coll_str)), obj_coll_str);
LOG_WARN("collation type mismatch with column", K(ret), K(column_type), K(obj));
}
if (OB_SUCC(ret)) {
@ -546,6 +568,16 @@ int ObTableCtx::adjust_column_type(const ObExprResType &column_type,
// 4. check accuracy
if (OB_SUCC(ret)) {
if (OB_FAIL(ob_obj_accuracy_check_only(column_type.get_accuracy(), cs_type, obj))) {
if (ret == OB_DATA_OUT_OF_RANGE) {
int64_t row_num = 0;
LOG_USER_ERROR(OB_DATA_OUT_OF_RANGE, column_info.column_name_.length(), column_info.column_name_.ptr(), row_num);
} else if (ret == OB_OPERATE_OVERFLOW) {
const char *type_str = ob_obj_type_str(column_type.get_type());
LOG_USER_ERROR(OB_OPERATE_OVERFLOW, type_str, column_info.column_name_.ptr());
} else if (ret == OB_ERR_DATA_TOO_LONG) {
int64_t row_num = 0;
LOG_USER_ERROR(OB_ERR_DATA_TOO_LONG, column_info.column_name_.length(), column_info.column_name_.ptr(), row_num);
}
LOG_WARN("accuracy check failed", K(ret), K(obj), K(column_type));
}
}
@ -564,12 +596,12 @@ int ObTableCtx::adjust_column_type(const ObExprResType &column_type,
int ObTableCtx::adjust_column(const ObColumnSchemaV2 &col_schema, ObObj &obj)
{
int ret = OB_SUCCESS;
ObExprResType column_type;
ObTableColumnInfo column_info;
if (OB_FAIL(cons_column_type(col_schema, column_type))) {
LOG_WARN("fail to construct column type", K(ret), K(col_schema));
} else if (OB_FAIL(adjust_column_type(column_type, obj, col_schema.is_autoincrement()))) {
LOG_WARN("fail to adjust rowkey column type", K(ret), K(obj));
if (OB_FAIL(cons_column_info(col_schema, column_info))) {
LOG_WARN("fail to construct column info", K(ret), K(col_schema));
} else if (OB_FAIL(adjust_column_type(column_info, obj))) {
LOG_WARN("fail to adjust rowkey column type", K(ret), K(obj), K(column_info));
}
return ret;
@ -608,6 +640,7 @@ int ObTableCtx::adjust_rowkey()
has_auto_inc = true;
if (col_schema->is_part_key_column()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "auto increment column set to be partition column");
LOG_WARN("auto increment column could not be partition column", K(ret), K(*col_schema));
} else if (!is_full_filled) { // curr column is auto_increment and user not fill,no need to check
need_check = false;
@ -629,7 +662,8 @@ int ObTableCtx::adjust_rowkey()
if (OB_FAIL(ret)) {
// do nothing
} else if (!has_auto_inc && entity_rowkey_cnt != schema_rowkey_cnt) {
ret = OB_ERR_UNEXPECTED;
ret = OB_KV_ROWKEY_COUNT_NOT_MATCH;
LOG_USER_ERROR(OB_KV_ROWKEY_COUNT_NOT_MATCH, schema_rowkey_cnt, entity_rowkey_cnt);
LOG_WARN("entity rowkey count mismatch table schema rowkey count",
K(ret), K(entity_rowkey_cnt), K(schema_rowkey_cnt));
}
@ -660,12 +694,15 @@ int ObTableCtx::adjust_properties()
const ObString &col_name = prop_names.at(i);
ObObj &prop_obj = const_cast<ObObj &>(prop_objs.at(i));
if (OB_ISNULL(col_schema = table_schema_->get_column_schema(col_name))) {
ret = OB_ERR_COLUMN_NOT_FOUND;
ret = OB_ERR_BAD_FIELD_ERROR;
const ObString &table = table_schema_->get_table_name_str();
LOG_USER_ERROR(OB_ERR_BAD_FIELD_ERROR, col_name.length(), col_name.ptr(), table.length(), table.ptr());
LOG_WARN("fail to get column schema", K(ret), K(col_name));
} else if (is_get) {
// do nothing
} else if (col_schema->is_rowkey_column()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "mutate rowkey column");
LOG_WARN("property should not be rowkey column", K(ret), K(prop_names), K(i));
} else if (OB_FAIL(adjust_column(*col_schema, prop_obj))) {
LOG_WARN("fail to adjust column", K(ret), K(prop_obj));
@ -722,11 +759,11 @@ int ObTableCtx::generate_key_range(const ObIArray<ObNewRange> &scan_ranges)
{
int ret = OB_SUCCESS;
int64_t padding_num = -1;
ObArray<sql::ObExprResType> columns_type;
ObSEArray<ObTableColumnInfo, 32> columns_infos;
int64_t N = scan_ranges.count();
if (OB_FAIL(generate_columns_type(columns_type))) {
LOG_WARN("fail to generate columns type", K(ret));
if (OB_FAIL(generate_column_infos(columns_infos))) {
LOG_WARN("fail to generate columns infos", K(ret));
} else if (is_index_scan_) {
// 索引扫描场景下用户可能没有填写rowkey的key_range,需要加上
padding_num = index_schema_->get_rowkey_column_num() - index_col_ids_.count();
@ -745,17 +782,18 @@ int ObTableCtx::generate_key_range(const ObIArray<ObNewRange> &scan_ranges)
if (p_key->is_min_row() || p_key->is_max_row()) {
// do nothing
} else {
if (p_key->get_obj_cnt() != columns_type.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("wrong rowkey size", K(ret), K(i), K(j), K(*p_key), K(columns_type));
if (p_key->get_obj_cnt() != columns_infos.count()) {
ret = OB_KV_SCAN_RANGE_MISSING;
LOG_USER_ERROR(OB_KV_SCAN_RANGE_MISSING, p_key->get_obj_cnt(), columns_infos.count());
LOG_WARN("wrong rowkey size", K(ret), K(i), K(j), K(*p_key), K(columns_infos));
} else {
const int64_t M = p_key->get_obj_cnt();
for (int64_t k = 0; OB_SUCCESS == ret && k < M; ++k) {
ObObj &obj = const_cast<ObObj&>(p_key->get_obj_ptr()[k]);
if (obj.is_min_value() || obj.is_max_value()) {
// do nothing
} else if (OB_FAIL(adjust_column_type(columns_type.at(k), obj))) {
LOG_WARN("fail to adjust column type", K(ret), K(columns_type.at(k)), K(obj));
} else if (OB_FAIL(adjust_column_type(columns_infos.at(k), obj))) {
LOG_WARN("fail to adjust column type", K(ret), K(columns_infos.at(k)), K(obj));
}
}
}
@ -865,7 +903,7 @@ int ObTableCtx::init_scan(const ObTableQuery &query,
} else {
// init index_col_ids_
if (OB_SUCC(ret)) {
const common::ObIndexInfo &index_info = index_schema_->get_index_info();
const ObIndexInfo &index_info = index_schema_->get_index_info();
if (OB_FAIL(index_info.get_column_ids(index_col_ids_))) {
LOG_WARN("fail to get index column ids", K(ret), K(index_info));
}
@ -988,7 +1026,7 @@ int ObTableCtx::add_stored_generated_column_assignment(const ObTableAssignment &
}
}
}
return init_dml_related_tid();
return ret;
}
/*
@ -1015,6 +1053,7 @@ int ObTableCtx::init_assignments(const ObTableEntity &entity)
if (OB_SUCCESS == entity.get_property(item.column_name_, prop_obj)) {
if (item.rowkey_position_ > 0) {
ret = OB_ERR_UPDATE_ROWKEY_COLUMN;
LOG_USER_ERROR(OB_ERR_UPDATE_ROWKEY_COLUMN);
LOG_WARN("can not update rowkey column", K(ret));
} else {
ObTableAssignment assign(&item);
@ -1248,10 +1287,15 @@ int ObTableCtx::init_append(bool return_affected_entity, bool return_rowkey)
} else if (assign.column_item_->auto_filled_timestamp_) {
// do nothing
} else if (delta.is_null()) {
ret = OB_OBJ_TYPE_ERROR;
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "append null");
LOG_WARN("append NULL is illegal", K(ret), K(delta));
} else if (OB_UNLIKELY(!ob_is_string_type(delta.get_type()))) {
ret = OB_OBJ_TYPE_ERROR;
ret = OB_KV_COLUMN_TYPE_NOT_MATCH;
const char *schema_type_str = "stringTc";
const char *obj_type_str = ob_obj_type_str(delta.get_type());
LOG_USER_ERROR(OB_KV_COLUMN_TYPE_NOT_MATCH, assign.column_item_->column_name_.length(), assign.column_item_->column_name_.ptr(),
static_cast<int>(strlen(schema_type_str)), schema_type_str, static_cast<int>(strlen(obj_type_str)), obj_type_str);
LOG_WARN("can only append string type", K(ret), K(delta));
} else {
const ObString &column_name = assign.column_item_->column_name_;
@ -1310,11 +1354,16 @@ int ObTableCtx::init_increment(bool return_affected_entity, bool return_rowkey)
LOG_WARN("column item is null", K(ret), K(assign));
} else if (assign.column_item_->is_auto_increment_) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "increment auto increment column");
LOG_WARN("not support increment auto increment column", K(ret), K(assign));
} else if (assign.column_item_->auto_filled_timestamp_) {
// do nothing
} else if (!ob_is_int_tc(delta.get_type())) {
ret = OB_OBJ_TYPE_ERROR;
ret = OB_KV_COLUMN_TYPE_NOT_MATCH;
const char *schema_type_str = "intTc";
const char *obj_type_str = ob_obj_type_str(delta.get_type());
LOG_USER_ERROR(OB_KV_COLUMN_TYPE_NOT_MATCH, assign.column_item_->column_name_.length(), assign.column_item_->column_name_.ptr(),
static_cast<int>(strlen(schema_type_str)), schema_type_str, static_cast<int>(strlen(obj_type_str)), obj_type_str);
LOG_WARN("delta should only be signed integer type", K(ret), K(delta));
} else {
const ObString &column_name = assign.column_item_->column_name_;
@ -1575,7 +1624,7 @@ int ObTableCtx::init_agg_cell_proj(int64_t size)
}
int ObTableCtx::add_aggregate_proj(int64_t cell_idx,
const common::ObString &column_name,
const ObString &column_name,
const ObIArray<ObTableAggregation> &aggregations)
{
int ret = OB_SUCCESS;
@ -1703,20 +1752,22 @@ int ObTableCtx::get_related_tablet_id(const share::schema::ObTableSchema &index_
int ObTableCtx::check_insert_up_can_use_put(bool &use_put)
{
int ret = OB_SUCCESS;
use_put = true;
use_put = false;
bool can_use_put = true;
if (is_inc_or_append()) { // increment or append operarion need old value to calculate, can not use put
use_put = false;
can_use_put = false;
} else if (ObTableOperationType::INSERT_OR_UPDATE != operation_type_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid operation type", K(ret), K_(operation_type));
} else if (is_htable()) { // htable has no index and alway full filled.
use_put = true;
can_use_put = true;
} else if (is_client_set_put_ && !related_index_ids_.empty()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "table with index use put");
LOG_WARN("client set use_put flag, but has local index is not support", K(ret), K_(related_index_ids));
} else if (!related_index_ids_.empty()) { // has index, can not use put
use_put = false;
can_use_put = false;
} else if (OB_ISNULL(table_schema_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("table schema is null", K(ret));
@ -1729,14 +1780,19 @@ int ObTableCtx::check_insert_up_can_use_put(bool &use_put)
- table_schema_->get_rowkey_column_num() <= entity_->get_properties_count())) { // all columns are filled
} else if (is_client_set_put_ && !is_all_columns_filled) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "all columns not filled but use put");
LOG_WARN("client set use_put flag, but not fill all columns is not support", K(ret), KPC_(table_schema), KPC_(entity));
} else if (is_client_set_put_ || is_all_columns_filled) {
use_put = true;
can_use_put = true;
} else { // some columns are missing
use_put = false;
can_use_put = false;
}
}
if (OB_SUCC(ret) && can_use_put && !is_total_quantity_log()) {
use_put = true;
}
return ret;
}

View File

@ -34,6 +34,7 @@ struct ObTableColumnItem : public sql::ColumnItem
is_stored_generated_column_(false),
is_virtual_generated_column_(false),
is_auto_increment_(false),
is_nullable_(true),
rowkey_position_(-1)
{}
TO_STRING_KV("ColumnItem", static_cast<const sql::ColumnItem &>(*this),
@ -45,6 +46,7 @@ struct ObTableColumnItem : public sql::ColumnItem
K_(generated_expr_str),
K_(dependant_exprs),
K_(is_auto_increment),
K_(is_nullable),
K_(rowkey_position));
sql::ObRawExpr *raw_expr_; // column ref expr or calculate expr
bool is_generated_column_;
@ -56,9 +58,35 @@ struct ObTableColumnItem : public sql::ColumnItem
common::ObString generated_expr_str_;
common::ObSEArray<sql::ObRawExpr*, 8, common::ModulePageAllocator, true> dependant_exprs_;
bool is_auto_increment_;
bool is_nullable_;
int64_t rowkey_position_; // greater than zero if this is rowkey column, 0 if this is common column
};
struct ObTableColumnInfo
{
ObTableColumnInfo()
: type_(),
is_auto_inc_(false),
is_nullable_(true)
{
}
ObTableColumnInfo(sql::ObExprResType type, const common::ObString &column_name, bool is_auto_inc = false, bool is_nullable = true)
: type_(type),
column_name_(column_name),
is_auto_inc_(is_auto_inc),
is_nullable_(is_nullable)
{
}
sql::ObExprResType type_;
common::ObString column_name_;
bool is_auto_inc_;
bool is_nullable_;
TO_STRING_KV(K_(type),
K_(column_name),
K_(is_auto_inc),
K_(is_nullable));
};
struct ObTableAssignment : public sql::ObAssignment
{
ObTableAssignment()
@ -152,6 +180,7 @@ public:
is_ttl_table_ = false;
is_skip_scan_ = false;
is_client_set_put_ = false;
binlog_row_image_type_ = ObBinlogRowImage::FULL;
}
virtual ~ObTableCtx()
{}
@ -186,7 +215,8 @@ public:
K_(cur_cluster_version),
K_(is_ttl_table),
K_(is_skip_scan),
K_(is_client_set_put));
K_(is_client_set_put),
K_(binlog_row_image_type));
public:
//////////////////////////////////////// getter ////////////////////////////////////////////////
// for common
@ -242,6 +272,7 @@ public:
OB_INLINE const common::ObIArray<uint64_t>& get_select_col_ids() const { return select_col_ids_; }
OB_INLINE const common::ObIArray<uint64_t>& get_query_col_ids() const { return query_col_ids_; }
OB_INLINE const common::ObIArray<common::ObString>& get_query_col_names() const { return query_col_names_; }
OB_INLINE bool is_total_quantity_log() const { return binlog_row_image_type_ == ObBinlogRowImage::FULL; }
// for update
OB_INLINE bool is_for_update() const { return is_for_update_; }
OB_INLINE bool is_inc_or_append() const
@ -373,7 +404,7 @@ private:
int init_sess_info(ObTableApiCredential &credential);
// for scan
int init_index_info(const common::ObString &index_name);
int generate_columns_type(common::ObIArray<sql::ObExprResType> &columns_type);
int generate_column_infos(common::ObIArray<ObTableColumnInfo> &columns_infos);
int generate_key_range(const common::ObIArray<common::ObNewRange> &scan_ranges);
// for dml
int init_dml_related_tid();
@ -396,9 +427,9 @@ private:
private:
int construct_column_items();
int cons_column_type(const share::schema::ObColumnSchemaV2 &column_schema,
sql::ObExprResType &column_type);
int adjust_column_type(const ObExprResType &column_type, ObObj &obj, bool is_autoincrement = false);
int cons_column_info(const share::schema::ObColumnSchemaV2 &column_schema,
ObTableColumnInfo &column_info);
int adjust_column_type(const ObTableColumnInfo &column_info, ObObj &ob);
int adjust_column(const ObColumnSchemaV2 &col_schema, ObObj &obj);
int adjust_rowkey();
int adjust_properties();
@ -483,6 +514,7 @@ private:
bool is_skip_scan_;
// for put
bool is_client_set_put_;
int64_t binlog_row_image_type_;
private:
DISALLOW_COPY_AND_ASSIGN(ObTableCtx);
};

View File

@ -80,7 +80,7 @@ void ObTableExecuteEndTransCb::callback(int cb_param)
CHECK_BALANCE("[table async callback]");
if (cb_param != OB_SUCCESS) {
// commit failed
result_.set_errno(cb_param);
result_.set_err(cb_param);
result_.set_affected_rows(0);
result_entity_.reset();
}
@ -143,7 +143,7 @@ void ObTableBatchExecuteEndTransCb::callback(int cb_param)
// same result for all
ObTableOperationResult single_op_result;
single_op_result.set_entity(result_entity_);
single_op_result.set_errno(cb_param);
single_op_result.set_err(cb_param);
single_op_result.set_type(table_operation_type_);
if (OB_FAIL(result_.push_back(single_op_result))) {
LOG_WARN("failed to add result", K(ret)); // @todo reset the connection

View File

@ -81,6 +81,7 @@ int ObTableApiExecuteP::check_arg()
if (!(arg_.consistency_level_ == ObTableConsistencyLevel::STRONG ||
arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL)) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "consistency level");
LOG_WARN("some options not supported yet", K(ret),
"consistency_level", arg_.consistency_level_,
"operation_type", arg_.table_operation_.type());
@ -97,6 +98,7 @@ int ObTableApiExecuteP::check_arg2() const
ObTableOperationType::Type::INCREMENT != op_type) {
if (arg_.returning_rowkey() || arg_.returning_affected_entity()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "returning rowkey or affected entity");
LOG_WARN("some options not supported yet", K(ret),
"returning_rowkey", arg_.returning_rowkey(),
"returning_affected_entity", arg_.returning_affected_entity(),
@ -221,6 +223,7 @@ int ObTableApiExecuteP::try_process()
// do nothing
} else if (OB_UNLIKELY(!is_index_supported)) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "global index");
LOG_WARN("index type is not supported by table api", K(ret));
} else if (OB_FAIL(check_arg2())) {
LOG_WARN("fail to check arg", K(ret));
@ -230,11 +233,7 @@ int ObTableApiExecuteP::try_process()
switch (table_operation.type()) {
case ObTableOperationType::INSERT:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INSERT;
if (tb_ctx_.is_ttl_table()) {
ret = process_dml_op<TABLE_API_EXEC_TTL>();
} else {
ret = process_dml_op<TABLE_API_EXEC_INSERT>();
}
ret = process_insert();
break;
case ObTableOperationType::GET:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_GET;
@ -250,11 +249,7 @@ int ObTableApiExecuteP::try_process()
break;
case ObTableOperationType::INSERT_OR_UPDATE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INSERT_OR_UPDATE;
if (tb_ctx_.is_ttl_table()) {
ret = process_dml_op<TABLE_API_EXEC_TTL>();
} else {
ret = process_dml_op<TABLE_API_EXEC_INSERT_UP>();
}
ret = process_insert_up();
break;
case ObTableOperationType::REPLACE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_REPLACE;
@ -262,19 +257,11 @@ int ObTableApiExecuteP::try_process()
break;
case ObTableOperationType::INCREMENT:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INCREMENT;
if (tb_ctx_.is_ttl_table()) {
ret = process_dml_op<TABLE_API_EXEC_TTL>();
} else {
ret = process_dml_op<TABLE_API_EXEC_INSERT_UP>();
}
ret = process_insert_up();
break;
case ObTableOperationType::APPEND:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_APPEND;
if (tb_ctx_.is_ttl_table()) {
ret = process_dml_op<TABLE_API_EXEC_TTL>();
} else {
ret = process_dml_op<TABLE_API_EXEC_INSERT_UP>();
}
ret = process_insert_up();
break;
default:
ret = OB_INVALID_ARGUMENT;
@ -286,7 +273,7 @@ int ObTableApiExecuteP::try_process()
if (OB_FAIL(ret)) {
// init_tb_ctx will return some replaceable error code
result_.set_errno(ret);
result_.set_err(ret);
table::ObTableApiUtil::replace_ret_code(ret);
}
@ -420,7 +407,7 @@ int ObTableApiExecuteP::process_get()
}
release_read_trans();
result_.set_errno(ret);
result_.set_err(ret);
ObTableApiUtil::replace_ret_code(ret);
result_.set_type(arg_.table_operation_.type());

View File

@ -68,7 +68,7 @@ private:
SERVER_LOG(WARN, "fail to process op", K(ret));
}
result_.set_errno(ret);
result_.set_err(ret);
table::ObTableApiUtil::replace_ret_code(ret);
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
@ -79,6 +79,26 @@ private:
return ret;
}
int process_get();
int process_insert()
{
int ret = OB_SUCCESS;
if (!tb_ctx_.is_ttl_table()) {
ret = process_dml_op<table::TABLE_API_EXEC_INSERT>();
} else {
ret = process_dml_op<table::TABLE_API_EXEC_TTL>();
}
return ret;
}
int process_insert_up()
{
int ret = OB_SUCCESS;
if (!tb_ctx_.is_ttl_table()) {
ret = process_dml_op<table::TABLE_API_EXEC_INSERT_UP>();
} else {
ret = process_dml_op<table::TABLE_API_EXEC_TTL>();
}
return ret;
}
private:
table::ObTableEntity request_entity_;
table::ObTableEntity result_entity_;

View File

@ -103,6 +103,7 @@ int ObTableComparator::compare_to(const ObIArray<ObString> &select_columns,
} else {
// not support others
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "only for int and string, other column type");
LOG_WARN("do not support other column type, only for int, string", K(ret), K(column_type));
}
} else {

View File

@ -23,6 +23,33 @@ namespace oceanbase
{
namespace table
{
int ObTableApiModifyExecutor::check_row_null(const ObExprPtrIArray &row, const ColContentIArray &column_infos)
{
int ret = OB_SUCCESS;
if (row.count() < column_infos.count()) { // column_infos count less than row count when do update
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid row count", K(ret), K(row), K(column_infos));
}
for (int i = 0; OB_SUCC(ret) && i < column_infos.count(); i++) {
ObDatum *datum = NULL;
const bool is_nullable = column_infos.at(i).is_nullable_;
uint64_t col_idx = column_infos.at(i).projector_index_;
if (OB_FAIL(row.at(col_idx)->eval(eval_ctx_, datum))) {
LOG_WARN("fail to eval datum", K(ret), K(row), K(column_infos), K(col_idx));
} else if (!is_nullable && datum->is_null()) {
const ObString &column_name = column_infos.at(i).column_name_;
ret = OB_BAD_NULL_ERROR;
LOG_USER_ERROR(OB_BAD_NULL_ERROR, column_name.length(), column_name.ptr());
LOG_WARN("bad null error", K(ret), K(row), K(column_name));
}
}
return ret;
}
int ObTableApiModifyExecutor::open()
{
int ret = OB_SUCCESS;
@ -239,6 +266,8 @@ int ObTableApiModifyExecutor::insert_row_to_das(const ObTableInsCtDef &ins_ctdef
ObChunkDatumStore::StoredRow* stored_row = nullptr;
if (OB_FAIL(calc_tablet_loc(tablet_loc))) {
LOG_WARN("fail to calc partition key", K(ret));
} else if (OB_FAIL(check_row_null(ins_ctdef.new_row_, ins_ctdef.column_infos_))) {
LOG_WARN("fail to check row nullable", K(ret));
} else if (OB_FAIL(ObDMLService::insert_row(ins_ctdef.das_ctdef_,
ins_rtdef.das_rtdef_,
tablet_loc,
@ -428,6 +457,7 @@ int ObTableApiModifyExecutor::check_rowkey_change(const ObChunkDatumStore::Store
// do nothing
} else if (!ObDatum::binary_equal(upd_old_row.cells()[i], upd_new_row.cells()[i])) {
ret = OB_ERR_UPDATE_ROWKEY_COLUMN;
LOG_USER_ERROR(OB_ERR_UPDATE_ROWKEY_COLUMN);
LOG_WARN("can not update rowkey column", K(ret));
}
}
@ -469,6 +499,7 @@ int ObTableApiModifyExecutor::to_expr_skip_old(const ObChunkDatumStore::StoredRo
LOG_WARN("unexpected assign projector_index_", K(ret), K(new_row), K(assign.column_item_));
} else if (assign.column_item_->is_virtual_generated_column_) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "update virtual generated column");
LOG_WARN("virtual generated column not support to update", K(ret), K(assign));
} else {
ObExpr *expr = new_row.at(assign.column_item_->col_idx_);
@ -607,6 +638,8 @@ int ObTableApiModifyExecutor::insert_upd_new_row_to_das(const ObTableUpdCtDef &u
} else if (OB_ISNULL(upd_rtdef.dins_rtdef_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dins_rtdef_ is null", K(ret));
} else if (OB_FAIL(check_row_null(upd_ctdef.new_row_, upd_ctdef.assign_columns_))) {
LOG_WARN("fail to check row nullable", K(ret));
} else if (OB_FAIL(ObDMLService::insert_row(*upd_ctdef.dins_ctdef_,
*upd_rtdef.dins_rtdef_,
tablet_loc,

View File

@ -128,6 +128,7 @@ protected:
int stored_row_to_exprs(const ObChunkDatumStore::StoredRow &row,
const common::ObIArray<ObExpr*> &exprs,
ObEvalCtx &ctx);
int check_row_null(const ObExprPtrIArray &row, const ColContentIArray &column_infos);
protected:
sql::ObDMLRtCtx dml_rtctx_;
int64_t affected_rows_;

View File

@ -23,7 +23,7 @@ using namespace oceanbase::table;
////////////////////////////////////////////////////////////////
int ObTableMoveResponseSender::get_replica(const uint64_t table_id,
const common::ObTabletID &tablet_id,
const ObTabletID &tablet_id,
table::ObTableMoveReplicaInfo &replica)
{
int ret = OB_SUCCESS;
@ -50,28 +50,33 @@ int ObTableMoveResponseSender::get_replica(const uint64_t table_id,
}
int ObTableMoveResponseSender::init(const uint64_t table_id,
const common::ObTabletID &tablet_id,
const ObTabletID &tablet_id,
share::schema::ObMultiVersionSchemaService &schema_service)
{
int ret = OB_SUCCESS;
ObTableMoveReplicaInfo &replica = result_.get_replica_info();
if (OB_FAIL(get_replica(table_id, tablet_id, replica))) {
LOG_WARN("fail to get partition info", K(ret), K(table_id), K(tablet_id));
share::schema::ObSchemaGetterGuard schema_guard;
const share::schema::ObTableSchema *table_schema = nullptr;
if (OB_FAIL(schema_service.get_tenant_schema_guard(MTL_ID(), schema_guard))) {
LOG_WARN("fail to get schema guard", K(ret));
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) {
LOG_WARN("fail to get table schema", K(table_id), K(ret));
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(ret), K(table_id));
} else {
share::schema::ObSchemaGetterGuard schema_guard;
const share::schema::ObTableSchema *table_schema = nullptr;
if (OB_FAIL(schema_service.get_tenant_schema_guard(MTL_ID(), schema_guard))) {
LOG_WARN("fail to get schema guard", K(ret));
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) {
LOG_WARN("fail to get table schema", K(table_id), K(ret));
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("NULL ptr", K(ret), K(table_id));
ObTabletID tmp_tablet_id = tablet_id;
if (!table_schema->is_partitioned_table()) {
tmp_tablet_id = table_schema->get_tablet_id();
}
if (OB_FAIL(get_replica(table_id, tmp_tablet_id, replica))) {
LOG_WARN("fail to get partition info", K(ret), K(table_id), K(tmp_tablet_id));
} else {
replica.set_table_id(table_id);
replica.set_schema_version(table_schema->get_schema_version());
replica.set_tablet_id(tablet_id);
replica.set_tablet_id(tmp_tablet_id);
// set move pcode
response_sender_.set_pcode(obrpc::OB_TABLE_API_MOVE);

View File

@ -62,7 +62,7 @@ int ObTableOpWrapper::process_op_with_spec(ObTableCtx &tb_ctx,
ret = COVER_SUCC(tmp_ret);
}
op_result.set_errno(ret);
op_result.set_err(ret);
op_result.set_type(tb_ctx.get_opertion_type());
spec->destroy_executor(executor);
return ret;
@ -201,6 +201,78 @@ int ObTableOpWrapper::process_get_with_spec(ObTableCtx &tb_ctx,
return ret;
}
int ObTableOpWrapper::get_insert_spec(ObTableCtx &tb_ctx,
ObTableApiCacheGuard &cache_guard,
ObTableApiSpec *&spec)
{
int ret = OB_SUCCESS;
if (!tb_ctx.is_ttl_table()) {
if (OB_FAIL(ObTableOpWrapper::get_or_create_spec<TABLE_API_EXEC_INSERT>(tb_ctx, cache_guard, spec))) {
LOG_WARN("fail to get or create insert spec", K(ret));
}
} else {
if (OB_FAIL(ObTableOpWrapper::get_or_create_spec<TABLE_API_EXEC_TTL>(tb_ctx, cache_guard, spec))) {
LOG_WARN("fail to get or create ttl spec", K(ret));
}
}
return ret;
}
int ObTableOpWrapper::get_insert_up_spec(ObTableCtx &tb_ctx,
ObTableApiCacheGuard &cache_guard,
ObTableApiSpec *&spec)
{
int ret = OB_SUCCESS;
if (!tb_ctx.is_ttl_table()) {
if (OB_FAIL(ObTableOpWrapper::get_or_create_spec<TABLE_API_EXEC_INSERT_UP>(tb_ctx, cache_guard, spec))) {
LOG_WARN("fail to get or create insert up spec", K(ret));
}
} else {
if (OB_FAIL(ObTableOpWrapper::get_or_create_spec<TABLE_API_EXEC_TTL>(tb_ctx, cache_guard, spec))) {
LOG_WARN("fail to get or create ttl spec", K(ret));
}
}
return ret;
}
int ObTableOpWrapper::process_insert_op(ObTableCtx &tb_ctx, ObTableOperationResult &op_result)
{
int ret = OB_SUCCESS;
if (!tb_ctx.is_ttl_table()) {
if (OB_FAIL(ObTableOpWrapper::process_op<TABLE_API_EXEC_INSERT>(tb_ctx, op_result))) {
LOG_WARN("fail to process insert operation", K(ret));
}
} else {
if (OB_FAIL(ObTableOpWrapper::process_op<TABLE_API_EXEC_TTL>(tb_ctx, op_result))) {
LOG_WARN("fail to process ttl insert operation", K(ret));
}
}
return ret;
}
int ObTableOpWrapper::process_insert_up_op(ObTableCtx &tb_ctx, ObTableOperationResult &op_result)
{
int ret = OB_SUCCESS;
if (!tb_ctx.is_ttl_table()) {
if (OB_FAIL(ObTableOpWrapper::process_op<TABLE_API_EXEC_INSERT_UP>(tb_ctx, op_result))) {
LOG_WARN("fail to process insert up operation", K(ret));
}
} else {
if (OB_FAIL(ObTableOpWrapper::process_op<TABLE_API_EXEC_TTL>(tb_ctx, op_result))) {
LOG_WARN("fail to process ttl insert up operation", K(ret));
}
}
return ret;
}
int ObTableApiUtil::construct_entity_from_row(ObIAllocator &allocator,
ObNewRow *row,
const ObTableSchema *table_schema,
@ -223,7 +295,9 @@ int ObTableApiUtil::construct_entity_from_row(ObIAllocator &allocator,
for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) {
const ObString &name = arr_col->at(i);
if (OB_ISNULL(column_schema = table_schema->get_column_schema(name))) {
ret = OB_ERR_COLUMN_NOT_FOUND;
ret = OB_ERR_BAD_FIELD_ERROR;
const ObString &table = table_schema->get_table_name_str();
LOG_USER_ERROR(OB_ERR_BAD_FIELD_ERROR, name.length(), name.ptr(), table.length(), table.ptr());
LOG_WARN("column not exist", K(ret), K(name));
} else {
int64_t column_idx = table_schema->get_column_idx(column_schema->get_column_id());

View File

@ -68,6 +68,10 @@ public:
// get特有的逻辑,单独处理
static int process_get(ObTableCtx &tb_ctx, ObNewRow *&row);
static int process_get_with_spec(ObTableCtx &tb_ctx, ObTableApiSpec *spec, ObNewRow *&row);
static int get_insert_spec(ObTableCtx &tb_ctx, ObTableApiCacheGuard &cache_guard, ObTableApiSpec *&spec);
static int get_insert_up_spec(ObTableCtx &tb_ctx, ObTableApiCacheGuard &cache_guard, ObTableApiSpec *&spec);
static int process_insert_op(ObTableCtx &tb_ctx, ObTableOperationResult &op_result);
static int process_insert_up_op(ObTableCtx &tb_ctx, ObTableOperationResult &op_result);
private:
static int process_affected_entity(ObTableCtx &tb_ctx,
const ObTableApiSpec &spec,
@ -91,7 +95,9 @@ public:
if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret
|| OB_BAD_NULL_ERROR == ret
|| OB_OBJ_TYPE_ERROR == ret
|| OB_KV_COLUMN_TYPE_NOT_MATCH == ret
|| OB_ERR_COLLATION_MISMATCH == ret
|| OB_KV_COLLATION_MISMATCH == ret
|| OB_ERR_DATA_TOO_LONG == ret
|| OB_DATA_OUT_OF_RANGE == ret) {
ret = OB_SUCCESS;

View File

@ -75,6 +75,7 @@ int ObTableQueryAndMutateP::check_arg()
LOG_WARN("invalid table query request", K(ret), K(query));
} else if ((ObTableEntityType::ET_HKV == arg_.entity_type_) && !hfilter.is_valid()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "QueryAndMutate hbase model not set hfilter");
LOG_WARN("QueryAndMutate hbase model should set hfilter", K(ret));
} else if ((ObTableEntityType::ET_KV == arg_.entity_type_) && (1 != mutations.count())) {
ret = OB_ERR_UNEXPECTED;
@ -320,7 +321,7 @@ int ObTableQueryAndMutateP::execute_htable_put(const ObITableEntity &new_entity)
LOG_WARN("fail to init table ctx", K(ret));
} else if (OB_FAIL(tb_ctx.init_trans(get_trans_desc(), get_tx_snapshot()))) {
LOG_WARN("fail to init trans", K(ret), K(tb_ctx));
} else if (OB_FAIL(ObTableOpWrapper::process_op<TABLE_API_EXEC_INSERT_UP>(tb_ctx, op_result))) {
} else if (OB_FAIL(ObTableOpWrapper::process_insert_up_op(tb_ctx, op_result))) {
LOG_WARN("fail to process insert up op", K(ret));
}
}
@ -644,7 +645,7 @@ int ObTableQueryAndMutateP::execute_htable_insert(const ObITableEntity &new_enti
LOG_WARN("fail to init table ctx", K(ret));
} else if (OB_FAIL(tb_ctx.init_trans(get_trans_desc(), get_tx_snapshot()))) {
LOG_WARN("fail to init trans", K(ret), K(tb_ctx));
} else if (OB_FAIL(ObTableOpWrapper::process_op<TABLE_API_EXEC_INSERT>(tb_ctx, op_result))) {
} else if (OB_FAIL(ObTableOpWrapper::process_insert_op(tb_ctx, op_result))) {
LOG_WARN("fail to process insert op", K(ret));
}
}
@ -705,6 +706,7 @@ int ObTableQueryAndMutateP::execute_htable_mutation(ObTableQueryResultIterator *
}
default: {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "mutation type");
LOG_WARN("not supported mutation type", K(ret), "type", mutation.type());
break;
}
@ -847,42 +849,24 @@ int ObTableQueryAndMutateP::execute_one_mutation(ObTableQueryResult &one_result,
}
break;
}
case ObTableOperationType::INCREMENT: {
if (tb_ctx_.is_ttl_table()) {
ret = process_dml_op<TABLE_API_EXEC_TTL>(*new_entity, tmp_affect_rows);
} else {
ret = process_dml_op<TABLE_API_EXEC_INSERT_UP>(*new_entity, tmp_affect_rows);
}
if (OB_FAIL(ret)) {
LOG_WARN("fail to do increment", K(ret), K(tb_ctx_.is_ttl_table()));
} else {
affected_rows += tmp_affect_rows;
}
break;
}
case ObTableOperationType::INCREMENT:
case ObTableOperationType::APPEND: {
if (tb_ctx_.is_ttl_table()) {
ret = process_dml_op<TABLE_API_EXEC_TTL>(*new_entity, tmp_affect_rows);
} else {
ret = process_dml_op<TABLE_API_EXEC_INSERT_UP>(*new_entity, tmp_affect_rows);
}
if (OB_FAIL(ret)) {
LOG_WARN("fail to do append", K(ret), K(tb_ctx_.is_ttl_table()));
} else {
ret = process_insert_up(*new_entity, tmp_affect_rows);
if (OB_SUCC(ret)) {
affected_rows += tmp_affect_rows;
}
break;
}
case ObTableOperationType::INSERT: { // 使用mutation上的entity执行insert
if (OB_FAIL(process_dml_op<TABLE_API_EXEC_INSERT>(mutate_entity, tmp_affect_rows))) {
LOG_WARN("ail to execute table insert", K(ret));
} else {
ret = process_insert(mutate_entity, tmp_affect_rows);
if (OB_SUCC(ret)) {
affected_rows += tmp_affect_rows;
}
break;
}
default: {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "mutation type");
LOG_WARN("not supported mutation type", K(ret), "type", mutation.type());
break;
}

View File

@ -108,6 +108,26 @@ private:
return ret;
}
int process_insert(const table::ObITableEntity &new_entity, int64_t &affected_rows)
{
int ret = OB_SUCCESS;
if (!tb_ctx_.is_ttl_table()) {
ret = process_dml_op<table::TABLE_API_EXEC_INSERT>(new_entity, affected_rows);
} else {
ret = process_dml_op<table::TABLE_API_EXEC_TTL>(new_entity, affected_rows);
}
return ret;
}
int process_insert_up(const table::ObITableEntity &new_entity, int64_t &affected_rows)
{
int ret = OB_SUCCESS;
if (!tb_ctx_.is_ttl_table()) {
ret = process_dml_op<table::TABLE_API_EXEC_INSERT_UP>(new_entity, affected_rows);
} else {
ret = process_dml_op<table::TABLE_API_EXEC_TTL>(new_entity, affected_rows);
}
return ret;
}
private:
common::ObArenaAllocator allocator_;
table::ObTableCtx tb_ctx_;

View File

@ -50,6 +50,7 @@ int ObTableQueryUtils::check_htable_query_args(const ObTableQuery &query,
LOG_WARN("htable scan should not set Offset and Limit", K(ret), K(query));
} else if (ObQueryFlag::Forward != query.get_scan_order() && ObQueryFlag::Reverse != query.get_scan_order()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "scan order");
LOG_WARN("TableQuery with htable_filter only support forward and reverse scan yet", K(ret));
}
}

View File

@ -55,6 +55,7 @@ int ObTableQueryP::check_arg()
} else if (!(arg_.consistency_level_ == ObTableConsistencyLevel::STRONG ||
arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL)) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "consistency level");
LOG_WARN("some options not supported yet", K(ret),
"consistency_level", arg_.consistency_level_);
}

View File

@ -296,6 +296,7 @@ int ObTableQuerySyncP::check_arg()
} else if (!(arg_.consistency_level_ == ObTableConsistencyLevel::STRONG ||
arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL)) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "consistency level");
LOG_WARN("some options not supported yet", K(ret), "consistency_level", arg_.consistency_level_);
}
return ret;

View File

@ -132,6 +132,10 @@ int ObTableLoginP::get_ids()
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
LOG_WARN("get_schema_guard failed", K(ret));
} else if (OB_FAIL(guard.get_tenant_id(arg_.tenant_name_, result_.tenant_id_))) {
if (ret == OB_ERR_INVALID_TENANT_NAME) {
ret = OB_TENANT_NOT_EXIST;
LOG_USER_ERROR(OB_TENANT_NOT_EXIST, arg_.tenant_name_.length(), arg_.tenant_name_.ptr());
}
LOG_WARN("get_tenant_id failed", K(ret), "tenant", arg_.tenant_name_);
} else if (OB_INVALID_ID == result_.tenant_id_) {
ret = OB_ERR_INVALID_TENANT_NAME;
@ -142,14 +146,19 @@ int ObTableLoginP::get_ids()
result_.database_id_))) {
LOG_WARN("failed to get database id", K(ret), "database", arg_.database_name_);
} else if (OB_INVALID_ID == result_.database_id_) {
ret = OB_WRONG_DB_NAME;
ret = OB_ERR_BAD_DATABASE;
LOG_USER_ERROR(OB_ERR_BAD_DATABASE, arg_.database_name_.length(), arg_.database_name_.ptr());
LOG_WARN("failed to get database id", K(ret), "database", arg_.database_name_);
} else if (OB_FAIL(guard.get_user_id(result_.tenant_id_, arg_.user_name_,
ObString::make_string("%")/*assume there is no specific host*/,
result_.user_id_))) {
if (ret == OB_ERR_USER_NOT_EXIST) {
LOG_USER_ERROR(OB_ERR_USER_NOT_EXIST);
}
LOG_WARN("failed to get user id", K(ret), "user", arg_.user_name_);
} else if (OB_INVALID_ID == result_.user_id_) {
ret = OB_ERR_USER_NOT_EXIST;
LOG_USER_ERROR(OB_ERR_USER_NOT_EXIST);
LOG_WARN("failed to get user id", K(ret), "user", arg_.user_name_);
}
}
@ -183,6 +192,9 @@ int ObTableLoginP::verify_password(const ObString &tenant, const ObString &user,
} else if (gctx_.schema_service_->get_tenant_schema_guard(tenant_id, guard)) {
LOG_WARN("fail to get tenant guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(guard.check_user_access(login_info, session_priv, ssl_st, user_info))) {
if (ret == OB_PASSWORD_WRONG) {
LOG_USER_ERROR(OB_PASSWORD_WRONG, user.length(), user.ptr(), tenant.length(), tenant.ptr(), "YES"/*using password*/);
}
LOG_WARN("User access denied", K(login_info), K(ret));
} else if (OB_ISNULL(user_info)) {
ret = OB_ERR_UNEXPECTED;
@ -273,7 +285,12 @@ int ObTableApiProcessorBase::check_user_access(const ObString &credential_str)
} else if (OB_FAIL(guard.get_credential(sess_credetial))) {
LOG_WARN("fail to get credential", K(ret));
} else if (sess_credetial->hash_val_ != credential_.hash_val_) {
ret = OB_ERR_NO_PRIVILEGE;
ret = OB_KV_CREDENTIAL_NOT_MATCH;
char user_cred_info[128];
char sess_cred_info[128];
int user_len = credential_.to_string(user_cred_info, 128);
int sess_len = sess_credetial->to_string(sess_cred_info, 128);
LOG_USER_ERROR(OB_KV_CREDENTIAL_NOT_MATCH, user_len, user_cred_info, sess_len, sess_cred_info);
LOG_WARN("invalid credential", K(ret), K_(credential), K(*sess_credetial));
} else if (sess_credetial->cluster_id_ != credential_.cluster_id_) {
ret = OB_ERR_NO_PRIVILEGE;
@ -446,6 +463,7 @@ int ObTableApiProcessorBase::start_trans_(bool is_readonly,
bool strong_read = ObTableConsistencyLevel::STRONG == consistency_level;
if ((!is_readonly) && (ObTableConsistencyLevel::EVENTUAL == consistency_level)) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "consistency level");
LOG_WARN("some options not supported yet", K(ret), K(is_readonly), K(consistency_level));
}
transaction::ObTransService *txs = MTL(transaction::ObTransService*);

View File

@ -97,8 +97,9 @@ void ObTableApiSessPoolMgr::destroy()
pool_->destroy();
pool_ = nullptr;
}
allocator_.reset();
allocator_.reset(); // when mtl_destroy, all worker thread has beed existed, no need to lock allocator
is_inited_ = false;
LOG_INFO("ObTableApiSessPoolMgr destroy successfully");
}
}
@ -128,6 +129,7 @@ int ObTableApiSessPoolMgr::get_sess_info(ObTableApiCredential &credential, ObTab
if (credential.tenant_id_ != MTL_ID()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "access wrong tenant");
LOG_WARN("access wrong tenant", K(ret), K(credential.tenant_id_), K(MTL_ID()));
} else if (OB_UNLIKELY(OB_ISNULL(pool_)) && OB_FAIL(create_session_pool_safe())) {
LOG_WARN("fail to create session pool", K(ret), K(credential));
@ -315,8 +317,9 @@ void ObTableApiSessPool::destroy()
retired_nodes_.clear();
key_node_map_.destroy();
allocator_.reset();
allocator_.reset(); // when mtl_destroy, all worker thread has beed existed, no need to lock allocator
is_inited_ = false;
LOG_INFO("ObTableApiSessPool destroy successfully", K(MTL_ID()));
}
/*
@ -357,7 +360,7 @@ int ObTableApiSessPool::move_node_to_retired_list(ObTableApiSessNode *node)
{
int ret = OB_SUCCESS;
ObLockGuard<ObSpinLock> guard(lock_);
ObLockGuard<ObSpinLock> guard(retired_nodes_lock_); // lock retired_nodes_
if (OB_ISNULL(node)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("session node is null", K(ret));
@ -379,17 +382,22 @@ int ObTableApiSessPool::move_node_to_retired_list(ObTableApiSessNode *node)
int ObTableApiSessPool::evict_retired_sess()
{
int ret = OB_SUCCESS;
int64 delete_count = 0;
int64_t delete_count = 0;
int64_t cur_time = ObTimeUtility::current_time();
ObLockGuard<ObSpinLock> guard(retired_nodes_lock_); // lock retired_nodes_
DLIST_FOREACH_REMOVESAFE_X(node, retired_nodes_, delete_count < BACKCROUND_TASK_DELETE_SESS_NUM) {
if (OB_FAIL(node->remove_unused_sess())) {
if (cur_time - node->get_last_active_ts() < SESS_RETIRE_TIME) {
// do nothing, this node maybe is from ObTableApiSessNodeReplaceOp, some threads maybe is using it.
// we remove it next retire task.
} else if (OB_FAIL(node->remove_unused_sess())) {
LOG_WARN("fail to remove unused sess", K(ret), K(*node));
} else {
ObLockGuard<ObSpinLock> guard(lock_);
if (node->is_empty()) {
ObTableApiSessNode *rm_node = retired_nodes_.remove(node);
if (OB_NOT_NULL(rm_node)) {
rm_node->~ObTableApiSessNode();
ObLockGuard<ObSpinLock> alloc_guard(allocator_lock_); // lock allocator_
allocator_.free(rm_node);
rm_node = nullptr;
delete_count++;
@ -398,6 +406,10 @@ int ObTableApiSessPool::evict_retired_sess()
}
}
if (delete_count != 0) {
LOG_INFO("evict retired session node", K(delete_count), K(retired_nodes_.get_size()));
}
return ret;
}
@ -491,7 +503,7 @@ int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableA
int ObTableApiSessPool::create_node_safe(ObTableApiCredential &credential, ObTableApiSessNode *&node)
{
int ret = OB_SUCCESS;
ObLockGuard<ObSpinLock> guard(lock_);
ObLockGuard<ObSpinLock> alloc_guard(allocator_lock_); // lock allocator_
ObTableApiSessNode *tmp_node = nullptr;
void *buf = nullptr;
@ -521,7 +533,7 @@ int ObTableApiSessPool::create_and_add_node_safe(ObTableApiCredential &credentia
ret = OB_SUCCESS; // replace error code
}
// this node has been set by other thread, free it
ObLockGuard<ObSpinLock> guard(lock_);
ObLockGuard<ObSpinLock> alloc_guard(allocator_lock_); // lock allocator_
node->~ObTableApiSessNode();
allocator_.free(node);
node = nullptr;
@ -573,6 +585,7 @@ void ObTableApiSessNodeVal::destroy()
sess_info_.~ObSQLSessionInfo();
is_inited_ = false;
owner_node_ = nullptr;
tenant_id_ = OB_INVALID;
}
int ObTableApiSessNodeVal::init_sess_info()
@ -582,18 +595,18 @@ int ObTableApiSessNodeVal::init_sess_info()
if (!is_inited_) {
share::schema::ObSchemaGetterGuard schema_guard;
const ObTenantSchema *tenant_schema = nullptr;
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) {
LOG_WARN("fail to get schema guard", K(ret), K(MTL_ID()));
} else if (OB_FAIL(schema_guard.get_tenant_info(MTL_ID(), tenant_schema))) {
LOG_WARN("fail to get tenant schema", K(ret), K(MTL_ID()));
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id_, schema_guard))) {
LOG_WARN("fail to get schema guard", K(ret), K_(tenant_id));
} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id_, tenant_schema))) {
LOG_WARN("fail to get tenant schema", K(ret), K_(tenant_id));
} else if (OB_ISNULL(tenant_schema)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("tenant schema is null", K(ret));
} else if (OB_FAIL(ObTableApiSessUtil::init_sess_info(MTL_ID(),
} else if (OB_FAIL(ObTableApiSessUtil::init_sess_info(tenant_id_,
tenant_schema->get_tenant_name_str(),
schema_guard,
sess_info_))) {
LOG_WARN("fail to init sess info", K(ret), K(MTL_ID()));
LOG_WARN("fail to init sess info", K(ret), K_(tenant_id));
} else {
is_inited_ = true;
}
@ -640,6 +653,7 @@ void ObTableApiSessNode::destroy()
rm_sess->destroy();
}
}
ObLockGuard<ObSpinLock> alloc_guard(allocator_lock_); // lock allocator_
allocator_.reset();
}
@ -656,6 +670,7 @@ int ObTableApiSessNode::remove_unused_sess()
ObTableApiSessNodeVal *rm_sess = free_list.remove(sess);
if (OB_NOT_NULL(rm_sess)) {
rm_sess->~ObTableApiSessNodeVal();
ObLockGuard<ObSpinLock> alloc_guard(allocator_lock_); // lock allocator_
allocator_.free(rm_sess);
rm_sess = nullptr;
}
@ -701,14 +716,14 @@ int ObTableApiSessNode::get_sess_node_val(ObTableApiSessNodeVal *&val)
int ObTableApiSessNode::extend_and_get_sess_val(ObTableApiSessGuard &guard)
{
int ret = OB_SUCCESS;
ObLockGuard<ObSpinLock> alloc_guard(lock_); // avoid concurrent allocator_.alloc
ObLockGuard<ObSpinLock> alloc_guard(allocator_lock_); // lock allocator_
void *buf = nullptr;
if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObTableApiSessNodeVal)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc mem for ObTableApiSessNodeVal", K(ret), K(sizeof(ObTableApiSessNodeVal)));
} else {
ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(this);
ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(this, credential_.tenant_id_);
if (OB_FAIL(val->init_sess_info())) {
LOG_WARN("fail to init sess info", K(ret), K(*val));
} else {

View File

@ -75,7 +75,7 @@ private:
common::ObArenaAllocator allocator_;
ObTableApiSessPool *pool_;
ObTableApiSessEliminationTask elimination_task_;
ObSpinLock lock_; // for get_or_create_sess_pool
ObSpinLock lock_; // for double check pool creating
private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPoolMgr);
};
@ -106,18 +106,20 @@ public:
int evict_retired_sess();
int create_node_safe(ObTableApiCredential &credential, ObTableApiSessNode *&node);
int move_node_to_retired_list(ObTableApiSessNode *node);
common::ObIAllocator& get_allocator() { return allocator_; };
private:
int replace_sess_node_safe(ObTableApiCredential &credential);
int create_and_add_node_safe(ObTableApiCredential &credential);
int get_sess_node(uint64_t key, ObTableApiSessNode *&node);
private:
common::ObArenaAllocator allocator_;
ObSpinLock allocator_lock_; // for lock allocator_
bool is_inited_;
CacheKeyNodeMap key_node_map_;
// 已经淘汰的node,等待被后台删除
// 前台login时、后台淘汰时都会操作retired_nodes_,因此需要加锁
common::ObDList<ObTableApiSessNode> retired_nodes_;
ObSpinLock lock_;; // for lock retired_nodes_/allocator_
ObSpinLock retired_nodes_lock_; // for lock retired_nodes_
private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPool);
};
@ -127,8 +129,10 @@ class ObTableApiSessNodeVal : public common::ObDLinkBase<ObTableApiSessNodeVal>
friend class ObTableApiSessNode;
friend class ObTableApiSessGuard;
public:
explicit ObTableApiSessNodeVal(ObTableApiSessNode *owner)
explicit ObTableApiSessNodeVal(ObTableApiSessNode *owner, uint64_t tenant_id)
: is_inited_(false),
tenant_id_(tenant_id),
sess_info_(tenant_id_), // sess_info_ use 500 tenant default, so we must set tenant_id
owner_node_(owner)
{}
TO_STRING_KV(K_(is_inited),
@ -144,6 +148,7 @@ public:
void give_back_to_free_list();
private:
bool is_inited_;
uint64_t tenant_id_;
sql::ObSQLSessionInfo sess_info_;
ObTableApiSessNode *owner_node_;
private:
@ -191,10 +196,10 @@ private:
int extend_and_get_sess_val(ObTableApiSessGuard &guard);
private:
common::ObArenaAllocator allocator_;
ObSpinLock allocator_lock_; // for lock allocator_
SessList sess_lists_;
int64_t last_active_ts_;
ObTableApiCredential credential_;
ObSpinLock lock_;; // for lock allocator_
private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiSessNode);
};

View File

@ -13271,11 +13271,16 @@ int datetime_scale_check_only(const ObAccuracy &accuracy, const ObObj &obj)
if (OB_UNLIKELY(scale > MAX_SCALE_FOR_TEMPORAL)) {
ret = OB_ERR_TOO_BIG_PRECISION;
LOG_USER_ERROR(OB_ERR_TOO_BIG_PRECISION, scale, "CAST", static_cast<int64_t>(MAX_SCALE_FOR_TEMPORAL));
} else if (OB_UNLIKELY(0 <= scale && scale < MAX_SCALE_FOR_TEMPORAL)) {
} else {
int64_t value = obj.get_datetime();
if (!ObTimeConverter::is_valid_datetime(value)) {
ret = OB_INVALID_DATA;
LOG_WARN("invalid datetime value", K(ret), K(value));
ObCastMode cast_mode = CM_ERROR_ON_SCALE_OVER;
if (OB_FAIL(time_usec_scale_check(cast_mode, accuracy, value))) {
LOG_WARN("check usec scale fail.", K(ret), K(value), K(accuracy));
} else if (OB_UNLIKELY(0 <= scale && scale < MAX_SCALE_FOR_TEMPORAL)) {
if (!ObTimeConverter::is_valid_datetime(value)) {
ret = OB_INVALID_DATA;
LOG_WARN("invalid datetime value", K(ret), K(value));
}
}
}
return ret;

View File

@ -14,7 +14,6 @@
#include "ob_table.h"
#include "share/ob_errno.h"
#include "lib/utility/ob_unify_serialize.h"
#include "common/row/ob_row.h"
#include "rpc/obrpc/ob_rpc_packet.h"
#include "sql/engine/expr/ob_expr_lob_utils.h"
@ -1471,44 +1470,65 @@ int ObTableQueryResult::alloc_buf_if_need(const int64_t need_size)
int ObTableQueryResult::add_row(const ObNewRow &row)
{
int ret = OB_SUCCESS;
ret = alloc_buf_if_need(row.get_serialize_size());
// 1. check properties count and row count
const int64_t N = row.get_count();
if (OB_SUCC(ret)) {
if (0 != properties_names_.count()
&& N != properties_names_.count()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("cell count not match with property count", K(ret), K(N),
"properties_count", properties_names_.count());
}
}
for (int i = 0; OB_SUCCESS == ret && i < N; ++i)
{
// Output of TableApi does not have lob locator header, remove lob header before serialize.
// Functions defined by DEF_TEXT_SERIALIZE_FUNCS is called here, refer to ob_obj_funcs.h
ObObjType type = row.get_cell(i).get_type();
if (is_lob_storage(type)) {
ObObj tmp_obj = row.get_cell(i);
ObString read_data;
if (tmp_obj.has_lob_header()) {
if (OB_FAIL(sql::ObTextStringHelper::read_real_string_data(&allocator_, tmp_obj, read_data))) {
LOG_WARN("failed to get obj", K(ret), K_(buf));
} else {
tmp_obj.set_lob_value(type, read_data.ptr(), read_data.length());
if (0 != properties_names_.count()
&& N != properties_names_.count()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("cell count not match with property count", K(ret), K(N),
"properties_count", properties_names_.count());
} else {
// 2. construct new row
ObNewRow new_row = row;
ObObj *lob_cells = nullptr;
const int64_t lob_storage_count = get_lob_storage_count(new_row);
if (lob_storage_count != 0) {
if (OB_ISNULL(lob_cells = static_cast<ObObj*>(allocator_.alloc(sizeof(ObObj) * lob_storage_count)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc cells buffer", K(ret), K(lob_storage_count));
}
for (int i = 0, lob_cell_idx = 0; OB_SUCC(ret) && i < N; ++i) {
const ObObj &cell = new_row.get_cell(i);
ObObjType type = cell.get_type();
if (is_lob_storage(type)) {
ObString real_data;
if (cell.has_lob_header()) {
if (OB_FAIL(sql::ObTextStringHelper::read_real_string_data(&allocator_, cell, real_data))) {
LOG_WARN("fail to read real string date", K(ret), K(cell));
} else if (lob_cell_idx >= lob_storage_count) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected index count", K(ret), K(lob_cell_idx), K(lob_storage_count));
} else {
lob_cells[lob_cell_idx].set_lob_value(type, real_data.ptr(), real_data.length());
lob_cells[lob_cell_idx].set_collation_type(cell.get_collation_type());
new_row.get_cell(i) = lob_cells[lob_cell_idx]; // switch lob cell
lob_cell_idx++;
}
}
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tmp_obj.serialize(buf_.get_data(), buf_.get_capacity(), buf_.get_position()))) {
LOG_WARN("failed to serialize obj", K(ret), K_(buf));
}
} else {
}
// 3. alloc serialize size
int64_t size = new_row.get_serialize_size();
if (OB_FAIL(ret)) {
} else if (OB_FAIL(alloc_buf_if_need(size))) {
LOG_WARN("failed to alloc buff", K(ret), K(size));
}
// 4. serialize
for (int i = 0; OB_SUCC(ret) && i < N; ++i) {
if (OB_FAIL(row.get_cell(i).serialize(buf_.get_data(), buf_.get_capacity(), buf_.get_position()))) {
LOG_WARN("failed to serialize obj", K(ret), K_(buf));
}
}
} // end for
}
if (OB_SUCC(ret)) {
++row_count_;
}
return ret;
}

View File

@ -29,6 +29,8 @@
#include "share/table/ob_table_ttl_common.h"
#include "common/rowkey/ob_rowkey.h"
#include "common/ob_role.h"
#include "common/row/ob_row.h"
#include "lib/oblog/ob_warning_buffer.h"
namespace oceanbase
{
namespace common
@ -380,6 +382,16 @@ public:
msg_[0] = '\0';
}
~ObTableResult() = default;
void set_err(int err)
{
errno_ = err;
if (err != common::OB_SUCCESS) {
common::ObWarningBuffer *wb = common::ob_get_tsi_warning_buffer();
if (OB_NOT_NULL(wb)) {
(void)snprintf(msg_, common::OB_MAX_ERROR_MSG_LEN, "%s", wb->get_err_msg());
}
}
}
void set_errno(int err) { errno_ = err; }
int get_errno() const { return errno_; }
int assign(const ObTableResult &other);
@ -879,6 +891,16 @@ public:
private:
static const int64_t DEFAULT_BUF_BLOCK_SIZE = common::OB_MALLOC_BIG_BLOCK_SIZE - (1024*1024LL);
int alloc_buf_if_need(const int64_t size);
OB_INLINE int64_t get_lob_storage_count(const common::ObNewRow &row) const
{
int64_t count = 0;
for (int64_t i = 0; i < row.get_count(); ++i) {
if (is_lob_storage(row.get_cell(i).get_type())) {
count++;
}
}
return count;
}
private:
common::ObSEArray<ObString, 16> properties_names_; // serialize
int64_t row_count_; // serialize

View File

@ -164,7 +164,7 @@ void TestCreateExecutor::TearDown()
{
}
ObTableApiSessNodeVal g_sess_node_val(NULL);
ObTableApiSessNodeVal g_sess_node_val(NULL, 500);
void TestCreateExecutor::fake_ctx_init_common(ObTableCtx &fake_ctx, ObTableSchema *table_schema)
{
fake_ctx.table_schema_ = table_schema;
@ -459,21 +459,22 @@ TEST_F(TestCreateExecutor, cons_column_type)
col_schema.set_accuracy(acc);
uint32_t res_flag = ObRawExprUtils::calc_column_result_flag(col_schema);
ObExprResType column_type;
ObTableColumnInfo column_info;
ObTableCtx fake_ctx(allocator_);
schema_service_.get_schema_guard(fake_ctx.schema_guard_, 1);
fake_ctx_init_common(fake_ctx, &table_schema_);
ASSERT_EQ(OB_SUCCESS, fake_ctx.cons_column_type(col_schema, column_type));
ASSERT_EQ(ObVarcharType, column_type.get_type());
ASSERT_EQ(res_flag, column_type.get_result_flag());
ASSERT_EQ(CS_TYPE_UTF8MB4_GENERAL_CI, column_type.get_collation_type());
ASSERT_EQ(CS_LEVEL_IMPLICIT, column_type.get_collation_level());
ASSERT_EQ(1, column_type.get_accuracy().get_length());
ASSERT_EQ(OB_SUCCESS, fake_ctx.cons_column_info(col_schema, column_info));
ASSERT_EQ(ObVarcharType, column_info.type_.get_type());
ASSERT_EQ(res_flag, column_info.type_.get_result_flag());
ASSERT_EQ(CS_TYPE_UTF8MB4_GENERAL_CI, column_info.type_.get_collation_type());
ASSERT_EQ(CS_LEVEL_IMPLICIT, column_info.type_.get_collation_level());
ASSERT_EQ(1, column_info.type_.get_accuracy().get_length());
}
TEST_F(TestCreateExecutor, check_column_type)
{
ObExprResType column_type;
ObTableColumnInfo column_info;
column_info.type_.set_result_flag(NOT_NULL_WRITE_FLAG);
ObObj obj;
uint32_t res_flag = 0;
ObTableCtx fake_ctx(allocator_);
@ -483,22 +484,23 @@ TEST_F(TestCreateExecutor, check_column_type)
// check nullable
obj.set_null();
res_flag |= NOT_NULL_FLAG;
column_type.set_result_flag(res_flag);
ASSERT_EQ(OB_BAD_NULL_ERROR, fake_ctx.adjust_column_type(column_type, obj));
column_info.type_.set_result_flag(res_flag);
column_info.is_nullable_ = false;
ASSERT_EQ(OB_BAD_NULL_ERROR, fake_ctx.adjust_column_type(column_info, obj));
// check data type mismatch
res_flag = 0;
obj.set_int(1);
column_type.set_result_flag(res_flag);
column_type.set_type(ObVarcharType);
ASSERT_EQ(OB_OBJ_TYPE_ERROR, fake_ctx.adjust_column_type(column_type, obj));
column_info.type_.set_result_flag(res_flag);
column_info.type_.set_type(ObVarcharType);
ASSERT_EQ(OB_KV_COLUMN_TYPE_NOT_MATCH, fake_ctx.adjust_column_type(column_info, obj));
// check collation
obj.set_binary("ttt");
column_type.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
ASSERT_EQ(OB_ERR_COLLATION_MISMATCH, fake_ctx.adjust_column_type(column_type, obj));
column_info.type_.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
ASSERT_EQ(OB_KV_COLLATION_MISMATCH, fake_ctx.adjust_column_type(column_info, obj));
// collation convert
obj.set_varchar("test");
obj.set_collation_type(CS_TYPE_UTF8MB4_BIN);
ASSERT_EQ(OB_SUCCESS, fake_ctx.adjust_column_type(column_type, obj));
ASSERT_EQ(OB_SUCCESS, fake_ctx.adjust_column_type(column_info, obj));
ASSERT_EQ(CS_TYPE_UTF8MB4_GENERAL_CI, obj.get_collation_type());
}

View File

@ -102,7 +102,7 @@ TEST_F(TestTableSessPool, test_node_init)
TEST_F(TestTableSessPool, test_node_val_init)
{
ObTableApiSessNode node(*mock_cred_);
ObTableApiSessNodeVal val(&node);
ObTableApiSessNodeVal val(&node, MTL_ID());
ASSERT_FALSE(val.is_inited_);
ASSERT_EQ(&node, val.owner_node_);
}
@ -132,7 +132,7 @@ TEST_F(TestTableSessPool, mgr_get_session)
ASSERT_NE(0, node->last_active_ts_);
// add mock val to node
ObTableApiSessNodeVal val(node);
ObTableApiSessNodeVal val(node, MTL_ID());
val.is_inited_ = true;
ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(&val));
@ -217,7 +217,7 @@ TEST_F(TestTableSessPool, mgr_sess_recycle)
// add mock val to node
ObTableApiSessNode *node;
ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->hash_val_, node));
ObTableApiSessNodeVal val(node);
ObTableApiSessNodeVal val(node, MTL_ID());
val.is_inited_ = true;
ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(&val));