[feature](vec)(load) Support vtablet sink to enable insert into by using vec query engine (#7957)

Support vtablet sink to enable insert into query in vec query engine
This commit is contained in:
HappenLee
2022-02-08 11:04:09 +08:00
committed by GitHub
parent 505acae931
commit ef233701b3
9 changed files with 115 additions and 102 deletions

View File

@ -145,8 +145,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
Status status;
DCHECK(thrift_sink.__isset.olap_table_sink);
if (is_vec) {
// sink->reset(new stream_load::VOlapTableSink(pool, row_desc, output_exprs, &status));
return Status::NotSupported("VOlapTableSink is not supported yet");
sink->reset(new stream_load::VOlapTableSink(pool, row_desc, output_exprs, &status));
} else {
sink->reset(new stream_load::OlapTableSink(pool, row_desc, output_exprs, &status));
}

View File

@ -530,15 +530,15 @@ bool VOlapTablePartitionParam::find_tablet(BlockRow* block_row, const VOlapTable
Status VOlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs,
BlockRow* part_key) {
for (int i = 0; i < t_exprs.size(); i++) {
RETURN_IF_ERROR(_create_partition_key(t_exprs[i], part_key->first,
RETURN_IF_ERROR(_create_partition_key(t_exprs[i], part_key,
_partition_slot_locs[i]));
}
return Status::OK();
}
Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, vectorized::Block* block,
Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, BlockRow* part_key,
uint16_t pos) {
auto column = std::move(*block->get_by_position(pos).column).mutate();
auto column = std::move(*part_key->first->get_by_position(pos).column).mutate();
switch (t_expr.node_type) {
case TExprNodeType::DATE_LITERAL: {
vectorized::VecDateTimeValue dt;
@ -586,18 +586,6 @@ Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr,
} case TExprNodeType::STRING_LITERAL: {
int len = t_expr.string_literal.value.size();
const char* str_val = t_expr.string_literal.value.c_str();
// CHAR is a fixed-length string and needs to use the length in the slot definition,
// VARVHAR is a variable-length string and needs to use the length of the string itself
// padding 0 to CHAR field
// if (TYPE_CHAR == slot_desc->type().type && len < slot_desc->type().len) {
// auto new_ptr = (char*)_mem_pool->allocate(slot_desc->type().len);
// memset(new_ptr, 0, slot_desc->type().len);
// memcpy(new_ptr, str_val, len);
//
// str_val = new_ptr;
// len = slot_desc->type().len;
// }
column->insert_data(str_val, len);
break;
} case TExprNodeType::BOOL_LITERAL: {
@ -609,6 +597,7 @@ Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr,
return Status::InternalError(ss.str());
}
}
part_key->second = column->size() - 1;
return Status::OK();
}

View File

@ -273,7 +273,7 @@ public:
private:
Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, BlockRow* part_key);
Status _create_partition_key(const TExprNode& t_expr, vectorized::Block* block, uint16_t pos);
Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos);
uint32_t _compute_dist_hash(BlockRow* key) const;

View File

@ -56,6 +56,9 @@ static const uint16_t OLAP_VARCHAR_MAX_LENGTH = 65535;
// the max length supported for string type 2GB
static const uint32_t OLAP_STRING_MAX_LENGTH = 2147483647;
// the max length supported for vec string type 1MB
static constexpr auto MAX_SIZE_OF_VEC_STRING = 1024l * 1024;
// the max length supported for array
static const uint16_t OLAP_ARRAY_MAX_LENGTH = 65535;

View File

