[Improvement] spark load without agg and de/serialization (#6270)

fix #6269 

The outline of our changes is to improve our memory in case of OOM in BE and to speed up the calculation.
1. We do not need to do Aggregation in load, which has already been done in the ETL spark job.
2. Based on 1, we do not need to serialize/deserialize bitmap/HLL objects.
This commit is contained in:
Xiang Wei
2021-08-19 14:15:01 +08:00
committed by GitHub
parent 4ea2fcefbc
commit c65ec3136b
2 changed files with 72 additions and 11 deletions

View File

@ -150,7 +150,7 @@ OLAPStatus PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TP
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res
<< ", failed to process realtime push."
<< ", table=" << tablet->full_name()
<< ", tablet=" << tablet->full_name()
<< ", transaction_id=" << request.transaction_id;
for (TabletVars& tablet_var : *tablet_vars) {
if (tablet_var.tablet == nullptr) {
@ -944,6 +944,70 @@ OLAPStatus PushBrokerReader::init(const Schema* schema, const TBrokerScanRange&
return OLAP_SUCCESS;
}
OLAPStatus PushBrokerReader::fill_field_row(RowCursorCell* dst, const char* src, bool src_null, MemPool* mem_pool, FieldType type){
switch (type) {
case OLAP_FIELD_TYPE_DECIMAL: {
dst->set_is_null(src_null);
if (src_null) {
break;
}
auto *decimal_value = reinterpret_cast<const DecimalV2Value *>(src);
auto *storage_decimal_value = reinterpret_cast<decimal12_t *>(dst->mutable_cell_ptr());
storage_decimal_value->integer = decimal_value->int_value();
storage_decimal_value->fraction = decimal_value->frac_value();
break;
}
case OLAP_FIELD_TYPE_DATETIME: {
dst->set_is_null(src_null);
if (src_null) {
break;
}
auto* datetime_value = reinterpret_cast<const DateTimeValue*>(src);
auto* storage_datetime_value = reinterpret_cast<uint64_t*>(dst->mutable_cell_ptr());
*storage_datetime_value = datetime_value->to_olap_datetime();
break;
}
case OLAP_FIELD_TYPE_DATE: {
dst->set_is_null(src_null);
if (src_null) {
break;
}
auto* date_value = reinterpret_cast<const DateTimeValue*>(src);
auto* storage_date_value = reinterpret_cast<uint24_t*>(dst->mutable_cell_ptr());
*storage_date_value = static_cast<int64_t>(date_value->to_olap_date());
break;
}
case OLAP_FIELD_TYPE_BOOL:
case OLAP_FIELD_TYPE_TINYINT:
case OLAP_FIELD_TYPE_SMALLINT:
case OLAP_FIELD_TYPE_INT:
case OLAP_FIELD_TYPE_UNSIGNED_INT:
case OLAP_FIELD_TYPE_BIGINT:
case OLAP_FIELD_TYPE_LARGEINT:
case OLAP_FIELD_TYPE_FLOAT:
case OLAP_FIELD_TYPE_DOUBLE:
case OLAP_FIELD_TYPE_CHAR:
case OLAP_FIELD_TYPE_VARCHAR:
case OLAP_FIELD_TYPE_HLL:
case OLAP_FIELD_TYPE_OBJECT:{
dst->set_is_null(src_null);
if (src_null) {
break;
}
const TypeInfo* type_info = get_type_info(type);
type_info->deep_copy(dst->mutable_cell_ptr(), src, mem_pool);
break;
}
default:
return OLAP_ERR_INVALID_SCHEMA;
}
return OLAP_SUCCESS;
}
OLAPStatus PushBrokerReader::next(ContiguousRow* row) {
if (!_ready || row == nullptr) {
return OLAP_ERR_INPUT_PARAMETER_ERROR;
@ -961,22 +1025,18 @@ OLAPStatus PushBrokerReader::next(ContiguousRow* row) {
}
auto slot_descs = _tuple_desc->slots();
size_t num_key_columns = _schema->num_key_columns();
// finalize row
for (size_t i = 0; i < slot_descs.size(); ++i) {
auto cell = row->cell(i);
const SlotDescriptor* slot = slot_descs[i];
bool is_null = _tuple->is_null(slot->null_indicator_offset());
const void* value = _tuple->get_slot(slot->tuple_offset());
// try execute init method defined in aggregateInfo
// by default it only copies data into cell
_schema->column(i)->consume(&cell, (const char*)value, is_null, _mem_pool.get(),
_runtime_state->obj_pool());
// if column(i) is a value column, try execute finalize method defined in aggregateInfo
// to convert data into final format
if (i >= num_key_columns) {
_schema->column(i)->agg_finalize(&cell, _mem_pool.get());
FieldType type = _schema->column(i)->type();
OLAPStatus field_status = fill_field_row(&cell, (const char*)value, is_null, _mem_pool.get(), type);
if (field_status!= OLAP_SUCCESS) {
LOG(WARNING) << "fill field row failed in spark load, slot index: " << i << ", type: " << type;
return OLAP_ERR_SCHEMA_SCHEMA_FIELD_INVALID;
}
}

View File

@ -200,6 +200,7 @@ public:
MemPool* mem_pool() { return _mem_pool.get(); }
private:
OLAPStatus fill_field_row(RowCursorCell* dst, const char* src,bool src_null, MemPool* mem_pool, FieldType type);
bool _ready;
bool _eof;
TupleDescriptor* _tuple_desc;