[CP] tableapi query_and_mutate & batch atomic & select all column default & is/is_not

This commit is contained in:
obdev
2023-02-21 13:17:29 +00:00
committed by ob-robot
parent 36a33d63c8
commit 37cb2f29ae
52 changed files with 4852 additions and 1506 deletions

View File

@ -21,6 +21,7 @@
#include "ob_table_scan_executor.h"
#include "ob_table_cg_service.h"
#include "ob_htable_filter_operator.h"
#include "ob_table_query_common.h"
using namespace oceanbase::observer;
using namespace oceanbase::common;
@ -156,63 +157,25 @@ int ObTableQueryP::query_and_result(ObTableApiScanExecutor *executor)
{
int ret = OB_SUCCESS;
ObTableQueryResultIterator *result_iter = nullptr;
ObHTableFilterOperator *htable_result_iter = nullptr;
ObNormalTableQueryResultIterator *normal_result_iter = nullptr;
bool is_htable = arg_.query_.get_htable_filter().is_valid();
bool is_hkv = (ObTableEntityType::ET_HKV == arg_.entity_type_);
int32_t result_count = 0;
const int64_t timeout_ts = get_timeout_ts();
ObTableApiScanRowIterator row_iter;
// 1. create result iterator
if (is_htable) {
if (OB_FAIL(ObTableService::check_htable_query_args(arg_.query_))) {
LOG_WARN("fail to check htable query args", K(ret));
} else {
htable_result_iter = OB_NEWx(ObHTableFilterOperator, (&allocator_), arg_.query_, result_);
if (OB_ISNULL(htable_result_iter)) {
LOG_WARN("fail to alloc htable query result iterator", K(ret));
} else if (htable_result_iter->parse_filter_string(&allocator_)) {
LOG_WARN("fail to parse htable filter string", K(ret));
} else {
result_iter = htable_result_iter;
}
}
if (OB_FAIL(ObTableQueryUtils::generate_query_result_iterator(allocator_,
arg_.query_,
is_hkv,
result_,
tb_ctx_,
result_iter))) {
LOG_WARN("fail to generate query result iterator", K(ret));
} else if (OB_FAIL(row_iter.open(executor))) {
LOG_WARN("fail to open scan row iterator", K(ret));
} else {
normal_result_iter = OB_NEWx(ObNormalTableQueryResultIterator, (&allocator_), arg_.query_, result_);
if (OB_ISNULL(normal_result_iter)) {
LOG_WARN("fail to alloc normal query result iterator", K(ret));
} else {
result_iter = normal_result_iter;
}
}
// 2. set scan row iter to result iterator
if (OB_SUCC(ret)) {
if (OB_ISNULL(executor)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("executor is null", K(ret));
} else if (OB_FAIL(row_iter.open(executor))) {
LOG_WARN("fail to open scan row iterator", K(ret));
} else {
if (is_htable) {
htable_result_iter->set_scan_result(&row_iter);
ObHColumnDescriptor desc;
const ObString &comment = executor->get_table_ctx().get_table_schema()->get_comment_str();
if (OB_FAIL(desc.from_string(comment))) {
LOG_WARN("fail to parse hcolumn_desc from comment string", K(ret), K(comment));
} else if (desc.get_time_to_live() > 0) {
htable_result_iter->set_ttl(desc.get_time_to_live());
}
} else {
normal_result_iter->set_scan_result(&row_iter);
}
}
}
// 3. loop get row and serialize row
if (OB_SUCC(ret)) {
result_iter->set_scan_result(&row_iter);
// hbase model, compress the result packet
if (is_htable) {
if (is_hkv) {
ObCompressorType compressor_type = INVALID_COMPRESSOR;
if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type(
GCONF.tableapi_transport_compress_func, compressor_type))) {
@ -223,6 +186,10 @@ int ObTableQueryP::query_and_result(ObTableApiScanExecutor *executor)
this->set_result_compress_type(compressor_type);
ret = OB_SUCCESS; // reset ret
}
}
// 2. loop get row and serialize row
if (OB_SUCC(ret)) {
// one_result references to result_
ObTableQueryResult *one_result = nullptr;
while (OB_SUCC(ret)) {
@ -268,7 +235,7 @@ int ObTableQueryP::query_and_result(ObTableApiScanExecutor *executor)
}
// record events
if (is_htable) {
if (is_hkv) {
stat_event_type_ = ObTableProccessType::TABLE_API_HBASE_QUERY; // hbase query
} else {
stat_event_type_ = ObTableProccessType::TABLE_API_TABLE_QUERY;// table query