From 819ab6fc002d5763040a781b059a45c3429038fd Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Thu, 29 Feb 2024 20:09:24 +0800 Subject: [PATCH] [feature](sink) support partition tablet sink shuffle (#30914) Co-authored-by: morrySnow --- be/src/vec/sink/vdata_stream_sender.cpp | 102 +++++++++++++++++- be/src/vec/sink/vdata_stream_sender.h | 41 ++++++- be/src/vec/sink/vrow_distribution.h | 3 + .../translator/PhysicalPlanTranslator.java | 3 + .../DistributionSpecTabletIdShuffle.java | 35 ++++++ .../properties/PhysicalProperties.java | 3 + .../trees/plans/commands/InsertExecutor.java | 26 ++++- .../commands/InsertIntoTableCommand.java | 15 ++- .../plans/physical/PhysicalOlapTableSink.java | 21 +--- .../apache/doris/planner/DataPartition.java | 34 ++---- .../apache/doris/planner/DataStreamSink.java | 46 ++++++++ .../apache/doris/planner/OlapTableSink.java | 22 +++- .../org/apache/doris/qe/SessionVariable.java | 9 ++ gensrc/thrift/DataSinks.thrift | 7 +- 14 files changed, 310 insertions(+), 57 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTabletIdShuffle.java diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index e690c8eb59..a7f1b4b858 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -23,15 +23,19 @@ #include #include #include +#include #include #include +#include #include #include +#include #include #include "common/object_pool.h" #include "common/status.h" +#include "exec/tablet_info.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "runtime/descriptors.h" @@ -41,10 +45,13 @@ #include "runtime/types.h" #include "util/proto_util.h" #include "vec/columns/column_const.h" +#include "vec/columns/columns_number.h" #include "vec/common/sip_hash.h" #include "vec/exprs/vexpr.h" #include "vec/runtime/vdata_stream_mgr.h" #include "vec/runtime/vdata_stream_recvr.h" +#include "vec/sink/vrow_distribution.h" +#include "vec/sink/writer/vtablet_writer_v2.h" namespace doris::vectorized { @@ -329,6 +336,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sink.output_partition.type == TPartitionType::HASH_PARTITIONED || sink.output_partition.type == TPartitionType::RANDOM || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || + sink.output_partition.type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED); std::map fragment_id_to_channel_index; @@ -412,6 +420,34 @@ Status VDataStreamSender::init(const TDataSink& tsink) { RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::RANGE_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); + } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + _txn_id = t_stream_sink.tablet_sink_txn_id; + _schema = std::make_shared(); + RETURN_IF_ERROR(_schema->init(t_stream_sink.tablet_sink_schema)); + _vpartition = std::make_unique( + _schema, t_stream_sink.tablet_sink_partition); + RETURN_IF_ERROR(_vpartition->init()); + auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; + _tablet_finder = std::make_unique(_vpartition.get(), find_tablet_mode); + _tablet_sink_tuple_desc =
_state->desc_tbl().get_tuple_descriptor(t_stream_sink.tablet_sink_tuple_id); + _tablet_sink_row_desc = _pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false)); + // no need to call init_autoinc_info on _block_convertor here + _block_convertor = + std::make_unique(_tablet_sink_tuple_desc); + _location = _pool->add(new OlapTableLocationParam(t_stream_sink.tablet_sink_location)); + _row_distribution.init({.state = _state, + .block_convertor = _block_convertor.get(), + .tablet_finder = _tablet_finder.get(), + .vpartition = _vpartition.get(), + .add_partition_request_timer = _add_partition_request_timer, + .txn_id = _txn_id, + .pool = _pool, + .location = _location, + .vec_output_expr_ctxs = &_fake_expr_ctxs, + .schema = _schema, + .caller = (void*)this, + .create_partition_callback = &empty_callback_function}); } else { // UNPARTITIONED } @@ -488,6 +524,8 @@ Status VDataStreamSender::open(RuntimeState* state) { if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { RETURN_IF_ERROR(_partitioner->open(state)); + } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc)); } _compression_type = state->fragement_transmission_compression_type(); @@ -502,6 +540,25 @@ void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType static_cast(channel->close(state, Status::OK())); } +Status VDataStreamSender::_send_new_partition_batch() { + if (_row_distribution.need_deal_batching()) { // try_close may be called more than once + RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); + Block tmp_block = _row_distribution._batching_block->to_block(); // borrow the columns out as an lvalue block + + // This order is required: + // 1. clear the batching stats (and set the flag) so that we don't start a new batching round while dealing with the batched block. + // 2. deal with the batched block. + // 3. reuse the columns of the borrowed block, since send() does not really modify them; it generates a new block from them. + _row_distribution.clear_batching_stats(); + RETURN_IF_ERROR(this->send(_state, &tmp_block, false)); + // recover the borrowed columns + _row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns()); + _row_distribution._batching_block->clear_column_data(); + _row_distribution._deal_batched = false; + } + return Status::OK(); +} + Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { SCOPED_TIMER(_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); @@ -627,6 +684,38 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { (uint32_t*)_partitioner->get_channel_ids(), rows, block, _enable_pipeline_exec ?
eos : false)); } + } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + // flush rows batched earlier while waiting for automatic partition creation + RETURN_IF_ERROR(_send_new_partition_batch()); + if (UNLIKELY(block->rows() == 0)) { + return Status::OK(); + } + std::shared_ptr convert_block; + bool has_filtered_rows = false; + int64_t filtered_rows = 0; + _number_input_rows += block->rows(); + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( + *block, convert_block, filtered_rows, has_filtered_rows, _row_part_tablet_ids, + _number_input_rows)); + + const auto& row_ids = _row_part_tablet_ids[0].row_ids; + const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids; + const auto& num_channels = _channels.size(); + std::vector> channel2rows; + channel2rows.resize(num_channels); + for (int idx = 0; idx < row_ids.size(); ++idx) { + const auto& row = row_ids[idx]; + const auto& tablet_id = tablet_ids[idx]; + channel2rows[tablet_id % num_channels].emplace_back(row); + } + + RETURN_IF_ERROR(channel_add_rows_with_idx(state, _channels, num_channels, channel2rows, + convert_block.get(), + _enable_pipeline_exec ? eos : false)); + if (eos) { + _row_distribution._deal_batched = true; + RETURN_IF_ERROR(_send_new_partition_batch()); + } } else { // Range partition // 1. calculate range @@ -662,6 +751,12 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { { // send last block SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + // the non-pipeline engine does not pass eos to send(), and the last block may still have pending automatic partition creation, + // so check again here + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + _row_distribution._deal_batched = true; + RETURN_IF_ERROR(_send_new_partition_batch()); + } if (_serializer.get_block() && _serializer.get_block()->rows() > 0) { Block block = _serializer.get_block()->to_block(); RETURN_IF_ERROR( @@ -694,7 +789,12 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { } } } - + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + + _tablet_finder->num_filtered_rows()); + _state->update_num_rows_load_unselected( + _tablet_finder->num_immutable_partition_filtered_rows()); + } if (_peak_memory_usage_counter) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 2834c7e2aa..88a948ed05 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -39,6 +40,7 @@ #include "common/logging.h" #include "common/status.h" #include "exec/data_sink.h" +#include "exec/tablet_info.h" #include "pipeline/exec/exchange_sink_buffer.h" #include "service/backend_options.h" #include "util/ref_count_closure.h" @@ -48,6 +50,8 @@ #include "vec/exprs/vexpr_context.h" #include "vec/runtime/partitioner.h" #include "vec/runtime/vdata_stream_recvr.h" +#include "vec/sink/vrow_distribution.h" +#include "vec/sink/vtablet_finder.h" namespace doris { class ObjectPool; @@ -160,10 +164,18 @@ protected: Status channel_add_rows(RuntimeState* state, Channels& channels, int num_channels, const HashValueType* __restrict channel_ids, int rows, Block* block, bool eos); + template + Status channel_add_rows_with_idx(RuntimeState* state, Channels& channels, int num_channels, + std::vector>& channel2rows, Block* block, + bool eos); template void
_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st); + static Status empty_callback_function(void* sender, TCreatePartitionResult* result) { + return Status::OK(); + } + Status _send_new_partition_batch(); // Sender instance id, unique within a fragment. int _sender_id; @@ -221,6 +233,22 @@ protected: bool _enable_pipeline_exec = false; BlockSerializer _serializer; + + // for shuffle data by partition and tablet + VRowDistribution _row_distribution; + RuntimeProfile::Counter* _add_partition_request_timer = nullptr; + int64_t _txn_id = -1; + RowDescriptor* _tablet_sink_row_desc = nullptr; + TupleDescriptor* _tablet_sink_tuple_desc = nullptr; + OlapTableLocationParam* _location = nullptr; + int64_t _number_input_rows = 0; + // reuse to avoid frequent memory allocation and release. + std::vector _row_part_tablet_ids; + vectorized::VExprContextSPtrs _fake_expr_ctxs; + std::unique_ptr _vpartition = nullptr; + std::unique_ptr _tablet_finder = nullptr; + std::shared_ptr _schema = nullptr; + std::unique_ptr _block_convertor = nullptr; }; template @@ -402,12 +430,20 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channe int num_channels, const HashValueType* __restrict channel_ids, int rows, Block* block, bool eos) { - std::vector channel2rows[num_channels]; - + std::vector> channel2rows; + channel2rows.resize(num_channels); for (uint32_t i = 0; i < rows; i++) { channel2rows[channel_ids[i]].emplace_back(i); } + RETURN_IF_ERROR( + channel_add_rows_with_idx(state, channels, num_channels, channel2rows, block, eos)); + return Status::OK(); +} +template +Status VDataStreamSender::channel_add_rows_with_idx( + RuntimeState* state, Channels& channels, int num_channels, + std::vector>& channel2rows, Block* block, bool eos) { Status status; for (int i = 0; i < num_channels; ++i) { if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) { @@ -424,7 +460,6 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channe } } } - return Status::OK(); } diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index 54cd6f42ce..12acda73ed 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -161,8 +161,11 @@ private: // for auto partitions std::vector> _partitions_need_create; + +public: std::unique_ptr _batching_block; bool _deal_batched = false; // If true, send batched block before any block's append. 
+private: size_t _batching_rows = 0, _batching_bytes = 0; OlapTableBlockConvertor* _block_convertor = nullptr; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index c9cb534c9c..c64965080f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -70,6 +70,7 @@ import org.apache.doris.nereids.properties.DistributionSpecHash; import org.apache.doris.nereids.properties.DistributionSpecReplicated; import org.apache.doris.nereids.properties.DistributionSpecStorageAny; import org.apache.doris.nereids.properties.DistributionSpecStorageGather; +import org.apache.doris.nereids.properties.DistributionSpecTabletIdShuffle; import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup; @@ -2458,6 +2459,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor locationParams = olapTableSink + .createLocation(olapTableSink.getDstTable()); + dataStreamSink.setTabletSinkLocationParam(locationParams.get(0)); + dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId()); + } } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index d5101aabcd..166219dfae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -37,9 +37,12 @@ import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.GroupCommitPlanner; @@ -183,7 +186,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, physicalOlapTableSink.getDatabase(), physicalOlapTableSink.getTargetTable(), label, planner); insertExecutor.beginTransaction(); - insertExecutor.finalizeSink(sink, physicalOlapTableSink.isPartialUpdate(), + insertExecutor.finalizeSink(planner.getFragments().get(0), sink, physicalOlapTableSink.isPartialUpdate(), physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT, this.allowAutoPartition); } finally { targetTableIf.readUnlock(); @@ -254,12 +257,20 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, } OlapTable targetTable = physicalOlapTableSink.getTargetTable(); return 
ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES - && !ctx.isTxnModel() && sink.getFragment().getPlanRoot() instanceof UnionNode + && !ctx.isTxnModel() && isGroupCommitAvailablePlan(physicalOlapTableSink) && physicalOlapTableSink.getPartitionIds().isEmpty() && targetTable.getTableProperty() .getUseSchemaLightChange() && !targetTable.getQualifiedDbName() .equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME); } + private boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink sink) { + Plan child = sink.child(); + if (child instanceof PhysicalDistribute) { + child = child.child(0); + } + return child instanceof OneRowRelation || (child instanceof PhysicalUnion && child.arity() == 0); + } + @Override public Plan getExplainPlan(ConnectContext ctx) { if (!ctx.getSessionVariable().isEnableNereidsDML()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index 11a85ed100..86a6a4b8c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -25,7 +25,6 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.memo.GroupExpression; -import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; @@ -42,13 +41,11 @@ import org.apache.doris.statistics.Statistics; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; /** * physical olap table sink for insert command @@ -218,23 +215,7 @@ public class PhysicalOlapTableSink extends PhysicalSink if (targetTable.isPartitionDistributed()) { DistributionInfo distributionInfo = targetTable.getDefaultDistributionInfo(); if (distributionInfo instanceof HashDistributionInfo) { - HashDistributionInfo hashDistributionInfo - = ((HashDistributionInfo) targetTable.getDefaultDistributionInfo()); - List distributedColumns = hashDistributionInfo.getDistributionColumns(); - List columnIndexes = Lists.newArrayList(); - int idx = 0; - for (int i = 0; i < targetTable.getFullSchema().size(); ++i) { - if (targetTable.getFullSchema().get(i).equals(distributedColumns.get(idx))) { - columnIndexes.add(i); - idx++; - if (idx == distributedColumns.size()) { - break; - } - } - } - return PhysicalProperties.createHash(columnIndexes.stream() - .map(colIdx -> child().getOutput().get(colIdx).getExprId()) - .collect(Collectors.toList()), ShuffleType.NATURAL); + return PhysicalProperties.TABLET_ID_SHUFFLE; } else if (distributionInfo instanceof RandomDistributionInfo) { return PhysicalProperties.ANY; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 7535208694..9c5c375a35 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -32,8 +32,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.List; @@ -46,16 +44,14 @@ import java.util.List; * TODO: better name? just Partitioning? */ public class DataPartition { - private static final Logger LOG = LogManager.getLogger(DataPartition.class); public static final DataPartition UNPARTITIONED = new DataPartition(TPartitionType.UNPARTITIONED); - public static final DataPartition RANDOM = new DataPartition(TPartitionType.RANDOM); + public static final DataPartition TABLET_ID = new DataPartition(TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED); private final TPartitionType type; - // for hash partition: exprs used to compute hash value - private ImmutableList partitionExprs = ImmutableList.of(); + private ImmutableList partitionExprs; public DataPartition(TPartitionType type, List exprs) { Preconditions.checkNotNull(exprs); @@ -67,13 +63,10 @@ public class DataPartition { this.partitionExprs = ImmutableList.copyOf(exprs); } - public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws AnalysisException { - List list = Expr.trySubstituteList(partitionExprs, smap, analyzer, false); - partitionExprs = ImmutableList.copyOf(list); - } - public DataPartition(TPartitionType type) { - Preconditions.checkState(type == TPartitionType.UNPARTITIONED || type == TPartitionType.RANDOM); + Preconditions.checkState(type == TPartitionType.UNPARTITIONED + || type == TPartitionType.RANDOM + || type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED); this.type = type; this.partitionExprs = ImmutableList.of(); } @@ -82,6 +75,11 @@ public class DataPartition { return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs); } + public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws AnalysisException { + List list = Expr.trySubstituteList(partitionExprs, smap, analyzer, false); + partitionExprs = ImmutableList.copyOf(list); + } + public boolean isPartitioned() { return type != TPartitionType.UNPARTITIONED; } @@ -106,16 +104,6 @@ public class DataPartition { return result; } - /** - * Returns true if 'this' is a partition that is compatible with the - * requirements of 's'. 
- * TODO: specify more clearly and implement - */ - public boolean isCompatible(DataPartition s) { - // TODO: implement - return true; - } - public String getExplainString(TExplainLevel explainLevel) { StringBuilder str = new StringBuilder(); str.append(type.toString()); @@ -127,7 +115,7 @@ public class DataPartition { for (Expr expr : partitionExprs) { strings.add(expr.toSql()); } - str.append(": " + Joiner.on(", ").join(strings)); + str.append(": ").append(Joiner.on(", ").join(strings)); } str.append("\n"); return str.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index 4d4a2c641a..b9cf516bc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -27,8 +27,12 @@ import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TDataStreamSink; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TOlapTableLocationParam; +import org.apache.doris.thrift.TOlapTablePartitionParam; +import org.apache.doris.thrift.TOlapTableSchemaParam; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.springframework.util.CollectionUtils; @@ -52,6 +56,13 @@ public class DataStreamSink extends DataSink { protected List runtimeFilters = Lists.newArrayList(); + // use for tablet id shuffle sink only + protected TOlapTableSchemaParam tabletSinkSchemaParam = null; + protected TOlapTablePartitionParam tabletSinkPartitionParam = null; + protected TOlapTableLocationParam tabletSinkLocationParam = null; + protected TupleDescriptor tabletSinkTupleDesc = null; + protected long tabletSinkTxnId = -1; + public DataStreamSink() { } @@ -118,6 +129,26 @@ public class DataStreamSink extends DataSink { this.runtimeFilters.add(runtimeFilter); } + public void setTabletSinkSchemaParam(TOlapTableSchemaParam schemaParam) { + this.tabletSinkSchemaParam = schemaParam; + } + + public void setTabletSinkPartitionParam(TOlapTablePartitionParam partitionParam) { + this.tabletSinkPartitionParam = partitionParam; + } + + public void setTabletSinkTupleDesc(TupleDescriptor tupleDesc) { + this.tabletSinkTupleDesc = tupleDesc; + } + + public void setTabletSinkLocationParam(TOlapTableLocationParam locationParam) { + this.tabletSinkLocationParam = locationParam; + } + + public void setTabletSinkTxnId(long txnId) { + this.tabletSinkTxnId = txnId; + } + @Override public String getExplainString(String prefix, TExplainLevel explainLevel) { StringBuilder strBuilder = new StringBuilder(); @@ -179,6 +210,21 @@ public class DataStreamSink extends DataSink { tStreamSink.addToRuntimeFilters(rf.toThrift()); } } + Preconditions.checkState((tabletSinkSchemaParam != null) == (tabletSinkPartitionParam != null), + "schemaParam and partitionParam should be set together."); + if (tabletSinkSchemaParam != null) { + tStreamSink.setTabletSinkSchema(tabletSinkSchemaParam); + } + if (tabletSinkPartitionParam != null) { + tStreamSink.setTabletSinkPartition(tabletSinkPartitionParam); + } + if (tabletSinkTupleDesc != null) { + tStreamSink.setTabletSinkTupleId(tabletSinkTupleDesc.getId().asInt()); + } + if (tabletSinkLocationParam != null) { + tStreamSink.setTabletSinkLocation(tabletSinkLocationParam); + } + tStreamSink.setTabletSinkTxnId(tabletSinkTxnId); result.setStreamSink(tStreamSink); 
return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 485fde67fb..924b64f87c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -110,6 +110,7 @@ public class OlapTableSink extends DataSink { private boolean singleReplicaLoad; private boolean isStrictMode = false; + private long txnId = -1; public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds, boolean singleReplicaLoad) { @@ -129,6 +130,7 @@ public class OlapTableSink extends DataSink { tSink.setLoadChannelTimeoutS(loadChannelTimeoutS); tSink.setSendBatchParallelism(sendBatchParallelism); this.isStrictMode = isStrictMode; + this.txnId = txnId; if (loadToSingleTablet && !(dstTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) { throw new AnalysisException( "if load_to_single_tablet set to true," + " the olap table must be with random distribution"); @@ -237,7 +239,7 @@ public class OlapTableSink extends DataSink { return tDataSink; } - private TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer analyzer) throws AnalysisException { + public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer analyzer) throws AnalysisException { TOlapTableSchemaParam schemaParam = new TOlapTableSchemaParam(); schemaParam.setDbId(dbId); schemaParam.setTableId(table.getId()); @@ -321,7 +323,7 @@ public class OlapTableSink extends DataSink { return distColumns; } - private TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer) + public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer) throws UserException { TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam(); partitionParam.setDbId(dbId); @@ -479,7 +481,7 @@ public class OlapTableSink extends DataSink { } } - private List createLocation(OlapTable table) throws UserException { + public List createLocation(OlapTable table) throws UserException { TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); // BE id -> path hash @@ -571,7 +573,7 @@ public class OlapTableSink extends DataSink { bePathsMap.putAll(result); } - private TPaloNodesInfo createPaloNodesInfo() { + public TPaloNodesInfo createPaloNodesInfo() { TPaloNodesInfo nodesInfo = new TPaloNodesInfo(); SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); for (Long id : systemInfoService.getAllBackendIds(false)) { @@ -584,4 +586,16 @@ public class OlapTableSink extends DataSink { protected TDataSinkType getDataSinkType() { return TDataSinkType.OLAP_TABLE_SINK; } + + public OlapTable getDstTable() { + return dstTable; + } + + public TupleDescriptor getTupleDescriptor() { + return tupleDescriptor; + } + + public long getTxnId() { + return txnId; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index a5bc477c3f..e561ded693 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -3206,6 +3206,15 @@ public class SessionVariable implements Serializable, Writable { this.dumpNereidsMemo = dumpNereidsMemo; } + public void disableStrictConsistencyDmlOnce() 
throws DdlException { + if (!enableStrictConsistencyDml) { + return; + } + setIsSingleSetVar(true); + VariableMgr.setVar(this, + new SetVar(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, new StringLiteral("false"))); + } + public void enableFallbackToOriginalPlannerOnce() throws DdlException { if (enableFallbackToOriginalPlanner) { return; diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 068b592700..602943b420 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -172,8 +172,11 @@ struct TDataStreamSink { 7: optional list runtime_filters // used for partition_type = TABLET_SINK_SHUFFLE_PARTITIONED - 8: optional Descriptors.TOlapTableSchemaParam schema - 9: optional Descriptors.TOlapTablePartitionParam partition + 8: optional Descriptors.TOlapTableSchemaParam tablet_sink_schema + 9: optional Descriptors.TOlapTablePartitionParam tablet_sink_partition + 10: optional Descriptors.TOlapTableLocationParam tablet_sink_location + 11: optional i64 tablet_sink_txn_id + 12: optional Types.TTupleId tablet_sink_tuple_id } struct TMultiCastDataStreamSink {
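
Illustrative note (not part of the patch): in the new TABLET_SINK_SHUFFLE_PARTITIONED branch of VDataStreamSender::send(), VRowDistribution resolves a tablet id for every input row, and rows are then grouped per exchange channel with tablet_id % num_channels before channel_add_rows_with_idx() ships each group. The standalone sketch below reproduces only that grouping step with plain STL types; group_rows_by_channel and the sample ids are hypothetical names used for illustration, not Doris APIs.

#include <cstdint>
#include <iostream>
#include <vector>

// Groups row indexes by target channel, mirroring the channel2rows
// construction in the patch: all rows of one tablet go to one channel.
std::vector<std::vector<uint32_t>> group_rows_by_channel(
        const std::vector<uint32_t>& row_ids, const std::vector<int64_t>& tablet_ids,
        size_t num_channels) {
    std::vector<std::vector<uint32_t>> channel2rows(num_channels);
    for (size_t idx = 0; idx < row_ids.size(); ++idx) {
        channel2rows[tablet_ids[idx] % num_channels].emplace_back(row_ids[idx]);
    }
    return channel2rows;
}

int main() {
    // Six rows spread over three tablets, shuffled to two exchange channels.
    std::vector<uint32_t> row_ids = {0, 1, 2, 3, 4, 5};
    std::vector<int64_t> tablet_ids = {10001, 10002, 10001, 10003, 10002, 10001};
    auto channel2rows = group_rows_by_channel(row_ids, tablet_ids, /*num_channels=*/2);
    for (size_t ch = 0; ch < channel2rows.size(); ++ch) {
        std::cout << "channel " << ch << ":";
        for (uint32_t row : channel2rows[ch]) {
            std::cout << ' ' << row;
        }
        std::cout << '\n';
    }
    return 0;
}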