Modify the result json format of mini load (#1487)
Mini load now uses the stream load framework, but we should keep the mini load return behavior and result JSON format the same as before. So the PUBLISH_TIMEOUT error should be treated as OK in mini load. Also add 2 counters to the OlapTableSink profile: SerializeBatchTime: time spent serializing all row batches. WaitInFlightPacketTime: time spent waiting for the last sent packet.
This commit is contained in:
@ -196,6 +196,8 @@ Status NodeChannel::_wait_in_flight_packet() {
|
||||
if (!_has_in_flight_packet) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
SCOPED_RAW_TIMER(_parent->mutable_wait_in_flight_packet_ns());
|
||||
_add_batch_closure->join();
|
||||
_has_in_flight_packet = false;
|
||||
if (_add_batch_closure->cntl.Failed()) {
|
||||
@ -214,6 +216,7 @@ Status NodeChannel::_send_cur_batch(bool eos) {
|
||||
_add_batch_request.set_eos(eos);
|
||||
_add_batch_request.set_packet_seq(_next_packet_seq);
|
||||
if (_batch->num_rows() > 0) {
|
||||
SCOPED_RAW_TIMER(_parent->mutable_serialize_batch_ns());
|
||||
_batch->serialize(_add_batch_request.mutable_row_batch());
|
||||
}
|
||||
|
||||
@ -498,6 +501,8 @@ Status OlapTableSink::prepare(RuntimeState* state) {
|
||||
_validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
|
||||
_open_timer = ADD_TIMER(_profile, "OpenTime");
|
||||
_close_timer = ADD_TIMER(_profile, "CloseTime");
|
||||
_wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlightPacketTime");
|
||||
_serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime");
|
||||
|
||||
// open all channels
|
||||
auto& partitions = _partition->get_partitions();
|
||||
@ -602,6 +607,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
|
||||
COUNTER_SET(_send_data_timer, _send_data_ns);
|
||||
COUNTER_SET(_convert_batch_timer, _convert_batch_ns);
|
||||
COUNTER_SET(_validate_data_timer, _validate_data_ns);
|
||||
COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns);
|
||||
COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns);
|
||||
|
||||
state->update_num_rows_load_filtered(_number_filtered_rows);
|
||||
} else {
|
||||
for (auto channel : _channels) {
|
||||
|
||||
@ -168,6 +168,11 @@ public:
|
||||
return _profile;
|
||||
}
|
||||
|
||||
// these 2 counters are not thread-safe. make sure only one thread
|
||||
// at a time can modify them.
|
||||
int64_t* mutable_wait_in_flight_packet_ns() { return &_wait_in_flight_packet_ns; }
|
||||
int64_t* mutable_serialize_batch_ns() { return &_serialize_batch_ns; }
|
||||
|
||||
private:
|
||||
// convert input batch to output batch which will be loaded into OLAP table.
|
||||
// this is only used in insert statement.
|
||||
@ -236,6 +241,8 @@ private:
|
||||
int64_t _convert_batch_ns = 0;
|
||||
int64_t _validate_data_ns = 0;
|
||||
int64_t _send_data_ns = 0;
|
||||
int64_t _wait_in_flight_packet_ns = 0;
|
||||
int64_t _serialize_batch_ns = 0;
|
||||
int64_t _number_input_rows = 0;
|
||||
int64_t _number_output_rows = 0;
|
||||
int64_t _number_filtered_rows = 0;
|
||||
@ -248,6 +255,8 @@ private:
|
||||
RuntimeProfile::Counter* _validate_data_timer = nullptr;
|
||||
RuntimeProfile::Counter* _open_timer = nullptr;
|
||||
RuntimeProfile::Counter* _close_timer = nullptr;
|
||||
RuntimeProfile::Counter* _wait_in_flight_packet_timer = nullptr;
|
||||
RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@ -810,12 +810,12 @@ void MiniLoadAction::_new_handle(HttpRequest* req) {
|
||||
}
|
||||
}
|
||||
|
||||
if (!ctx->status.ok()) {
|
||||
// if failed to commit and status is not PUBLISH_TIMEOUT, rollback the txn.
|
||||
// PUBLISH_TIMEOUT is treated as OK in mini load, because user will use show load stmt
|
||||
// to see the final result.
|
||||
if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) {
|
||||
if (ctx->need_rollback) {
|
||||
_exec_env->stream_load_executor()->rollback_txn(ctx);
|
||||
if (ctx->status.code() == TStatusCode::PUBLISH_TIMEOUT) {
|
||||
ctx->status = Status::PublishTimeout("transation has been rollback because it was timeout in phase of publish");
|
||||
}
|
||||
ctx->need_rollback = false;
|
||||
}
|
||||
if (ctx->body_sink.get() != nullptr) {
|
||||
@ -823,7 +823,7 @@ void MiniLoadAction::_new_handle(HttpRequest* req) {
|
||||
}
|
||||
}
|
||||
|
||||
std::string str = to_json(ctx->status);
|
||||
std::string str = ctx->to_json_for_mini_load();
|
||||
HttpChannel::send_reply(req, str);
|
||||
}
|
||||
|
||||
|
||||
@ -76,6 +76,49 @@ std::string StreamLoadContext::to_json() const {
|
||||
return s.GetString();
|
||||
}
|
||||
|
||||
/*
|
||||
* The old mini load result format is as followes:
|
||||
* (which defined in src/util/json_util.cpp)
|
||||
*
|
||||
* {
|
||||
* "status" : "Success"("Fail"),
|
||||
* "msg" : "xxxx"
|
||||
* }
|
||||
*
|
||||
*/
|
||||
std::string StreamLoadContext::to_json_for_mini_load() const {
|
||||
rapidjson::StringBuffer s;
|
||||
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
|
||||
writer.StartObject();
|
||||
|
||||
// status
|
||||
bool show_ok = true;
|
||||
writer.Key("status");
|
||||
switch (status.code()) {
|
||||
case TStatusCode::OK:
|
||||
writer.String("Success");
|
||||
break;
|
||||
case TStatusCode::PUBLISH_TIMEOUT:
|
||||
// treat PUBLISH_TIMEOUT as OK in mini load
|
||||
writer.String("Success");
|
||||
break;
|
||||
default:
|
||||
writer.String("Fail");
|
||||
show_ok = false;
|
||||
break;
|
||||
}
|
||||
// msg
|
||||
writer.Key("msg");
|
||||
if (status.ok() || show_ok) {
|
||||
writer.String("OK");
|
||||
} else {
|
||||
writer.String(status.get_error_msg().c_str());
|
||||
}
|
||||
writer.EndObject();
|
||||
return s.GetString();
|
||||
}
|
||||
|
||||
|
||||
std::string StreamLoadContext::brief(bool detail) const {
|
||||
std::stringstream ss;
|
||||
ss << "id=" << id << ", job id=" << job_id << ", txn id=" << txn_id << ", label=" << label;
|
||||
|
||||
@ -101,6 +101,9 @@ public:
|
||||
}
|
||||
|
||||
std::string to_json() const;
|
||||
// the old mini load result format is not the same as the stream load format.
|
||||
// add this function for compatibility with the old mini load result format.
|
||||
std::string to_json_for_mini_load() const;
|
||||
|
||||
// return the brief info of this context.
|
||||
// also print the load source info if detail is set to true
|
||||
|
||||
@ -104,6 +104,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) {
|
||||
// Normal case, already open by other sender
|
||||
return Status::OK();
|
||||
}
|
||||
LOG(INFO) << "open tablets channel: " << _key;
|
||||
_txn_id = params.txn_id();
|
||||
_index_id = params.index_id();
|
||||
_schema = new OlapTableSchemaParam();
|
||||
@ -170,6 +171,7 @@ Status TabletsChannel::close(int sender_id, bool* finished,
|
||||
*finished = (_num_remaining_senders == 0);
|
||||
return _close_status;
|
||||
}
|
||||
LOG(INFO) << "close tablets channel: " << _key;
|
||||
for (auto pid : partition_ids) {
|
||||
_partition_ids.emplace(pid);
|
||||
}
|
||||
@ -251,7 +253,6 @@ TabletWriterMgr::~TabletWriterMgr() {
|
||||
|
||||
Status TabletWriterMgr::open(const PTabletWriterOpenRequest& params) {
|
||||
TabletsChannelKey key(params.id(), params.index_id());
|
||||
LOG(INFO) << "open tablets writer channel: " << key;
|
||||
std::shared_ptr<TabletsChannel> channel;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
|
||||
Reference in New Issue
Block a user