@ -96,8 +96,6 @@ Status RowBlockV2::convert_to_row_block(RowCursor* helper, RowBlock* dst) {
}
Status RowBlockV2::_copy_data_to_column(int cid, doris::vectorized::MutableColumnPtr& origin_column) {
constexpr auto MAX_SIZE_OF_VEC_STRING = 1024l * 1024;
auto* column = origin_column.get();
bool nullable_mark_array[_selected_size];

View File

@ -296,8 +296,14 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
if (_collect_query_statistics_with_every_batch) {
_collect_query_statistics();
}
RETURN_IF_ERROR(_sink->send(runtime_state(), block));
auto st =_sink->send(runtime_state(), block);
if (st.is_end_of_file()) {
break;
}
RETURN_IF_ERROR(st);
}
{
SCOPED_TIMER(profile()->total_time_counter());
_collect_query_statistics();
@ -318,6 +324,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
return Status::OK();
}
Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block** block) {
if (_done) {
*block = nullptr;

View File

@ -777,10 +777,9 @@ doris::Tuple* Block::deep_copy_tuple(const doris::TupleDescriptor& desc, MemPool
for (int i = 0; i < desc.slots().size(); ++i) {
auto slot_desc = desc.slots()[i];
auto column_ptr = get_by_position(column_offset + i).column;
auto data_ref = column_ptr->get_data_at(row);
auto data_ref = get_by_position(column_offset + i).column->get_data_at(row);
if (data_ref.size == 0) {
if (data_ref.data == nullptr) {
dst->set_null(slot_desc->null_indicator_offset());
continue;
} else {

View File

@ -55,16 +55,19 @@ Status VOlapTableSink::open(RuntimeState* state) {
Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) {
Status status = Status::OK();
if (UNLIKELY(input_block->rows() == 0)) { return status; }
auto rows = input_block->rows();
auto bytes = input_block->bytes();
if (UNLIKELY(rows == 0)) { return status; }
SCOPED_TIMER(_profile->total_time_counter());
_number_input_rows += input_block->rows();
_number_input_rows += rows;
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
state->update_num_rows_load_total(input_block->rows());
state->update_num_bytes_load_total(input_block->bytes());
DorisMetrics::instance()->load_rows->increment(input_block->rows());
DorisMetrics::instance()->load_bytes->increment(input_block->bytes());
state->update_num_rows_load_total(rows);
state->update_num_bytes_load_total(bytes);
DorisMetrics::instance()->load_rows->increment(rows);
DorisMetrics::instance()->load_bytes->increment(bytes);
vectorized::Block block(input_block->get_columns_with_type_and_name());
if (!_output_vexpr_ctxs.empty()) {
@ -75,21 +78,26 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
}
auto num_rows = block.rows();
int num_invalid_rows = 0;
int filtered_rows = 0;
{
SCOPED_RAW_TIMER(&_validate_data_ns);
_filter_vec.resize(num_rows);
num_invalid_rows = _validate_data(state, &block, reinterpret_cast<bool*>(_filter_vec.data()));
_number_filtered_rows += num_invalid_rows;
_filter_bitmap.Reset(block.rows());
bool stop_processing = false;
RETURN_IF_ERROR(_validate_data(state, &block, &_filter_bitmap, &filtered_rows, &stop_processing));
_number_filtered_rows += filtered_rows;
if (stop_processing) {
// should be returned after updating "_number_filtered_rows", to make sure that load job can be cancelled
// because of "data unqualified"
return Status::EndOfFile("Encountered unqualified data, stop processing");
}
}
BlockRow block_row;
SCOPED_RAW_TIMER(&_send_data_ns);
// TODO(cmy): vtablet_sink does not implement this "stop_processing" logic.
// This is just for passing compilation.
bool stop_processing = false;
for (int i = 0; i < num_rows; ++i) {
if (num_invalid_rows > 0 && _filter_vec[i] != 0) {
if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
continue;
}
const VOlapTablePartition* partition = nullptr;
@ -103,6 +111,9 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
return buf.data();
}, &stop_processing));
_number_filtered_rows++;
if (stop_processing) {
return Status::EndOfFile("Encountered unqualified data, stop processing");
}
continue;
}
_partition_ids.emplace(partition->id);
@ -113,6 +124,11 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
_number_output_rows++;
}
}
// check intolerable failure
for (auto index_channel : _channels) {
RETURN_IF_ERROR(index_channel->check_intolerable_failure());
}
return Status::OK();
}
@ -122,14 +138,15 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
return OlapTableSink::close(state, exec_status);
}
int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized::Block* block,
bool* filter_map) {
// TODO(cmy): implement it
return 0;
#if 0
Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* block, Bitmap* filter_bitmap, int* filtered_rows,
bool* stop_processing) {
const auto num_rows = block->rows();
// set all row is valid
memset(filter_map, 0, num_rows * sizeof(bool));
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
filter_bitmap->Set(row, true);
return state->append_error_msg_to_file([]() -> std::string { return ""; },
[&error_msg]() -> std::string { return error_msg.data(); }, stop_processing);
};
for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
SlotDescriptor* desc = _output_tuple_desc->slots()[i];
@ -139,12 +156,13 @@ int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized
if (desc->is_nullable() && desc->type() == TYPE_OBJECT) {
const auto& null_map = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)
->get_null_map_data();
fmt::format_to(error_msg, "null is not allowed for bitmap column, column_name: {}; ",
desc->col_name());
for (int j = 0; j < num_rows; ++j) {
if (!filter_map[j]) {
if (!filter_bitmap->Get(j)) {
if (null_map[j]) {
state->append_error_msg_to_file("", std::string("null is not allowed for "
"bitmap column, column_name: ") + desc->col_name() + ";");
filter_map[j] = true;
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
}
}
}
@ -154,67 +172,68 @@ int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized
switch (desc->type().type) {
case TYPE_CHAR:
case TYPE_VARCHAR: {
case TYPE_VARCHAR:
case TYPE_STRING: {
const auto column_string = assert_cast<const vectorized::ColumnString *>(real_column_ptr.get());
for (int j = 0; j < num_rows; ++j) {
if (!filter_map[j]) {
if (!filter_bitmap->Get(j)) {
auto str_val = column_string->get_data_at(j);
bool invalid = str_val.size > std::min(desc->type().len, (int)MAX_SIZE_OF_VEC_STRING);
error_msg.clear();
if (str_val.size > desc->type().len) {
state->append_error_msg_to_file("", fmt::format(
"the length of input is too long than schema. "
"column_name: {}; input_str: [{}] schema length: {}; actual length: {}; ",
desc->col_name(), str_val.to_string(),
desc->type().len, str_val.size));
filter_map[j] = true;
fmt::format_to(error_msg, "{}", "the length of input is too long than schema. ");
fmt::format_to(error_msg, "column_name: {}; ", desc->col_name());
fmt::format_to(error_msg, "input str: [{}] ", str_val.to_string());
fmt::format_to(error_msg, "schema length: {}; ", desc->type().len);
fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
} else if (str_val.size > MAX_SIZE_OF_VEC_STRING) {
fmt::format_to(error_msg, "{}", "the length of input string is too long than vec schema. ");
fmt::format_to(error_msg, "column_name: {}; ", desc->col_name());
fmt::format_to(error_msg, "input str: [{}] ", str_val.to_string());
fmt::format_to(error_msg, "schema length: {}; ", MAX_SIZE_OF_VEC_STRING);
fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
}
if (invalid) {
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
}
}
}
break;
}
// TODO: Support TYPE_STRING in the future
// case TYPE_STRING: {
// StringValue* str_val = (StringValue*)slot;
// if (str_val->len > desc->type().MAX_STRING_LENGTH) {
// ss << "the length of input is too long than schema. "
// << "column_name: " << desc->col_name() << "; "
// << "first 128 bytes of input_str: [" << std::string(str_val->ptr, 128)
// << "] "
// << "schema length: " << desc->type().MAX_STRING_LENGTH << "; "
// << "actual length: " << str_val->len << "; ";
// row_valid = false;
// continue;
// }
// break;
// }
case TYPE_DECIMALV2: {
auto column_decimal = const_cast<vectorized::ColumnDecimal
<vectorized::Decimal128> *>(assert_cast<const vectorized::ColumnDecimal
<vectorized::Decimal128> *>(real_column_ptr.get()));
for (int j = 0; j < num_rows; ++j) {
if (!filter_map[j]) {
if (!filter_bitmap->Get(j)) {
auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>(
column_decimal->get_data()[j]);
error_msg.clear();
bool invalid = false;
if (dec_val.greater_than_scale(desc->type().scale)) {
auto code = dec_val.round(&dec_val, desc->type().scale, HALF_UP);
column_decimal->get_data()[j] = binary_cast<DecimalV2Value, vectorized::Int128>(
dec_val);
if (code != E_DEC_OK) {
state->append_error_msg_to_file("", "round one decimal failed.value=" +
dec_val.to_string());
filter_map[j] = true;
fmt::format_to(error_msg, "round one decimal failed.value={}; ", dec_val.to_string());
invalid = true;
}
}
if (dec_val > _max_decimalv2_val[i] || dec_val < _min_decimalv2_val[i]) {
state->append_error_msg_to_file("", fmt::format(
"decimal value is not valid for definition, column={}, "
"value={}, precision={}, scale= {};",
desc->col_name(), dec_val.to_string(), desc->type().precision,
desc->type().scale));
filter_map[j] = true;
fmt::format_to(error_msg, "decimal value is not valid for definition, column={}", desc->col_name());
fmt::format_to(error_msg, ", value={}", dec_val.to_string());
fmt::format_to(error_msg, ", precision={}, scale={}; ", desc->type().precision, desc->type().scale);
invalid = true;
}
if (invalid) {
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
}
}
}
@ -224,13 +243,17 @@ int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized
auto column_string = assert_cast<const vectorized::ColumnString *>(real_column_ptr.get());
for (int j = 0; j < num_rows; ++j) {
if (!filter_map[j]) {
if (!filter_bitmap->Get(j)) {
auto str_val = column_string->get_data_at(j);
if (!HyperLogLog::is_valid(Slice(str_val.data, str_val.size))) {
state->append_error_msg_to_file("", std::string(
"Content of HLL type column is invalid column_name: " + desc->col_name() +
";"));
filter_map[j] = true;
bool invalid = false;
error_msg.clear();
if(!HyperLogLog::is_valid(Slice(str_val.data, str_val.size))) {
fmt::format_to(error_msg, "Content of HLL type column is invalid. column name: {}; ", desc->col_name());
invalid = true;
}
if (invalid) {
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
}
}
}
@ -250,15 +273,10 @@ int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized
if (!desc->is_nullable() && column_ptr) {
const auto& null_map = column_ptr->get_null_map_data();
for (int j = 0; j < null_map.size(); ++j) {
if (null_map[j] && !filter_map[j]) {
filter_map[j] = true;
std::stringstream ss;
ss << "null value for not null column, column=" << desc->col_name();
#if BE_TEST
LOG(INFO) << ss.str();
#else
state->append_error_msg_to_file("", ss.str());
#endif
fmt::format_to(error_msg, "null value for not null column, column={}; ",
desc->col_name());
if (null_map[j] && !filter_bitmap->Get(j)) {
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
}
}
block->get_by_position(i).column = column_ptr->get_nested_column_ptr();
@ -268,12 +286,11 @@ int VOlapTableSink::_validate_data(doris::RuntimeState* state, doris::vectorized
}
}
auto filter_row = 0;
*filtered_rows = 0;
for (int i = 0; i < num_rows; ++i) {
filter_row += filter_map[i];
*filtered_rows += filter_bitmap->Get(i);
}
return filter_row;
#endif
return Status::OK();
}
} // namespace stream_load

View File

@ -54,11 +54,12 @@ private:
// make input data valid for OLAP table
// return number of invalid/filtered rows.
// invalid row number is set in Bitmap
int _validate_data(RuntimeState* state, vectorized::Block* block, bool* filter_map);
// set stop_processing is we want to stop the whole process now.
Status _validate_data(RuntimeState* state, vectorized::Block* block, Bitmap* filter_bitmap, int* filtered_rows,
bool* stop_processing);
VOlapTablePartitionParam* _vpartition = nullptr;
std::vector<vectorized::VExprContext*> _output_vexpr_ctxs;
std::vector<uint8_t> _filter_vec;
};
} // namespace stream_load