diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 9369c73709..3be046a1c8 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -196,6 +196,8 @@ Status NodeChannel::_wait_in_flight_packet() { if (!_has_in_flight_packet) { return Status::OK(); } + + SCOPED_RAW_TIMER(_parent->mutable_wait_in_flight_packet_ns()); _add_batch_closure->join(); _has_in_flight_packet = false; if (_add_batch_closure->cntl.Failed()) { @@ -214,6 +216,7 @@ Status NodeChannel::_send_cur_batch(bool eos) { _add_batch_request.set_eos(eos); _add_batch_request.set_packet_seq(_next_packet_seq); if (_batch->num_rows() > 0) { + SCOPED_RAW_TIMER(_parent->mutable_serialize_batch_ns()); _batch->serialize(_add_batch_request.mutable_row_batch()); } @@ -498,6 +501,8 @@ Status OlapTableSink::prepare(RuntimeState* state) { _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); _open_timer = ADD_TIMER(_profile, "OpenTime"); _close_timer = ADD_TIMER(_profile, "CloseTime"); + _wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlightPacketTime"); + _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); // open all channels auto& partitions = _partition->get_partitions(); @@ -602,6 +607,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_send_data_timer, _send_data_ns); COUNTER_SET(_convert_batch_timer, _convert_batch_ns); COUNTER_SET(_validate_data_timer, _validate_data_ns); + COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns); + COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns); + state->update_num_rows_load_filtered(_number_filtered_rows); } else { for (auto channel : _channels) { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 3bd5941f3c..dc7dc37041 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -168,6 +168,11 @@ public: return _profile; } + // these 2 counters does not thread-safe. make sure only one thread + // at a time can modify them. + int64_t* mutable_wait_in_flight_packet_ns() { return &_wait_in_flight_packet_ns; } + int64_t* mutable_serialize_batch_ns() { return &_serialize_batch_ns; } + private: // convert input batch to output batch which will be loaded into OLAP table. // this is only used in insert statement. @@ -236,6 +241,8 @@ private: int64_t _convert_batch_ns = 0; int64_t _validate_data_ns = 0; int64_t _send_data_ns = 0; + int64_t _wait_in_flight_packet_ns = 0; + int64_t _serialize_batch_ns = 0; int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; int64_t _number_filtered_rows = 0; @@ -248,6 +255,8 @@ private: RuntimeProfile::Counter* _validate_data_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; + RuntimeProfile::Counter* _wait_in_flight_packet_timer = nullptr; + RuntimeProfile::Counter* _serialize_batch_timer = nullptr; }; } diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index f423d63f8c..097550f220 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -810,12 +810,12 @@ void MiniLoadAction::_new_handle(HttpRequest* req) { } } - if (!ctx->status.ok()) { + // if failed to commit and status is not PUBLISH_TIMEOUT, rollback the txn. + // PUBLISH_TIMEOUT is treated as OK in mini load, because user will use show load stmt + // to see the final result. + if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) { if (ctx->need_rollback) { _exec_env->stream_load_executor()->rollback_txn(ctx); - if (ctx->status.code() == TStatusCode::PUBLISH_TIMEOUT) { - ctx->status = Status::PublishTimeout("transation has been rollback because it was timeout in phase of publish"); - } ctx->need_rollback = false; } if (ctx->body_sink.get() != nullptr) { @@ -823,7 +823,7 @@ void MiniLoadAction::_new_handle(HttpRequest* req) { } } - std::string str = to_json(ctx->status); + std::string str = ctx->to_json_for_mini_load(); HttpChannel::send_reply(req, str); } diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index c4ec694a00..27981526b6 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -76,6 +76,49 @@ std::string StreamLoadContext::to_json() const { return s.GetString(); } +/* + * The old mini load result format is as followes: + * (which defined in src/util/json_util.cpp) + * + * { + * "status" : "Success"("Fail"), + * "msg" : "xxxx" + * } + * + */ +std::string StreamLoadContext::to_json_for_mini_load() const { + rapidjson::StringBuffer s; + rapidjson::PrettyWriter writer(s); + writer.StartObject(); + + // status + bool show_ok = true; + writer.Key("status"); + switch (status.code()) { + case TStatusCode::OK: + writer.String("Success"); + break; + case TStatusCode::PUBLISH_TIMEOUT: + // treat PUBLISH_TIMEOUT as OK in mini load + writer.String("Success"); + break; + default: + writer.String("Fail"); + show_ok = false; + break; + } + // msg + writer.Key("msg"); + if (status.ok() || show_ok) { + writer.String("OK"); + } else { + writer.String(status.get_error_msg().c_str()); + } + writer.EndObject(); + return s.GetString(); +} + + std::string StreamLoadContext::brief(bool detail) const { std::stringstream ss; ss << "id=" << id << ", job id=" << job_id << ", txn id=" << txn_id << ", label=" << label; diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index c419290ca3..f2c5c519a4 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -101,6 +101,9 @@ public: } std::string to_json() const; + // the old mini load result format is not same as stream load. + // add this function for compatible with old mini load result format. + std::string to_json_for_mini_load() const; // return the brief info of this context. // also print the load source info if detail is set to true diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index d272afdac6..0e397cc6db 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -104,6 +104,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { // Normal case, already open by other sender return Status::OK(); } + LOG(INFO) << "open tablets channel: " << _key; _txn_id = params.txn_id(); _index_id = params.index_id(); _schema = new OlapTableSchemaParam(); @@ -170,6 +171,7 @@ Status TabletsChannel::close(int sender_id, bool* finished, *finished = (_num_remaining_senders == 0); return _close_status; } + LOG(INFO) << "close tablets channel: " << _key; for (auto pid : partition_ids) { _partition_ids.emplace(pid); } @@ -251,7 +253,6 @@ TabletWriterMgr::~TabletWriterMgr() { Status TabletWriterMgr::open(const PTabletWriterOpenRequest& params) { TabletsChannelKey key(params.id(), params.index_id()); - LOG(INFO) << "open tablets writer channel: " << key; std::shared_ptr channel; { std::lock_guard l(_lock);