diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index aa3948e0a1..da536d66ec 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -197,53 +197,91 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { return Status::OK(); } -#define DO_RPC(QUEUE, BLOCK, HOLDER) \ - auto& request = QUEUE.front(); \ - if (!_instance_to_request[id]) { \ - _construct_request(id); \ - } \ - auto brpc_request = _instance_to_request[id]; \ - brpc_request->set_eos(request.eos); \ - brpc_request->set_packet_seq(_instance_to_seq[id]++); \ - if (request.BLOCK) { \ - brpc_request->set_allocated_block(request.BLOCK); \ - } \ - auto* _closure = new SelfDeleteClosure(id, request.eos, HOLDER); \ - _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); \ - _closure->addFailedHandler( \ - [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); \ - _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, \ - const PTransmitDataResult& result) { \ - Status s = Status(result.status()); \ - if (!s.ok()) { \ - _failed(id, \ - fmt::format("exchange req success but status isn't ok: {}", s.to_string())); \ - } else if (eos) { \ - _ended(id); \ - } else { \ - _send_rpc(id); \ - } \ - }); \ - { \ - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); \ - if (enable_http_send_block(*brpc_request)) { \ - RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, \ - *brpc_request, request.channel->_brpc_dest_addr)); \ - } else { \ - transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); \ - } \ - } \ - if (request.BLOCK) { \ - brpc_request->release_block(); \ - } \ - QUEUE.pop(); - if (!q.empty()) { // If we have data to shuffle which is not broadcasted - DO_RPC(q, block.get(), nullptr) + auto& request = q.front(); + if (!_instance_to_request[id]) { + _construct_request(id); + } + auto brpc_request = _instance_to_request[id]; + brpc_request->set_eos(request.eos); + brpc_request->set_packet_seq(_instance_to_seq[id]++); + if (request.block) { + brpc_request->set_allocated_block(request.block.get()); + } + auto* _closure = new SelfDeleteClosure(id, request.eos, nullptr); + _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); + _closure->addFailedHandler( + [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); + _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, + const PTransmitDataResult& result) { + Status s = Status(result.status()); + if (!s.ok()) { + _failed(id, + fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + } else if (eos) { + _ended(id); + } else { + _send_rpc(id); + } + }); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); + if (enable_http_send_block(*brpc_request)) { + RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, + *brpc_request, + request.channel->_brpc_dest_addr)); + } else { + transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); + } + } + if (request.block) { + brpc_request->release_block(); + } + q.pop(); } else if (!broadcast_q.empty()) { // If we have data to shuffle which is broadcasted - DO_RPC(broadcast_q, block_holder->get_block(), request.block_holder) + auto& request = broadcast_q.front(); + if (!_instance_to_request[id]) { + _construct_request(id); + } + auto brpc_request = _instance_to_request[id]; + brpc_request->set_eos(request.eos); + brpc_request->set_packet_seq(_instance_to_seq[id]++); + if (request.block_holder->get_block()) { + brpc_request->set_allocated_block(request.block_holder->get_block()); + } + auto* _closure = + new SelfDeleteClosure(id, request.eos, request.block_holder); + _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); + _closure->addFailedHandler( + [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); + _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, + const PTransmitDataResult& result) { + Status s = Status(result.status()); + if (!s.ok()) { + _failed(id, + fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + } else if (eos) { + _ended(id); + } else { + _send_rpc(id); + } + }); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); + if (enable_http_send_block(*brpc_request)) { + RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, + *brpc_request, + request.channel->_brpc_dest_addr)); + } else { + transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); + } + } + if (request.block_holder->get_block()) { + brpc_request->release_block(); + } + broadcast_q.pop(); } else { _instance_to_sending_by_pipeline[id] = true; return Status::OK(); diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index bd35929f85..ac58b4f3e7 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -1384,13 +1384,15 @@ Status VOlapTableSink::_validate_column(RuntimeState* state, const TypeDescripto invalid = true; } } - - if (dec_val > _get_decimalv2_min_or_max(type) || - dec_val < _get_decimalv2_min_or_max(type)) { + const auto& max_decimalv2 = _get_decimalv2_min_or_max(type); + const auto& min_decimalv2 = _get_decimalv2_min_or_max(type); + if (dec_val > max_decimalv2 || dec_val < min_decimalv2) { fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); fmt::format_to(error_msg, ", value={}", dec_val.to_string()); - fmt::format_to(error_msg, ", precision={}, scale={}; ", type.precision, + fmt::format_to(error_msg, ", precision={}, scale={}", type.precision, type.scale); + fmt::format_to(error_msg, ", min={}, max={}; ", min_decimalv2.to_string(), + max_decimalv2.to_string()); invalid = true; }