diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index b6a1ad3cd3..0b9d3ac1b4 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -150,7 +150,7 @@ OLAPStatus PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TP if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res << ", failed to process realtime push." - << ", table=" << tablet->full_name() + << ", tablet=" << tablet->full_name() << ", transaction_id=" << request.transaction_id; for (TabletVars& tablet_var : *tablet_vars) { if (tablet_var.tablet == nullptr) { @@ -944,6 +944,70 @@ OLAPStatus PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& return OLAP_SUCCESS; } +OLAPStatus PushBrokerReader::fill_field_row(RowCursorCell* dst, const char* src, bool src_null, MemPool* mem_pool, FieldType type){ + switch (type) { + case OLAP_FIELD_TYPE_DECIMAL: { + dst->set_is_null(src_null); + if (src_null) { + break; + } + auto *decimal_value = reinterpret_cast(src); + auto *storage_decimal_value = reinterpret_cast(dst->mutable_cell_ptr()); + storage_decimal_value->integer = decimal_value->int_value(); + storage_decimal_value->fraction = decimal_value->frac_value(); + break; + } + case OLAP_FIELD_TYPE_DATETIME: { + dst->set_is_null(src_null); + if (src_null) { + break; + } + + auto* datetime_value = reinterpret_cast(src); + auto* storage_datetime_value = reinterpret_cast(dst->mutable_cell_ptr()); + *storage_datetime_value = datetime_value->to_olap_datetime(); + break; + } + + case OLAP_FIELD_TYPE_DATE: { + dst->set_is_null(src_null); + if (src_null) { + break; + } + + auto* date_value = reinterpret_cast(src); + auto* storage_date_value = reinterpret_cast(dst->mutable_cell_ptr()); + *storage_date_value = static_cast(date_value->to_olap_date()); + break; + } + case OLAP_FIELD_TYPE_BOOL: + case OLAP_FIELD_TYPE_TINYINT: + case OLAP_FIELD_TYPE_SMALLINT: + case OLAP_FIELD_TYPE_INT: + case OLAP_FIELD_TYPE_UNSIGNED_INT: + case OLAP_FIELD_TYPE_BIGINT: + case OLAP_FIELD_TYPE_LARGEINT: + case OLAP_FIELD_TYPE_FLOAT: + case OLAP_FIELD_TYPE_DOUBLE: + case OLAP_FIELD_TYPE_CHAR: + case OLAP_FIELD_TYPE_VARCHAR: + case OLAP_FIELD_TYPE_HLL: + case OLAP_FIELD_TYPE_OBJECT:{ + dst->set_is_null(src_null); + if (src_null) { + break; + } + const TypeInfo* type_info = get_type_info(type); + type_info->deep_copy(dst->mutable_cell_ptr(), src, mem_pool); + break; + } + default: + return OLAP_ERR_INVALID_SCHEMA; + } + + return OLAP_SUCCESS; +} + OLAPStatus PushBrokerReader::next(ContiguousRow* row) { if (!_ready || row == nullptr) { return OLAP_ERR_INPUT_PARAMETER_ERROR; @@ -961,22 +1025,18 @@ OLAPStatus PushBrokerReader::next(ContiguousRow* row) { } auto slot_descs = _tuple_desc->slots(); - size_t num_key_columns = _schema->num_key_columns(); - // finalize row for (size_t i = 0; i < slot_descs.size(); ++i) { auto cell = row->cell(i); const SlotDescriptor* slot = slot_descs[i]; bool is_null = _tuple->is_null(slot->null_indicator_offset()); const void* value = _tuple->get_slot(slot->tuple_offset()); - // try execute init method defined in aggregateInfo - // by default it only copies data into cell - _schema->column(i)->consume(&cell, (const char*)value, is_null, _mem_pool.get(), - _runtime_state->obj_pool()); - // if column(i) is a value column, try execute finalize method defined in aggregateInfo - // to convert data into final format - if (i >= num_key_columns) { - _schema->column(i)->agg_finalize(&cell, _mem_pool.get()); + + FieldType type = _schema->column(i)->type(); + OLAPStatus field_status = fill_field_row(&cell, (const char*)value, is_null, _mem_pool.get(), type); + if (field_status!= OLAP_SUCCESS) { + LOG(WARNING) << "fill field row failed in spark load, slot index: " << i << ", type: " << type; + return OLAP_ERR_SCHEMA_SCHEMA_FIELD_INVALID; } } diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index 7f8d0aad50..e1fb787fd3 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -200,6 +200,7 @@ public: MemPool* mem_pool() { return _mem_pool.get(); } private: + OLAPStatus fill_field_row(RowCursorCell* dst, const char* src,bool src_null, MemPool* mem_pool, FieldType type); bool _ready; bool _eof; TupleDescriptor* _tuple_desc;