fix the problem of losing extra info in the case of multiple 2.0 protocol packages
This commit is contained in:
@ -397,6 +397,18 @@ inline int Ob20ProtocolProcessor::process_ob20_packet(ObProto20PktContext& conte
|
|||||||
mysql_data_size, ipacket, need_decode_more))) {
|
mysql_data_size, ipacket, need_decode_more))) {
|
||||||
LOG_ERROR("fail to process fragment mysql packet", KP(mysql_data_start),
|
LOG_ERROR("fail to process fragment mysql packet", KP(mysql_data_start),
|
||||||
K(mysql_data_size), K(need_decode_more), K(ret));
|
K(mysql_data_size), K(need_decode_more), K(ret));
|
||||||
|
} else if (!context.extra_info_.exist_extra_info()
|
||||||
|
&& pkt20->get_extra_info().exist_extra_info()) {
|
||||||
|
char* tmp_buffer = NULL;
|
||||||
|
int64_t total_len = pkt20->get_extra_info().get_total_len();
|
||||||
|
if (OB_ISNULL(tmp_buffer = reinterpret_cast<char *>(context.arena_.alloc(total_len)))) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
LOG_ERROR("no memory available", "alloc_size", total_len, K(ret));
|
||||||
|
} else if (OB_FAIL(context.extra_info_.assign(pkt20->get_extra_info(), tmp_buffer, total_len))) {
|
||||||
|
LOG_ERROR("failed to deep copy extra info", K(ret));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
@ -415,7 +427,16 @@ inline int Ob20ProtocolProcessor::process_ob20_packet(ObProto20PktContext& conte
|
|||||||
ObMySQLRawPacket *input_packet = reinterpret_cast<ObMySQLRawPacket *>(ipacket);
|
ObMySQLRawPacket *input_packet = reinterpret_cast<ObMySQLRawPacket *>(ipacket);
|
||||||
input_packet->set_can_reroute_pkt(pkt20->get_flags().is_proxy_reroute());
|
input_packet->set_can_reroute_pkt(pkt20->get_flags().is_proxy_reroute());
|
||||||
input_packet->set_is_weak_read(pkt20->get_flags().is_weak_read());
|
input_packet->set_is_weak_read(pkt20->get_flags().is_weak_read());
|
||||||
input_packet->set_extra_info(pkt20->get_extra_info());
|
|
||||||
|
const int64_t t_len = context.extra_info_.get_total_len();
|
||||||
|
char *t_buffer = NULL;
|
||||||
|
if (OB_ISNULL(t_buffer = reinterpret_cast<char *>(pool.alloc(t_len)))) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
LOG_ERROR("no memory available", "alloc_size", t_len, K(ret));
|
||||||
|
} else if (OB_FAIL(input_packet->extra_info_.assign(context.extra_info_, t_buffer, t_len))) {
|
||||||
|
LOG_ERROR("failed to assign extra info", K(ret));
|
||||||
|
}
|
||||||
|
|
||||||
input_packet->set_txn_free_route(pkt20->get_flags().txn_free_route());
|
input_packet->set_txn_free_route(pkt20->get_flags().txn_free_route());
|
||||||
context.reset();
|
context.reset();
|
||||||
// set again for sending response
|
// set again for sending response
|
||||||
|
24
deps/oblib/src/rpc/obmysql/ob_mysql_packet.cpp
vendored
24
deps/oblib/src/rpc/obmysql/ob_mysql_packet.cpp
vendored
@ -177,6 +177,30 @@ int ObMySQLRawPacket::serialize(char *buf, const int64_t length, int64_t &pos) c
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int Ob20ExtraInfo::assign(const Ob20ExtraInfo &other, char* buf, int64_t buf_len)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
uint64_t total_len = other.get_total_len();
|
||||||
|
if (total_len > buf_len) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
SERVER_LOG(ERROR, "invalid alloc size", K(total_len), K(ret));
|
||||||
|
} else {
|
||||||
|
uint64_t len = 0;
|
||||||
|
MEMCPY(buf+len, other.trace_info_.ptr(), other.trace_info_.length());
|
||||||
|
trace_info_.assign_ptr(buf+len, other.trace_info_.length());
|
||||||
|
len += other.trace_info_.length();
|
||||||
|
|
||||||
|
MEMCPY(buf+len, other.sync_sess_info_.ptr(), other.sync_sess_info_.length());
|
||||||
|
sync_sess_info_.assign_ptr(buf+len, other.sync_sess_info_.length());
|
||||||
|
len += other.sync_sess_info_.length();
|
||||||
|
|
||||||
|
MEMCPY(buf+len, other.full_link_trace_.ptr(), other.full_link_trace_.length());
|
||||||
|
full_link_trace_.assign_ptr(buf+len, other.full_link_trace_.length());
|
||||||
|
len += other.full_link_trace_.length();
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
char const *get_info_func_name(const ObInformationFunctions func)
|
char const *get_info_func_name(const ObInformationFunctions func)
|
||||||
{
|
{
|
||||||
const char *str = NULL;
|
const char *str = NULL;
|
||||||
|
8
deps/oblib/src/rpc/obmysql/ob_mysql_packet.h
vendored
8
deps/oblib/src/rpc/obmysql/ob_mysql_packet.h
vendored
@ -360,6 +360,11 @@ public:
|
|||||||
bool exist_full_link_trace() const { return !full_link_trace_.empty(); }
|
bool exist_full_link_trace() const { return !full_link_trace_.empty(); }
|
||||||
const ObString& get_sync_sess_info() const { return sync_sess_info_; }
|
const ObString& get_sync_sess_info() const { return sync_sess_info_; }
|
||||||
const ObString& get_full_link_trace() const { return full_link_trace_; }
|
const ObString& get_full_link_trace() const { return full_link_trace_; }
|
||||||
|
bool exist_extra_info() {return !sync_sess_info_.empty() || !full_link_trace_.empty() || exist_trace_info_;}
|
||||||
|
bool exist_extra_info() const {return !sync_sess_info_.empty() || !full_link_trace_.empty() || exist_trace_info_;}
|
||||||
|
int assign(const Ob20ExtraInfo &other, char* buf, int64_t len);
|
||||||
|
int64_t get_total_len() {return trace_info_.length() + sync_sess_info_.length() + full_link_trace_.length();}
|
||||||
|
int64_t get_total_len() const {return trace_info_.length() + sync_sess_info_.length() + full_link_trace_.length();}
|
||||||
TO_STRING_KV(K_(extra_len), K_(exist_trace_info), K_(trace_info),
|
TO_STRING_KV(K_(extra_len), K_(exist_trace_info), K_(trace_info),
|
||||||
K_(sync_sess_info), K_(full_link_trace));
|
K_(sync_sess_info), K_(full_link_trace));
|
||||||
};
|
};
|
||||||
@ -511,9 +516,9 @@ public:
|
|||||||
inline void set_txn_free_route(const bool txn_free_route);
|
inline void set_txn_free_route(const bool txn_free_route);
|
||||||
inline bool txn_free_route() const;
|
inline bool txn_free_route() const;
|
||||||
|
|
||||||
inline void set_extra_info(const Ob20ExtraInfo &extra_info) { extra_info_ = extra_info; }
|
|
||||||
inline const Ob20ExtraInfo &get_extra_info() const { return extra_info_; }
|
inline const Ob20ExtraInfo &get_extra_info() const { return extra_info_; }
|
||||||
bool exist_trace_info() const { return extra_info_.exist_trace_info_; }
|
bool exist_trace_info() const { return extra_info_.exist_trace_info_; }
|
||||||
|
bool exist_extra_info() const { return extra_info_.exist_extra_info(); }
|
||||||
const common::ObString &get_trace_info() const { return extra_info_.trace_info_; }
|
const common::ObString &get_trace_info() const { return extra_info_.trace_info_; }
|
||||||
virtual int64_t get_serialize_size() const;
|
virtual int64_t get_serialize_size() const;
|
||||||
|
|
||||||
@ -547,6 +552,7 @@ private:
|
|||||||
bool can_reroute_pkt_;
|
bool can_reroute_pkt_;
|
||||||
bool is_weak_read_;
|
bool is_weak_read_;
|
||||||
bool txn_free_route_;
|
bool txn_free_route_;
|
||||||
|
public:
|
||||||
Ob20ExtraInfo extra_info_;
|
Ob20ExtraInfo extra_info_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -137,7 +137,7 @@ private:
|
|||||||
class ObProto20PktContext
|
class ObProto20PktContext
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObProto20PktContext() { reset(); }
|
ObProto20PktContext() : arena_(common::ObModIds::LIB_MULTI_PACKETS){ reset(); }
|
||||||
~ObProto20PktContext() { }
|
~ObProto20PktContext() { }
|
||||||
void reset()
|
void reset()
|
||||||
{
|
{
|
||||||
@ -145,18 +145,25 @@ public:
|
|||||||
is_multi_pkt_ = false;
|
is_multi_pkt_ = false;
|
||||||
proto20_last_request_id_ = 0;
|
proto20_last_request_id_ = 0;
|
||||||
proto20_last_pkt_seq_ = 0;
|
proto20_last_pkt_seq_ = 0;
|
||||||
|
extra_info_.reset();
|
||||||
|
arena_.reset(); //fast free memory
|
||||||
}
|
}
|
||||||
|
|
||||||
TO_STRING_KV(K_(comp_last_pkt_seq),
|
TO_STRING_KV(K_(comp_last_pkt_seq),
|
||||||
K_(is_multi_pkt),
|
K_(is_multi_pkt),
|
||||||
K_(proto20_last_request_id),
|
K_(proto20_last_request_id),
|
||||||
K_(proto20_last_pkt_seq));
|
K_(proto20_last_pkt_seq),
|
||||||
|
K_(extra_info),
|
||||||
|
"used", arena_.used(),
|
||||||
|
"total", arena_.total());
|
||||||
|
|
||||||
public:
|
public:
|
||||||
uint8_t comp_last_pkt_seq_;
|
uint8_t comp_last_pkt_seq_;
|
||||||
bool is_multi_pkt_;
|
bool is_multi_pkt_;
|
||||||
uint32_t proto20_last_request_id_;
|
uint32_t proto20_last_request_id_;
|
||||||
uint8_t proto20_last_pkt_seq_;
|
uint8_t proto20_last_pkt_seq_;
|
||||||
|
Ob20ExtraInfo extra_info_;
|
||||||
|
common::ObArenaAllocator arena_;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObProto20PktContext);
|
DISALLOW_COPY_AND_ASSIGN(ObProto20PktContext);
|
||||||
|
Reference in New Issue
Block a user