Fix reading invalid 8 bytes column in vectorized query.
This commit is contained in:
@ -192,7 +192,7 @@ public:
|
||||
static uint32_t get_reserved_size(const ObObjDatumMapType type);
|
||||
// From ObObj, the caller is responsible for ensuring %ptr_ has enough memory
|
||||
inline int from_obj(const ObObj &obj, const ObObjDatumMapType map_type);
|
||||
inline int from_storage_datum(const ObDatum &datum, const ObObjDatumMapType map_type);
|
||||
inline int from_storage_datum(const ObDatum &datum, const ObObjDatumMapType map_type, bool need_copy = false);
|
||||
// From ObObj, the caller is responsible for ensuring %ptr_ has enough memory
|
||||
inline int from_obj(const ObObj &obj);
|
||||
inline int64_t checksum(const int64_t current) const;
|
||||
@ -725,7 +725,7 @@ inline int ObDatum::from_obj(const ObObj &obj, const ObObjDatumMapType map_type)
|
||||
return ret;
|
||||
}
|
||||
|
||||
inline int ObDatum::from_storage_datum(const ObDatum &datum, const ObObjDatumMapType map_type)
|
||||
inline int ObDatum::from_storage_datum(const ObDatum &datum, const ObObjDatumMapType map_type, bool need_copy)
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
if (datum.is_ext()) {
|
||||
@ -733,6 +733,9 @@ inline int ObDatum::from_storage_datum(const ObDatum &datum, const ObObjDatumMap
|
||||
COMMON_LOG(WARN, "Invalid argument for ext storage datum to datum", K(ret), K(datum));
|
||||
} else if (datum.is_null()) {
|
||||
set_null();
|
||||
} else if (need_copy) {
|
||||
memcpy(no_cv(ptr_), datum.ptr_, datum.len_);
|
||||
pack_ = datum.pack_;
|
||||
} else {
|
||||
switch (map_type) {
|
||||
case OBJ_DATUM_NULL: { datum2datum<OBJ_DATUM_NULL>(datum); break; }
|
||||
|
@ -436,13 +436,15 @@ public:
|
||||
return ctx.frames_[frame_idx_] + res_buf_off_;
|
||||
}
|
||||
|
||||
|
||||
// locate expr datum && reset ptr_ to reserved buf
|
||||
OB_INLINE ObDatum &locate_datum_for_write(ObEvalCtx &ctx) const;
|
||||
|
||||
// locate batch datums and reset datum ptr_ to reserved buf
|
||||
inline ObDatum *locate_datums_for_update(ObEvalCtx &ctx, const int64_t size) const;
|
||||
|
||||
// reset ptr in ObDatum to reserved buf
|
||||
OB_INLINE void reset_ptr_in_datum(ObEvalCtx &ctx, const int64_t datum_idx) const;
|
||||
|
||||
OB_INLINE ObDatum &locate_param_datum(ObEvalCtx &ctx, int param_index) const
|
||||
{
|
||||
return args_[param_index]->locate_expr_datum(ctx);
|
||||
@ -972,6 +974,16 @@ inline ObDatum *ObExpr::locate_datums_for_update(ObEvalCtx &ctx,
|
||||
return datums;
|
||||
}
|
||||
|
||||
OB_INLINE void ObExpr::reset_ptr_in_datum(ObEvalCtx &ctx, const int64_t datum_idx) const
|
||||
{
|
||||
OB_ASSERT(datum_idx >= 0);
|
||||
char *frame = ctx.frames_[frame_idx_];
|
||||
OB_ASSERT(NULL != frame);
|
||||
ObDatum *expr_datum = reinterpret_cast<ObDatum *>(frame + datum_off_) + datum_idx;
|
||||
char *data_pos = frame + res_buf_off_ + res_buf_len_ * datum_idx;
|
||||
expr_datum->ptr_ = data_pos;
|
||||
}
|
||||
|
||||
template <typename ...TS>
|
||||
OB_INLINE int ObExpr::eval_param_value(ObEvalCtx &ctx, TS &...args) const
|
||||
{
|
||||
|
@ -209,7 +209,7 @@ int ObVectorStore::fill_rows(
|
||||
} else {
|
||||
blocksstable::ObMicroBlockReader *block_reader = static_cast<blocksstable::ObMicroBlockReader*>(reader);
|
||||
if (OB_FAIL(block_reader->get_rows(cols_projector_, col_params_, map_types_, default_row_,
|
||||
row_ids_, row_capacity, row_buf_, datums_))) {
|
||||
row_ids_, row_capacity, row_buf_, datums_, exprs_, eval_ctx_))) {
|
||||
LOG_WARN("fail to copy rows", K(ret), K(cols_projector_), K(row_capacity),
|
||||
"row_ids", common::ObArrayWrap<const int64_t>(row_ids_, row_capacity));
|
||||
}
|
||||
|
@ -309,6 +309,7 @@ struct ObStorageDatum : public common::ObDatum
|
||||
OB_INLINE int64_t get_deep_copy_size() const;
|
||||
OB_INLINE ObStorageDatum& operator=(const ObStorageDatum &other);
|
||||
OB_INLINE int64_t storage_to_string(char *buf, int64_t buf_len) const;
|
||||
OB_INLINE bool need_copy_for_encoding_column_with_flat_format(const ObObjDatumMapType map_type) const;
|
||||
//only for unittest
|
||||
OB_INLINE bool operator==(const ObStorageDatum &other) const;
|
||||
OB_INLINE bool operator==(const ObObj &other) const;
|
||||
@ -671,6 +672,11 @@ OB_INLINE int64_t ObStorageDatum::storage_to_string(char *buf, int64_t buf_len)
|
||||
return pos;
|
||||
}
|
||||
|
||||
OB_INLINE bool ObStorageDatum::need_copy_for_encoding_column_with_flat_format(const ObObjDatumMapType map_type) const
|
||||
{
|
||||
return OBJ_DATUM_STRING == map_type && sizeof(uint64_t) == len_ && is_local_buf();
|
||||
}
|
||||
|
||||
struct ObGhostRowUtil {
|
||||
public:
|
||||
ObGhostRowUtil() = delete;
|
||||
|
@ -613,7 +613,9 @@ int ObMicroBlockReader::get_rows(
|
||||
const int64_t *row_ids,
|
||||
const int64_t row_cap,
|
||||
ObDatumRow &row_buf,
|
||||
common::ObIArray<ObDatum *> &datums)
|
||||
common::ObIArray<ObDatum *> &datums,
|
||||
sql::ExprFixedArray &exprs,
|
||||
sql::ObEvalCtx &eval_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t row_idx = common::OB_INVALID_INDEX;
|
||||
@ -642,6 +644,7 @@ int ObMicroBlockReader::get_rows(
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < cols_projector.count(); ++i) {
|
||||
common::ObDatum &datum = datums.at(i)[idx];
|
||||
int32_t col_idx = cols_projector.at(i);
|
||||
bool need_copy = false;
|
||||
if (col_idx >= read_info_->get_request_count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected col idx", K(ret), K(i), K(col_idx), K(read_info_->get_request_count()));
|
||||
@ -653,9 +656,13 @@ int ObMicroBlockReader::get_rows(
|
||||
LOG_WARN("Fail to transfer datum", K(ret), K(i), K(idx), K(row_idx), K(default_row));
|
||||
}
|
||||
LOG_TRACE("Transfer nop value", K(ret), K(idx), K(row_idx), K(col_idx), K(default_row));
|
||||
} else if (OB_FAIL(datum.from_storage_datum(row_buf.storage_datums_[col_idx], map_types.at(i)))) {
|
||||
LOG_WARN("Failed to from storage datum", K(ret), K(idx), K(row_idx), K(col_idx),
|
||||
K(row_buf.storage_datums_[col_idx]), KPC_(header));
|
||||
} else if (row_buf.storage_datums_[col_idx].need_copy_for_encoding_column_with_flat_format(map_types.at(i))) {
|
||||
exprs[i]->reset_ptr_in_datum(eval_ctx, idx);
|
||||
need_copy = true;
|
||||
}
|
||||
if (OB_SUCC(ret) && OB_FAIL(datum.from_storage_datum(row_buf.storage_datums_[col_idx], map_types.at(i), need_copy))) {
|
||||
LOG_WARN("Failed to from storage datum", K(ret), K(idx), K(row_idx), K(col_idx), K(need_copy),
|
||||
K(row_buf.storage_datums_[col_idx]), KPC_(header));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -93,7 +93,9 @@ public:
|
||||
const int64_t *row_ids,
|
||||
const int64_t row_cap,
|
||||
ObDatumRow &row_buf,
|
||||
common::ObIArray<ObDatum *> &datums);
|
||||
common::ObIArray<ObDatum *> &datums,
|
||||
sql::ExprFixedArray &exprs,
|
||||
sql::ObEvalCtx &eval_ctx);
|
||||
virtual int get_row_count(
|
||||
int32_t col,
|
||||
const int64_t *row_ids,
|
||||
|
Reference in New Issue
Block a user