[FEAT MERGE] GIS

This commit is contained in:
obdev
2022-12-23 12:08:27 +00:00
committed by ob-robot
parent 38846e73d6
commit 40d215fc5a
519 changed files with 158600 additions and 8396 deletions

View File

@ -19,11 +19,14 @@
#include "sql/das/ob_das_define.h"
#include "sql/das/ob_das_utils.h"
#include "lib/profile/ob_perf_event.h"
#include "lib/geo/ob_s2adapter.h"
#include "lib/geo/ob_geo_utils.h"
#include "share/ob_ddl_checksum.h"
#include "storage/access/ob_table_scan_iterator.h"
#include "observer/ob_server_struct.h"
#include "observer/ob_server.h"
#include "observer/virtual_table/ob_virtual_data_access_service.h"
#include "observer/omt/ob_tenant_srs_mgr.h"
namespace oceanbase
{
@ -234,6 +237,8 @@ void ObTableScanOpInput::reset()
{
tablet_loc_ = nullptr;
key_ranges_.reset();
mbr_filters_.reset();
range_array_pos_.reset();
not_need_extract_query_range_ = false;
}
@ -350,7 +355,8 @@ ObTableScanSpec::ObTableScanSpec(ObIAllocator &alloc, const ObPhyOperatorType ty
tsc_ctdef_(alloc),
pdml_partition_id_(NULL),
agent_vt_meta_(alloc),
flags_(0)
flags_(0),
is_spatial_ddl_(false)
{
}
@ -373,7 +379,8 @@ OB_SERIALIZE_MEMBER((ObTableScanSpec, ObOpSpec),
tsc_ctdef_,
pdml_partition_id_,
agent_vt_meta_,
ddl_output_cids_);
ddl_output_cids_,
is_spatial_ddl_);
DEF_TO_STRING(ObTableScanSpec)
{
@ -397,7 +404,8 @@ DEF_TO_STRING(ObTableScanSpec)
K(tsc_ctdef_),
K(report_col_checksum_),
K_(agent_vt_meta),
K_(ddl_output_cids));
K_(ddl_output_cids),
K_(is_spatial_ddl));
J_OBJ_END();
return pos;
}
@ -568,7 +576,8 @@ ObTableScanOp::ObTableScanOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOp
range_buffers_(NULL),
range_buffer_idx_(0),
group_size_(0),
max_group_size_(0)
max_group_size_(0),
spat_index_()
{
}
@ -1011,7 +1020,18 @@ int ObTableScanOp::prepare_single_scan_range(int64_t group_idx)
} else {
if (OB_UNLIKELY(!need_extract_range())) {
// virtual table, do nothing
} else if (OB_FAIL(ObSQLUtils::extract_pre_query_range(
} else if (MY_CTDEF.pre_query_range_.is_contain_geo_filters() &&
OB_FAIL(ObSQLUtils::extract_geo_query_range(
MY_CTDEF.pre_query_range_,
range_allocator,
ctx_,
key_ranges,
MY_INPUT.mbr_filters_,
get_method,
ObBasicSessionInfo::create_dtc_params(ctx_.get_my_session())))) {
LOG_WARN("failed to extract pre query ranges", K(ret));
} else if (!MY_CTDEF.pre_query_range_.is_contain_geo_filters() &&
OB_FAIL(ObSQLUtils::extract_pre_query_range(
MY_CTDEF.pre_query_range_,
range_allocator,
ctx_,
@ -1318,6 +1338,7 @@ int ObTableScanOp::inner_rescan()
output_row_cnt_ = 0;
iter_end_ = false;
MY_INPUT.key_ranges_.reuse();
MY_INPUT.mbr_filters_.reuse();
if (OB_FAIL(ObOperator::inner_rescan())) {
LOG_WARN("rescan operator failed", K(ret));
} else if (OB_FAIL(build_bnlj_params())) {
@ -1360,6 +1381,7 @@ int ObTableScanOp::close_and_reopen()
tsc_rtdef_.lookup_rtdef_->stmt_allocator_.set_alloc(&das_ref_.get_das_alloc());
}
MY_INPUT.key_ranges_.reuse();
MY_INPUT.mbr_filters_.reuse();
}
return ret;
}
@ -1438,6 +1460,7 @@ int ObTableScanOp::local_iter_reuse()
} else {
tsc_rtdef_.scan_rtdef_.scan_allocator_.set_alloc(table_rescan_allocator_);
MY_INPUT.key_ranges_.reuse();
MY_INPUT.mbr_filters_.reuse();
}
return ret;
}
@ -1605,7 +1628,7 @@ int ObTableScanOp::get_next_batch_with_das(int64_t &count, int64_t capacity)
return ret;
}
int ObTableScanOp::inner_get_next_row()
int ObTableScanOp::inner_get_next_row_implement()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(0 == limit_param_.limit_)) {
@ -1810,7 +1833,9 @@ int ObTableScanOp::cherry_pick_range_by_tablet_id(ObDASScanOp *scan_op)
{
int ret = OB_SUCCESS;
ObIArray<ObNewRange> &scan_ranges = scan_op->get_scan_param().key_ranges_;
ObIArray<ObSpatialMBR> &mbr_filters = scan_op->get_scan_param().mbr_filters_;
const ObIArray<ObNewRange> &input_ranges = MY_INPUT.key_ranges_;
const ObIArray<ObSpatialMBR> &input_filters = MY_INPUT.mbr_filters_;
ObDASGroupScanOp *batch_op = DAS_GROUP_SCAN_OP(scan_op);
bool add_all = false;
bool prune_all = true;
@ -1852,6 +1877,12 @@ int ObTableScanOp::cherry_pick_range_by_tablet_id(ObDASScanOp *scan_op)
}
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < input_filters.count(); ++i) {
if (OB_FAIL(mbr_filters.push_back(input_filters.at(i)))) {
LOG_WARN("store mbr_filters failed", K(ret));
}
}
if (OB_SUCC(ret) && prune_all && !input_ranges.empty()) {
ObNewRange false_range;
false_range.set_false_range();
@ -2024,7 +2055,10 @@ int ObTableScanOp::reassign_task_ranges(ObGranuleTaskInfo &info)
{
int ret = OB_SUCCESS;
if (MY_SPEC.gi_above_ && !iter_end_) {
if (!MY_INPUT.get_need_extract_query_range()) {
if (OB_UNLIKELY(MY_SPEC.get_query_range().is_contain_geo_filters())) {
MY_INPUT.key_ranges_.reuse();
MY_INPUT.mbr_filters_.reuse();
} else if (!MY_INPUT.get_need_extract_query_range()) {
if (OB_FAIL(MY_INPUT.key_ranges_.assign(info.ranges_))) {
LOG_WARN("assign the range info failed", K(ret), K(info));
} else if (MY_SPEC.is_vt_mapping_) {
@ -2035,6 +2069,7 @@ int ObTableScanOp::reassign_task_ranges(ObGranuleTaskInfo &info)
} else {
// use prepare() to set key ranges if px do not extract query range
MY_INPUT.key_ranges_.reuse();
MY_INPUT.mbr_filters_.reuse();
LOG_DEBUG("do prepare!!!");
}
}
@ -2179,7 +2214,7 @@ int ObTableScanOp::init_ddl_column_checksum()
int ObTableScanOp::corrupt_obj(ObObj &obj)
{
int ret = OB_SUCCESS;
int tmp_ret = E(EventTable::EN_BUILD_GLOBAL_INDEX_WITH_CORRUPTED_DATA) OB_SUCCESS;
int tmp_ret = OB_E(EventTable::EN_BUILD_GLOBAL_INDEX_WITH_CORRUPTED_DATA) OB_SUCCESS;
if (OB_SUCCESS != tmp_ret && obj.is_fixed_len_char_type()) {
char *ptr = obj.get_string().ptr();
int32_t len = obj.get_string_len();
@ -2315,7 +2350,7 @@ int ObTableScanOp::report_ddl_column_checksum()
item.checksum_ = i < column_checksum_.count() ? column_checksum_[i] : 0;
#ifdef ERRSIM
if (OB_SUCC(ret)) {
ret = E(EventTable::EN_DATA_CHECKSUM_DDL_TASK) OB_SUCCESS;
ret = OB_E(EventTable::EN_DATA_CHECKSUM_DDL_TASK) OB_SUCCESS;
// set the checksum of the second column inconsistent with the report checksum of hidden table. (report_column_checksum(ObSSTable &sstable))
if (OB_FAIL(ret) && 17 == item.column_id_) {
item.checksum_ = i;
@ -2491,5 +2526,152 @@ int ObTableScanOp::transform_physical_rowid_rowkey(ObIAllocator &allocator,
return ret;
}
// Row-fetch entry point of the table scan operator.
//
// When the spec is flagged as a spatial-index DDL scan, rows are produced by
// inner_get_next_spatial_index_row(); otherwise the call is forwarded to
// inner_get_next_row_implement(). OB_ITER_END is passed through silently,
// any other failure is logged before being returned.
int ObTableScanOp::inner_get_next_row()
{
  int ret = OB_SUCCESS;
  const bool spatial_ddl = OB_UNLIKELY(MY_SPEC.is_spatial_ddl());
  if (!spatial_ddl) {
    // regular scan path
    if (OB_FAIL(inner_get_next_row_implement()) && OB_ITER_END != ret) {
      LOG_WARN("get next row failed", K(ret));
    }
  } else if (OB_FAIL(inner_get_next_spatial_index_row()) && OB_ITER_END != ret) {
    LOG_WARN("spatial index ddl : get next spatial index row failed", K(ret));
  }
  return ret;
}
int ObTableScanOp::inner_get_next_spatial_index_row()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(spat_index_.spat_rows_)) {
if (OB_FAIL(init_spatial_index_rows())) {
LOG_WARN("init spatial row store failed", K(ret));
}
}
if (OB_SUCC(ret)) {
if (spat_index_.spat_row_index_ >= spat_index_.spat_rows_->count()) {
if (OB_FAIL(ObTableScanOp::inner_get_next_row_implement())) {
if (OB_ITER_END != ret) {
LOG_WARN("get next row failed", K(ret), "op", op_name());
}
} else {
spat_index_.spat_rows_->reuse();
spat_index_.spat_row_index_ = 0;
const ObExprPtrIArray &exprs = MY_SPEC.output_;
ObExpr *expr = exprs.at(3);
ObDatum *in_datum = NULL;
if (OB_FAIL(expr->eval(eval_ctx_, in_datum))) {
LOG_WARN("expression evaluate failed", K(ret));
} else {
ObString geo_wkb = in_datum->get_string();
uint32_t srid = UINT32_MAX;
omt::ObSrsCacheGuard srs_guard;
const ObSrsItem *srs_item = NULL;
const ObSrsBoundsItem *srs_bound = NULL;
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx_);
uint64_t tenant_id = my_session->get_effective_tenant_id();
ObS2Cellids cellids;
ObString mbr_val(0, static_cast<char *>(spat_index_.mbr_buffer_));
if (OB_FAIL(ObGeoTypeUtil::get_srid_from_wkb(geo_wkb, srid))) {
LOG_WARN("failed to get srid", K(ret), K(geo_wkb));
} else if (srid != 0 &&
OB_FAIL(OTSRS_MGR.get_tenant_srs_guard(tenant_id, srs_guard))) {
LOG_WARN("failed to get srs guard", K(ret), K(tenant_id), K(srid));
} else if (srid != 0 &&
OB_FAIL(srs_guard.get_srs_item(srid, srs_item))) {
LOG_WARN("failed to get srs item", K(ret), K(tenant_id), K(srid));
} else if (((srid == 0) || !(srs_item->is_geographical_srs())) &&
OB_FAIL(OTSRS_MGR.get_srs_bounds(srid, srs_item, srs_bound))) {
LOG_WARN("failed to get srs bound", K(ret), K(srid));
} else if (OB_FAIL(ObGeoTypeUtil::get_cellid_mbr_from_geom(geo_wkb, srs_item, srs_bound,
cellids, mbr_val))) {
LOG_WARN("failed to get cellid", K(ret));
} else if (cellids.size() > SAPTIAL_INDEX_DEFAULT_ROW_COUNT) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cellid over size", K(ret), K(cellids.size()));
} else if (OB_ISNULL(spat_index_.obj_buffer_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for spatial index row cells", K(ret));
} else {
ObObj *obj_arr = reinterpret_cast<ObObj *>(spat_index_.obj_buffer_);
uint64_t obj_idx = 0;
for (uint64_t i = 0; OB_SUCC(ret) && i < cellids.size(); i++) {
obj_arr[obj_idx].set_nop_value();
obj_arr[obj_idx].set_uint64(cellids.at(i));
obj_arr[obj_idx + 1].set_nop_value();
obj_arr[obj_idx + 1].set_varchar(mbr_val);
obj_arr[obj_idx + 1].set_collation_type(CS_TYPE_BINARY);
obj_arr[obj_idx + 1].set_collation_level(CS_LEVEL_IMPLICIT);
ObNewRow row;
row.cells_ = &obj_arr[obj_idx];
row.count_ = 2;
obj_idx += 2;
if (OB_FAIL(spat_index_.spat_rows_->push_back(row))) {
LOG_WARN("failed to push back spatial index row", K(ret), K(row));
}
}
}
}
}
}
if (OB_SUCC(ret)) {
ObNewRow &row = (*(spat_index_.spat_rows_))[spat_index_.spat_row_index_++];
ObObj &cellid= row.get_cell(0);
ObObj &mbr = row.get_cell(1);
if (OB_FAIL(fill_generated_cellid_mbr(cellid, mbr))) {
LOG_WARN("fill cellid mbr failed", K(ret), K(cellid), K(mbr));
}
}
}
return ret;
}
int ObTableScanOp::init_spatial_index_rows()
{
int ret = OB_SUCCESS;
void *buf = ctx_.get_allocator().alloc(sizeof(ObSpatIndexRow));
void *mbr_buffer = ctx_.get_allocator().alloc(OB_DEFAULT_MBR_SIZE);
void *obj_buf = ctx_.get_allocator().alloc(sizeof(ObObj) * 2 * SAPTIAL_INDEX_DEFAULT_ROW_COUNT);
if (OB_ISNULL(buf) || OB_ISNULL(mbr_buffer) || OB_ISNULL(obj_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate spatial row store failed", K(ret), K(buf), K(mbr_buffer));
} else {
spat_index_.spat_rows_ = new(buf) ObSpatIndexRow();
spat_index_.mbr_buffer_ = mbr_buffer;
spat_index_.obj_buffer_ = obj_buf;
}
return ret;
}
int ObTableScanOp::fill_generated_cellid_mbr(const ObObj &cellid, const ObObj &mbr)
{
int ret = OB_SUCCESS;
const ObExprPtrIArray &exprs = MY_SPEC.output_;
if (exprs.count() < 2) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid exprs count", K(ret), K(exprs.count()));
} else {
for (uint8_t i = 0; i < 2 && OB_SUCC(ret); i++) {
ObObjDatumMapType type = i == 0 ? OBJ_DATUM_8BYTE_DATA : OBJ_DATUM_STRING;
const ObObj &value = i == 0 ? cellid : mbr;
ObExpr *expr = exprs.at(i);
ObDatum *datum = &expr->locate_datum_for_write(get_eval_ctx());
ObEvalInfo *eval_info = &expr->get_eval_info(get_eval_ctx());
if (OB_FAIL(datum->from_obj(value, type))) {
LOG_WARN("fill spatial index row failed", K(ret));
} else {
eval_info->evaluated_ = true;
eval_info->projected_ = true;
}
}
}
return ret;
}
} // end namespace sql
} // end namespace oceanbase