fix latch_hold wrong
This commit is contained in:
16
deps/oblib/src/lib/lock/ob_latch.h
vendored
16
deps/oblib/src/lib/lock/ob_latch.h
vendored
@ -230,21 +230,19 @@ public:
|
||||
OB_INLINE static int reg_lock(uint32_t* latch_addr)
|
||||
{
|
||||
int ret = -1;
|
||||
if (max_lock_slot_idx < sizeof(current_locks) / sizeof(uint32_t*)) {
|
||||
ret = max_lock_slot_idx++;
|
||||
ret = (max_lock_slot_idx++) % ARRAYSIZEOF(current_locks);
|
||||
current_locks[ret] = latch_addr;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
OB_INLINE static int unreg_lock(uint32_t* latch_addr)
|
||||
{
|
||||
int ret = -1;
|
||||
// for (int8_t i = max_lock_slot_idx - 1; -1 == ret && i >= 0; --i) {
|
||||
// if (latch_addr == current_locks[i]) {
|
||||
// ret = i;
|
||||
// current_locks[i] = max_lock_slot_idx > 0 ? current_locks[--max_lock_slot_idx] : nullptr;
|
||||
// }
|
||||
// }
|
||||
UNUSED(latch_addr);
|
||||
//if (max_lock_slot_idx > 0
|
||||
// && latch_addr == current_locks[(max_lock_slot_idx - 1) % ARRAYSIZEOF(current_locks)]) {
|
||||
// ret = (--max_lock_slot_idx % ARRAYSIZEOF(current_locks));
|
||||
// current_locks[ret] = nullptr;
|
||||
//}
|
||||
return ret;
|
||||
}
|
||||
OB_INLINE static void clear_lock()
|
||||
|
||||
25
deps/oblib/src/lib/net/ob_addr.cpp
vendored
25
deps/oblib/src/lib/net/ob_addr.cpp
vendored
@ -19,6 +19,7 @@
|
||||
#include <arpa/inet.h>
|
||||
#include <netdb.h>
|
||||
#include "lib/utility/utility.h"
|
||||
#include "include/easy_define.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -28,6 +29,21 @@ namespace common
|
||||
// --------------------------------------------------------
|
||||
// class ObAddr implements
|
||||
// --------------------------------------------------------
|
||||
ObAddr::ObAddr(const easy_addr_t& addr)
|
||||
{
|
||||
if (addr.family == AF_INET) {
|
||||
version_ = IPV4;
|
||||
ip_.v4_ = addr.u.addr;
|
||||
} else if (addr.family == AF_INET6) {
|
||||
version_ = IPV6;
|
||||
MEMCPY(ip_.v6_, addr.u.addr6, sizeof(ip_.v6_));
|
||||
} else if (addr.family == AF_UNIX) {
|
||||
version_ = UNIX;
|
||||
MEMCPY(ip_.unix_path_, addr.u.unix_path, sizeof(ip_.unix_path_));
|
||||
}
|
||||
port_ = addr.port;
|
||||
}
|
||||
|
||||
int ObAddr::convert_ipv4_addr(const char *ip)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -529,5 +545,14 @@ OB_DEF_SERIALIZE_SIZE(ObAddr)
|
||||
OB_SERIALIZE_MEMBER(ObAddrWithSeq,
|
||||
server_addr_,
|
||||
server_seq_);
|
||||
|
||||
DEF_TO_STRING(ObAddrWithSeq)
|
||||
{
|
||||
int64_t pos = 0;
|
||||
J_OBJ_START();
|
||||
J_KV(K_(server_addr), K_(server_seq));
|
||||
J_OBJ_END();
|
||||
return pos;
|
||||
}
|
||||
} // end namespace common
|
||||
} // end namespace oceanbase
|
||||
|
||||
17
deps/oblib/src/lib/net/ob_addr.h
vendored
17
deps/oblib/src/lib/net/ob_addr.h
vendored
@ -14,11 +14,11 @@
|
||||
#define _OCEABASE_LIB_NET_OB_ADDR_H_
|
||||
|
||||
#include "lib/ob_define.h"
|
||||
#include "lib/string/ob_string.h"
|
||||
#include "util/easy_inet.h"
|
||||
#include "lib/utility/ob_unify_serialize.h"
|
||||
#include "lib/container/ob_se_array.h"
|
||||
#include "lib/container/ob_array_serialization.h"
|
||||
#include "lib/ob_name_id_def.h"
|
||||
#include "lib/utility/ob_print_utils.h"
|
||||
|
||||
struct easy_addr_t;
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -29,6 +29,7 @@ class ObBatchP;
|
||||
};
|
||||
namespace common
|
||||
{
|
||||
class ObString;
|
||||
|
||||
class ObAddr
|
||||
{
|
||||
@ -71,6 +72,8 @@ public:
|
||||
ip_.v4_ = ip;
|
||||
}
|
||||
|
||||
explicit ObAddr(const easy_addr_t& addr);
|
||||
|
||||
void reset()
|
||||
{
|
||||
port_ = 0;
|
||||
@ -142,10 +145,6 @@ private:
|
||||
int32_t port_;
|
||||
}; // end of class ObAddr
|
||||
|
||||
typedef ObSEArray<ObAddr, 3> ObAddrArray;
|
||||
typedef ObSArray<ObAddr> ObAddrSArray;
|
||||
typedef ObIArray<ObAddr> ObAddrIArray;
|
||||
|
||||
inline bool ObAddr::is_valid() const
|
||||
{
|
||||
bool valid = false;
|
||||
@ -286,7 +285,7 @@ public:
|
||||
|
||||
const ObAddr &get_addr() const { return server_addr_; }
|
||||
const int64_t &get_seq() const { return server_seq_; }
|
||||
TO_STRING_KV(K_(server_addr), K_(server_seq));
|
||||
DECLARE_TO_STRING;
|
||||
private:
|
||||
ObAddr server_addr_;
|
||||
int64_t server_seq_;
|
||||
|
||||
1
deps/oblib/src/lib/oblog/ob_log.cpp
vendored
1
deps/oblib/src/lib/oblog/ob_log.cpp
vendored
@ -28,6 +28,7 @@
|
||||
#include "lib/utility/ob_fast_convert.h"
|
||||
#include "lib/utility/ob_rate_limiter.h"
|
||||
#include "lib/container/ob_vector.h"
|
||||
#include "lib/container/ob_se_array.h"
|
||||
#include "lib/allocator/ob_vslice_alloc.h"
|
||||
#include "lib/allocator/ob_fifo_allocator.h"
|
||||
#include "common/ob_smart_var.h"
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
#include "lib/signal/ob_signal_struct.h"
|
||||
#include "lib/signal/ob_signal_utils.h"
|
||||
#include "lib/signal/ob_signal_worker.h"
|
||||
#include "lib/utility/ob_hang_fatal_error.h"
|
||||
#include "common/ob_common_utility.h"
|
||||
|
||||
namespace oceanbase
|
||||
|
||||
2
deps/oblib/src/lib/thread/thread.cpp
vendored
2
deps/oblib/src/lib/thread/thread.cpp
vendored
@ -34,7 +34,7 @@ thread_local int64_t Thread::loop_ts_ = 0;
|
||||
thread_local pthread_t Thread::thread_joined_ = 0;
|
||||
thread_local int64_t Thread::sleep_us_ = 0;
|
||||
thread_local uint8_t Thread::is_blocking_ = 0;
|
||||
thread_local char* Thread::rpc_dest_addr_ = nullptr;
|
||||
thread_local ObAddr Thread::rpc_dest_addr_;
|
||||
thread_local Thread* Thread::current_thread_ = nullptr;
|
||||
int64_t Thread::total_thread_count_ = 0;
|
||||
|
||||
|
||||
3
deps/oblib/src/lib/thread/thread.h
vendored
3
deps/oblib/src/lib/thread/thread.h
vendored
@ -17,6 +17,7 @@
|
||||
#include "lib/time/ob_time_utility.h"
|
||||
#include "lib/utility/ob_macro_utils.h"
|
||||
#include "lib/lock/ob_latch.h"
|
||||
#include "lib/net/ob_addr.h"
|
||||
|
||||
namespace oceanbase {
|
||||
namespace lib {
|
||||
@ -68,7 +69,7 @@ public:
|
||||
static thread_local pthread_t thread_joined_;
|
||||
static thread_local int64_t sleep_us_;
|
||||
static thread_local uint8_t is_blocking_;
|
||||
static thread_local char* rpc_dest_addr_;
|
||||
static thread_local ObAddr rpc_dest_addr_;
|
||||
private:
|
||||
static void* __th_start(void *th);
|
||||
void destroy_stack();
|
||||
|
||||
11
deps/oblib/src/rpc/frame/ob_req_transport.cpp
vendored
11
deps/oblib/src/rpc/frame/ob_req_transport.cpp
vendored
@ -21,6 +21,7 @@
|
||||
#include "lib/oblog/ob_log.h"
|
||||
#include "lib/utility/ob_macro_utils.h"
|
||||
#include "lib/worker.h"
|
||||
#include "lib/net/ob_addr.h"
|
||||
#include "rpc/obrpc/ob_rpc_packet.h"
|
||||
#include "rpc/obrpc/ob_rpc_stat.h"
|
||||
#include "rpc/obrpc/ob_net_keepalive.h"
|
||||
@ -420,7 +421,6 @@ ObPacket *ObReqTransport::send_session(easy_session_t *s) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObPacket *pkt = NULL;
|
||||
char buff[OB_SERVER_ADDR_STR_LEN] = {'\0'};
|
||||
|
||||
if (OB_ISNULL(s)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
@ -440,14 +440,13 @@ ObPacket *ObReqTransport::send_session(easy_session_t *s) const
|
||||
s->addr.cidx = balance_assign(s);
|
||||
}
|
||||
|
||||
easy_inet_addr_to_str(&s->addr, buff, OB_SERVER_ADDR_STR_LEN);
|
||||
lib::Thread::rpc_dest_addr_ = buff;
|
||||
IGNORE_RETURN new (&lib::Thread::rpc_dest_addr_) ObAddr(s->addr);
|
||||
pkt = reinterpret_cast<ObPacket*>(easy_client_send(eio_, s->addr, s));
|
||||
lib::Thread::rpc_dest_addr_ = nullptr;
|
||||
lib::Thread::rpc_dest_addr_.reset();
|
||||
if (NULL == pkt) {
|
||||
char buff[OB_SERVER_ADDR_STR_LEN] = {'\0'};
|
||||
easy_inet_addr_to_str(&s->addr, buff, OB_SERVER_ADDR_STR_LEN);
|
||||
SERVER_LOG(WARN, "send packet fail", "dst", buff, KP(s));
|
||||
} else {
|
||||
SERVER_LOG(DEBUG, "send session successfully", "dst", buff);
|
||||
}
|
||||
}
|
||||
return pkt;
|
||||
|
||||
@ -11,6 +11,7 @@
|
||||
*/
|
||||
|
||||
#include "rpc/obrpc/ob_easy_rpc_request_operator.h"
|
||||
#include "lib/utility/utility.h"
|
||||
|
||||
using namespace oceanbase::rpc;
|
||||
using namespace oceanbase::common;
|
||||
|
||||
2
deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h
vendored
2
deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h
vendored
@ -132,6 +132,7 @@ public:
|
||||
if (OB_LS_FETCH_LOG2 == pcode) {
|
||||
pnio_group_id = ObPocRpcServer::RATELIMIT_PNIO_GROUP;
|
||||
}
|
||||
IGNORE_RETURN new (&lib::Thread::rpc_dest_addr_) ObAddr(addr);
|
||||
if (OB_FAIL(rpc_encode_req(proxy, pool, pcode, args, opts, req, req_sz, false))) {
|
||||
RPC_LOG(WARN, "rpc encode req fail", K(ret));
|
||||
} else if(OB_FAIL(check_blacklist(addr))) {
|
||||
@ -155,6 +156,7 @@ public:
|
||||
} else if (OB_FAIL(rpc_decode_resp(resp, resp_sz, out, resp_pkt, rcode))) {
|
||||
RPC_LOG(WARN, "rpc decode response fail", KP(resp), K(resp_sz), K(ret));
|
||||
}
|
||||
lib::Thread::rpc_dest_addr_.reset();
|
||||
if (rcode.rcode_ != OB_DESERIALIZE_ERROR) {
|
||||
int wb_ret = OB_SUCCESS;
|
||||
if (common::OB_SUCCESS != (wb_ret = log_user_error_and_warn(rcode))) {
|
||||
|
||||
1
deps/oblib/src/rpc/obrpc/ob_rpc_stat.h
vendored
1
deps/oblib/src/rpc/obrpc/ob_rpc_stat.h
vendored
@ -18,6 +18,7 @@
|
||||
#include "lib/random/ob_random.h"
|
||||
#include "rpc/obrpc/ob_rpc_packet.h"
|
||||
#include "lib/list/ob_dlist.h"
|
||||
#include "lib/worker.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
|
||||
@ -1893,14 +1893,23 @@ int dump_thread_info(lua_State *L)
|
||||
gen.next_column(loop_ts);
|
||||
// latch_hold
|
||||
{
|
||||
char addrs[256];
|
||||
GET_OTHER_TSI_ADDR(uint32_t**, locks_addr, &ObLatch::current_locks);
|
||||
GET_OTHER_TSI_ADDR(int8_t, slot_cnt, &ObLatch::max_lock_slot_idx)
|
||||
const int64_t cnt = std::min(ARRAYSIZEOF(ObLatch::current_locks), (int64_t)slot_cnt);
|
||||
locks_addr = (uint32_t**)(thread_base + locks_addr_offset);
|
||||
char addrs[256];
|
||||
addrs[0] = 0;
|
||||
for (auto i = 0, offset1 = 0; i < slot_cnt; ++i) {
|
||||
if (OB_NOT_NULL(locks_addr[i]) && offset1 < 256) {
|
||||
offset1 += snprintf(addrs + offset1, 256 - offset1, "%p ", locks_addr[i]);
|
||||
for (int64_t i = 0, j = 0; i < cnt; ++i) {
|
||||
int64_t idx = (slot_cnt + i) % ARRAYSIZEOF(ObLatch::current_locks);
|
||||
if (OB_NOT_NULL(locks_addr[idx]) && j < 256) {
|
||||
bool has_segv = false;
|
||||
uint32_t val = 0;
|
||||
do_with_crash_restore([&] {
|
||||
val = *locks_addr[idx];
|
||||
}, has_segv);
|
||||
if (!has_segv && 0 != val) {
|
||||
j += snprintf(addrs + j, 256 - j, "%p ", locks_addr[idx]);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (0 == addrs[0]) {
|
||||
|
||||
@ -170,17 +170,19 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
|
||||
case LATCH_HOLD: {
|
||||
GET_OTHER_TSI_ADDR(uint32_t**, locks_addr, &ObLatch::current_locks);
|
||||
GET_OTHER_TSI_ADDR(int8_t, slot_cnt, &ObLatch::max_lock_slot_idx)
|
||||
const int64_t cnt = std::min(ARRAYSIZEOF(ObLatch::current_locks), (int64_t)slot_cnt);
|
||||
locks_addr = (uint32_t**)(thread_base + locks_addr_offset);
|
||||
locks_addr_[0] = 0;
|
||||
for (auto i = 0, j = 0; i < slot_cnt; ++i) {
|
||||
if (OB_NOT_NULL(locks_addr[i])) {
|
||||
for (int64_t i = 0, j = 0; i < cnt; ++i) {
|
||||
int64_t idx = (slot_cnt + i) % ARRAYSIZEOF(ObLatch::current_locks);
|
||||
if (OB_NOT_NULL(locks_addr[idx]) && j < 256) {
|
||||
bool has_segv = false;
|
||||
uint32_t val = 0;
|
||||
do_with_crash_restore([&] {
|
||||
val = *locks_addr[i];
|
||||
val = *locks_addr[idx];
|
||||
}, has_segv);
|
||||
if (!has_segv && 0 != val && j < 256) {
|
||||
j += snprintf(locks_addr_ + j, 256 - j, "%p ", locks_addr[i]);
|
||||
if (!has_segv && 0 != val) {
|
||||
j += snprintf(locks_addr_ + j, 256 - j, "%p ", locks_addr[idx]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -42,6 +42,8 @@ namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
{
|
||||
using ObAddrIArray = ObIArray<ObAddr>;
|
||||
using ObAddrArray = ObSEArray<ObAddr, 3>;
|
||||
class ObMySQLProxy;
|
||||
class ObAddr;
|
||||
class ObMySQLTransaction;
|
||||
@ -116,7 +118,6 @@ private:
|
||||
|
||||
class ObDDLService
|
||||
{
|
||||
|
||||
public:
|
||||
typedef std::pair<share::ObLSID, common::ObTabletID> LSTabletID;
|
||||
public:
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
|
||||
#include <pthread.h>
|
||||
#include "lib/compress/ob_compressor_pool.h"
|
||||
#include "lib/container/ob_array_serialization.h"
|
||||
#include "share/config/ob_config_helper.h"
|
||||
#include "share/ob_encryption_util.h"
|
||||
#include "share/parameter/ob_parameter_attr.h"
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
#include "lib/guard/ob_shared_guard.h"
|
||||
#include "lib/utility/utility.h"
|
||||
#include "lib/utility/ob_print_utils.h"
|
||||
#include "lib/container/ob_array_serialization.h"
|
||||
#include "share/ob_occam_time_guard.h"
|
||||
|
||||
#define DETECT_TIME_GUARD(threshold) TIMEGUARD_INIT(DETECT, threshold, 10_s)
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
#include "common/ob_zone.h" // for ObZone
|
||||
#include "lib/net/ob_addr.h" // for ObAddr
|
||||
#include "lib/ob_replica_define.h" //for ObReplicaProperty
|
||||
#include "lib/container/ob_array_serialization.h" // for ObSArray
|
||||
#include "common/ob_role.h" // for ObRole
|
||||
#include "common/ob_member_list.h" // for ObMemberList
|
||||
#include "share/restore/ob_ls_restore_status.h"
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#include "lib/lock/ob_latch.h"
|
||||
#include "lib/net/ob_addr.h"
|
||||
#include "lib/task/ob_timer.h"
|
||||
#include "lib/container/ob_se_array.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
|
||||
@ -11,6 +11,9 @@
|
||||
*/
|
||||
|
||||
#include "share/ob_proposal_id.h"
|
||||
|
||||
#include "lib/json/ob_yson_encode.h"
|
||||
#include "lib/ob_name_id_def.h"
|
||||
#include "share/ob_cluster_version.h"
|
||||
|
||||
namespace oceanbase
|
||||
|
||||
@ -218,6 +218,7 @@ private:
|
||||
|
||||
class GenPreCommitSetHashMapFunctor
|
||||
{
|
||||
using ObAddrArray = ObSEArray<ObAddr, 3>;
|
||||
public:
|
||||
GenPreCommitSetHashMapFunctor(ObAddrArray &addr_array, const int64_t gts)
|
||||
: addr_array_(addr_array), gts_(gts), err_(OB_SUCCESS) {}
|
||||
@ -318,6 +319,7 @@ private:
|
||||
|
||||
class ObDupTablePartitionMgr
|
||||
{
|
||||
using ObAddrArray = ObSEArray<ObAddr, 3>;
|
||||
public:
|
||||
ObDupTablePartitionMgr() : dup_table_lease_infos_(1 << 7) { reset(); }
|
||||
~ObDupTablePartitionMgr() { destroy(); }
|
||||
@ -426,6 +428,7 @@ public:
|
||||
|
||||
class ObPreCommitTask : public ObTransTask
|
||||
{
|
||||
using ObAddrArray = ObSEArray<ObAddr, 3>;
|
||||
public:
|
||||
enum ObPreCommitTaskStatus {
|
||||
UNKNOWN = -1,
|
||||
|
||||
Reference in New Issue
Block a user