[FEAT MERGE] materialized view
Co-authored-by: suz-yang <suz.yang@foxmail.com> Co-authored-by: leftgeek <1094669802@qq.com> Co-authored-by: chimyue <chimyue@gmail.com>
This commit is contained in:
@ -49,7 +49,21 @@ int ObDASIndexDMLAdaptor<DAS_OP_TABLE_DELETE, ObDASDMLIterator>::write_rows(cons
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObAccessService *as = MTL(ObAccessService *);
|
||||
if (OB_FAIL(as->delete_rows(ls_id,
|
||||
if (ctdef.table_param_.get_data_table().is_mlog_table()
|
||||
&& !ctdef.is_access_mlog_as_master_table_) {
|
||||
ObDASMLogDMLIterator mlog_iter(tablet_id, dml_param_, &iter, DAS_OP_TABLE_DELETE);
|
||||
if (OB_FAIL(as->insert_rows(ls_id,
|
||||
tablet_id,
|
||||
*tx_desc_,
|
||||
dml_param_,
|
||||
ctdef.column_ids_,
|
||||
&mlog_iter,
|
||||
affected_rows))) {
|
||||
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
|
||||
LOG_WARN("insert rows to access service failed", K(ret));
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(as->delete_rows(ls_id,
|
||||
tablet_id,
|
||||
*tx_desc_,
|
||||
dml_param_,
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
#include "sql/das/ob_das_utils.h"
|
||||
#include "sql/engine/dml/ob_dml_service.h"
|
||||
#include "sql/engine/expr/ob_expr_lob_utils.h"
|
||||
#include "storage/access/ob_dml_param.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace sql
|
||||
@ -278,6 +279,31 @@ int ObDASDMLIterator::get_next_rows(ObNewRow *&rows, int64_t &row_count)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDASMLogDMLIterator::get_next_row(ObNewRow *&row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(row_iter_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("dml iterator cannot be null", KR(ret), K_(row_iter));
|
||||
} else if (OB_FAIL(row_iter_->get_next_row(row))) {
|
||||
LOG_WARN("failed to get next row from dml iterator", KR(ret));
|
||||
} else if (OB_ISNULL(row)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("row cannot be null", KR(ret), KP(row));
|
||||
} else {
|
||||
if (OB_FAIL(ObDASUtils::generate_mlog_row(tablet_id_,
|
||||
dml_param_,
|
||||
*row,
|
||||
op_type_,
|
||||
is_old_row_))) {
|
||||
LOG_WARN("failed to generate mlog rows", KR(ret));
|
||||
} else if (DAS_OP_TABLE_UPDATE == op_type_) {
|
||||
is_old_row_ = !is_old_row_;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDASWriteBuffer::DmlShadowRow::init(ObIAllocator &allocator,
|
||||
int64_t datum_cnt,
|
||||
bool strip_lob_locator)
|
||||
|
||||
@ -23,6 +23,10 @@
|
||||
#include "sql/resolver/dml/ob_hint.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
class ObDMLBaseParam;
|
||||
}
|
||||
namespace sql
|
||||
{
|
||||
typedef common::ObFixedArray<common::ObObjMeta, common::ObIAllocator> ObjMetaFixedArray;
|
||||
@ -509,6 +513,38 @@ private:
|
||||
uint32_t spatial_row_idx_;
|
||||
int64_t batch_size_;
|
||||
};
|
||||
|
||||
class ObDASMLogDMLIterator : public ObNewRowIterator
|
||||
{
|
||||
public:
|
||||
ObDASMLogDMLIterator(
|
||||
const ObTabletID &tablet_id,
|
||||
const storage::ObDMLBaseParam &dml_param,
|
||||
ObNewRowIterator *iter,
|
||||
ObDASOpType op_type)
|
||||
: tablet_id_(tablet_id),
|
||||
dml_param_(dml_param),
|
||||
row_iter_(iter),
|
||||
op_type_(op_type),
|
||||
is_old_row_(false)
|
||||
{
|
||||
if ((DAS_OP_TABLE_UPDATE == op_type_)
|
||||
|| (DAS_OP_TABLE_INSERT == op_type_)) {
|
||||
is_old_row_ = true;
|
||||
}
|
||||
}
|
||||
virtual ~ObDASMLogDMLIterator() {}
|
||||
virtual int get_next_row(ObNewRow *&row) override;
|
||||
virtual int get_next_row() override { return OB_NOT_IMPLEMENT; }
|
||||
virtual void reset() override {}
|
||||
|
||||
private:
|
||||
const ObTabletID &tablet_id_;
|
||||
const storage::ObDMLBaseParam &dml_param_;
|
||||
ObNewRowIterator *row_iter_;
|
||||
ObDASOpType op_type_;
|
||||
bool is_old_row_;
|
||||
};
|
||||
} // namespace sql
|
||||
} // namespace oceanbase
|
||||
#endif /* DEV_SRC_SQL_DAS_OB_DAS_DML_CTX_DEFINE_H_ */
|
||||
|
||||
@ -53,7 +53,22 @@ int ObDASIndexDMLAdaptor<DAS_OP_TABLE_INSERT, ObDASDMLIterator>::write_rows(cons
|
||||
int ret = OB_SUCCESS;
|
||||
ObAccessService *as = MTL(ObAccessService *);
|
||||
dml_param_.direct_insert_task_id_ = rtdef.direct_insert_task_id_;
|
||||
if (rtdef.use_put_) {
|
||||
|
||||
if (ctdef.table_param_.get_data_table().is_mlog_table()
|
||||
&& !ctdef.is_access_mlog_as_master_table_) {
|
||||
ObDASMLogDMLIterator mlog_iter(tablet_id, dml_param_, &iter, DAS_OP_TABLE_INSERT);
|
||||
if (OB_FAIL(as->insert_rows(ls_id,
|
||||
tablet_id,
|
||||
*tx_desc_,
|
||||
dml_param_,
|
||||
ctdef.column_ids_,
|
||||
&mlog_iter,
|
||||
affected_rows))) {
|
||||
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
|
||||
LOG_WARN("insert rows to access service failed", K(ret));
|
||||
}
|
||||
}
|
||||
} else if (rtdef.use_put_) {
|
||||
ret = as->put_rows(ls_id,
|
||||
tablet_id,
|
||||
*tx_desc_,
|
||||
|
||||
@ -269,6 +269,20 @@ int ObDASIndexDMLAdaptor<DAS_OP_TABLE_UPDATE, ObDASUpdIterator>::write_rows(cons
|
||||
LOG_WARN("insert rows to access service failed", K(ret));
|
||||
}
|
||||
}
|
||||
} else if (ctdef.table_param_.get_data_table().is_mlog_table()
|
||||
&& !ctdef.is_access_mlog_as_master_table_) {
|
||||
ObDASMLogDMLIterator mlog_iter(tablet_id, dml_param_, &iter, DAS_OP_TABLE_UPDATE);
|
||||
if (OB_FAIL(as->insert_rows(ls_id,
|
||||
tablet_id,
|
||||
*tx_desc_,
|
||||
dml_param_,
|
||||
ctdef.column_ids_,
|
||||
&mlog_iter,
|
||||
affected_rows))) {
|
||||
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
|
||||
LOG_WARN("delete rows to access service failed", K(ret));
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(as->update_rows(ls_id,
|
||||
tablet_id,
|
||||
*tx_desc_,
|
||||
|
||||
@ -23,6 +23,8 @@
|
||||
#include "observer/omt/ob_tenant_srs.h"
|
||||
#include "lib/geo/ob_s2adapter.h"
|
||||
#include "lib/geo/ob_geo_utils.h"
|
||||
#include "share/ob_tablet_autoincrement_service.h"
|
||||
#include "storage/access/ob_dml_param.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace common;
|
||||
@ -407,5 +409,62 @@ int ObDASUtils::wait_das_retry(int64_t retry_cnt)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDASUtils::generate_mlog_row(const ObTabletID &tablet_id,
|
||||
const storage::ObDMLBaseParam &dml_param,
|
||||
ObNewRow &row,
|
||||
ObDASOpType op_type,
|
||||
bool is_old_row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
uint64_t autoinc_seq = 0;
|
||||
ObTabletAutoincrementService &auto_inc = ObTabletAutoincrementService::get_instance();
|
||||
if (OB_ISNULL(dml_param.table_param_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table param is null", KR(ret));
|
||||
} else if (!dml_param.table_param_->get_data_table().is_mlog_table()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("data table is not materialized view log",
|
||||
KR(ret), K(dml_param.table_param_->get_data_table()));
|
||||
} else if (row.count_ < 4) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("each mlog row should at least contain 4 columns", KR(ret), K(row.count_));
|
||||
} else if (OB_FAIL(auto_inc.get_autoinc_seq(tenant_id, tablet_id, autoinc_seq))) {
|
||||
LOG_WARN("get_autoinc_seq fail", K(ret), K(tenant_id), K(tablet_id));
|
||||
} else {
|
||||
// sequence_col is the first primary key
|
||||
int sequence_col = 0;
|
||||
int dmltype_col = row.count_ - 2;
|
||||
int old_new_col = row.count_ - 1;
|
||||
const ObTableDMLParam::ObColDescArray &col_descs = dml_param.table_param_->get_col_descs();
|
||||
bool is_heap_base_table = (OB_MLOG_ROWID_COLUMN_ID == col_descs.at(row.count_ - 1).col_id_);
|
||||
// if the base table is heap table, then the last column is mlog_rowid,
|
||||
// therefore, row = | sequence_col | partition key cols | ... | dmltype_col | old_new_col | rowid_col |
|
||||
// otherwise, row = | sequence_col | partition key cols | ... | dmltype_col | old_new_col |
|
||||
if (is_heap_base_table) {
|
||||
dmltype_col = dmltype_col - 1;
|
||||
old_new_col = old_new_col - 1;
|
||||
}
|
||||
|
||||
row.cells_[sequence_col].set_int(ObObjType::ObIntType, static_cast<int64_t>(autoinc_seq));
|
||||
if (sql::DAS_OP_TABLE_DELETE == op_type) {
|
||||
row.cells_[dmltype_col].set_varchar("D");
|
||||
row.cells_[old_new_col].set_varchar("O");
|
||||
} else if (sql::DAS_OP_TABLE_UPDATE == op_type) {
|
||||
row.cells_[dmltype_col].set_varchar("U");
|
||||
if (is_old_row) {
|
||||
row.cells_[old_new_col].set_varchar("O");
|
||||
} else {
|
||||
row.cells_[old_new_col].set_varchar("N");
|
||||
}
|
||||
} else {
|
||||
row.cells_[dmltype_col].set_varchar("I");
|
||||
row.cells_[old_new_col].set_varchar("N");
|
||||
}
|
||||
row.cells_[dmltype_col].set_collation_type(ObCollationType::CS_TYPE_UTF8MB4_GENERAL_CI);
|
||||
row.cells_[old_new_col].set_collation_type(ObCollationType::CS_TYPE_UTF8MB4_GENERAL_CI);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
} // namespace sql
|
||||
} // namespace oceanbase
|
||||
|
||||
@ -64,6 +64,11 @@ public:
|
||||
const ObDASWriteBuffer::DmlRow &dml_row,
|
||||
ObSpatIndexRow &spat_rows);
|
||||
static int wait_das_retry(int64_t retry_cnt);
|
||||
static int generate_mlog_row(const common::ObTabletID &tablet_id,
|
||||
const storage::ObDMLBaseParam &dml_param,
|
||||
common::ObNewRow &row,
|
||||
ObDASOpType op_type,
|
||||
bool is_old_row);
|
||||
};
|
||||
} // namespace sql
|
||||
} // namespace oceanbase
|
||||
|
||||
Reference in New Issue
Block a user