[FEAT MERGE] rpc authentication improvement

This commit is contained in:
obdev
2024-02-07 16:35:49 +00:00
committed by ob-robot
parent ddfec5fc23
commit 04ba1bb648
44 changed files with 1593 additions and 77 deletions

View File

@ -263,6 +263,7 @@ ob_set_subtarget(oblib_lib utility
utility/ob_backtrace.cpp
utility/ob_proto_trans_util.cpp
utility/ob_unify_serialize.cpp
utility/ob_rpc_authentication_utility.cpp
)
ob_set_subtarget(oblib_lib ash

View File

@ -0,0 +1,92 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX LIB
#include "rpc/obrpc/ob_rpc_packet.h"
#include "lib/ob_errno.h"
#include "lib/utility/ob_print_utils.h"
using namespace oceanbase::obrpc;
namespace oceanbase
{
namespace common
{
extern "C" {
int ob_decode_pcode(const char *buf, const int64_t data_len, int64_t *pos, uint32_t *val)
{
int ret = (NULL != buf && NULL != pos && data_len - *pos >= 4) ? OB_SUCCESS : OB_DESERIALIZE_ERROR;
if (OB_SUCC(ret)) {
*val = ((static_cast<uint32_t>(*(buf + (*pos)++))) & 0xff) << 24;
*val |= ((static_cast<uint32_t>(*(buf + (*pos)++))) & 0xff) << 16;
*val |= ((static_cast<uint32_t>(*(buf + (*pos)++))) & 0xff) << 8;
*val |= ((static_cast<uint32_t>(*(buf + (*pos)++))) & 0xff);
}
return ret;
}
int ob_is_bypass_pcode(uint32_t pcode)
{
//table_api pcode directly bypass
int bret = 0;
if (pcode >= OB_TABLE_API_LOGIN && pcode <= OB_TABLE_API_MOVE) { //table_api
bret = 1;
}
#ifdef OB_BUILD_ARBITRATION
else if (OB_CREATE_ARB == pcode || OB_DELETE_ARB == pcode
|| OB_LOG_FORCE_CLEAR_ARB_CLUSTER_INFO == pcode
|| OB_ARB_CLUSTER_OP == pcode) { //arb
bret = 1;
}
#endif
else if (OB_SET_MEMBER_LIST == pcode || OB_ARB_GC_NOTIFY == pcode
|| OB_SET_CONFIG == pcode) { //arb
bret = 1;
} else if (pcode >= OB_LOG_PUSH_REQ && pcode <= OB_LOG_BATCH_FETCH_RESP) { //arb
bret = 1;
} else if (OB_LOG_REQ_START_LSN_BY_TS == pcode || OB_LS_FETCH_LOG2 == pcode
||OB_LS_FETCH_MISSING_LOG == pcode) { //libcdc or standby cluster
bret = 1;
}
return bret;
}
int ob_is_tableapi_pcode(uint32_t pcode)
{
int bret = 0;
if (pcode >= OB_TABLE_API_LOGIN && pcode <= OB_TABLE_API_MOVE) { //table_api
bret = 1;
}
return bret;
}
int ob_judge_is_tableapi_pcode_from_raw_packet(const char *buf, ssize_t data_len)
{
int bret = 0;
int ret = OB_SUCCESS;
if (NULL != buf) {
int demand_length = OB_NET_HEADER_LENGTH + 4;
if (data_len >= demand_length && 0 == memcmp(buf, ObRpcPacket::MAGIC_HEADER_FLAG,
sizeof(ObRpcPacket::MAGIC_HEADER_FLAG))) {
int64_t pos = 0;
uint32_t pcode = 0;
if (OB_SUCC(ob_decode_pcode(buf + OB_NET_HEADER_LENGTH, 4, &pos, &pcode))) {
if (ob_is_tableapi_pcode(pcode)) {
bret = 1;
}
}
}
}
return bret;
}
}
}
}

View File

@ -1903,6 +1903,52 @@ int64_t get_level3_cache_size()
return l3_cache_size;
}
int extract_cert_expired_time(const char* cert, const int64_t cert_len, int64_t &expired_time)
{
int ret = OB_SUCCESS;
STACK_OF(X509_INFO) *chain = NULL;
BIO *cbio = NULL;
if (OB_ISNULL(cert)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("public cert from kms is null!", K(ret));
} else if (OB_ISNULL(cbio = BIO_new_mem_buf((void*)cert, cert_len))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("BIO_new_mem_buf failed", K(ret));
} else if (OB_ISNULL(chain = PEM_X509_INFO_read_bio(cbio, NULL, NULL, NULL))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("PEM_X509_INFO_read_bio failed", K(ret));
} else {
ASN1_TIME *notAfter = NULL;
X509_INFO *x509_info = NULL;
if (OB_ISNULL(x509_info = sk_X509_INFO_value(chain, 0))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get app cert failed!", K(ret));
} else if (OB_ISNULL((notAfter = X509_get_notAfter(x509_info->x509)))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("X509_get_notAfter failed",K(ret));
} else {
struct tm tm1;
memset (&tm1, 0, sizeof (tm1));
tm1.tm_year = (notAfter->data[ 0] - '0') * 10 + (notAfter->data[ 1] - '0') + 100;
tm1.tm_mon = (notAfter->data[ 2] - '0') * 10 + (notAfter->data[ 3] - '0') - 1;
tm1.tm_mday = (notAfter->data[ 4] - '0') * 10 + (notAfter->data[ 5] - '0');
tm1.tm_hour = (notAfter->data[ 6] - '0') * 10 + (notAfter->data[ 7] - '0');
tm1.tm_min = (notAfter->data[ 8] - '0') * 10 + (notAfter->data[ 9] - '0');
tm1.tm_sec = (notAfter->data[10] - '0') * 10 + (notAfter->data[11] - '0');
time_t expired_time_t = mktime(&tm1);
expired_time_t += (int)(mktime(localtime(&expired_time_t)) - mktime(gmtime(&expired_time_t)));
expired_time = expired_time_t * 1000000;
}
}
if (NULL != cbio) {
BIO_free(cbio);
}
if (NULL != chain) {
sk_X509_INFO_pop_free(chain, X509_INFO_free);
}
return ret;
}
} // end namespace common
} // end namespace oceanbase

View File

@ -1310,6 +1310,8 @@ void call_dtor(T *&ptr)
// OB_IO_ERROR Error executing system call
// OB_SUCCESS successfully executed
int is_dir_empty(const char *dirname, bool &is_empty);
int extract_cert_expired_time(const char* cert, const int64_t cert_len, int64_t &expired_time);
} // end namespace common
} // end namespace oceanbase

View File

@ -22,7 +22,9 @@ extern "C" {
};
#include "rpc/obrpc/ob_rpc_endec.h"
#define cfgi(k, v) atoi(getenv(k)?:v)
extern "C" {
int ussl_check_pcode_mismatch_connection(int fd, uint32_t pcode);
};
namespace oceanbase
{
namespace obrpc
@ -57,59 +59,64 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s
} else {
ObCurTraceId::set(tmp_pkt.get_trace_id());
obrpc::ObRpcPacketCode pcode = tmp_pkt.get_pcode();
auto &set = obrpc::ObRpcPacketSet::instance();
const char* pcode_label = set.label_of_idx(set.idx_of_pcode(pcode));
const int64_t pool_size = sizeof(ObPocServerHandleContext) + sizeof(ObRequest) + sizeof(ObRpcPacket) + alloc_payload_sz;
int64_t tenant_id = tmp_pkt.get_tenant_id();
if (OB_UNLIKELY(tmp_pkt.get_group_id() == OBCG_ELECTION)) {
tenant_id = OB_SERVER_TENANT_ID;
}
timeguard.click();
ObRpcMemPool* pool = ObRpcMemPool::create(tenant_id, pcode_label, pool_size);
void *temp = NULL;
#ifdef ERRSIM
THIS_WORKER.set_module_type(tmp_pkt.get_module_type());
#endif
if (OB_ISNULL(pool)) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
RPC_LOG(WARN, "create memory pool failed", K(tenant_id), K(pcode_label));
} else if (OB_ISNULL(temp = pool->alloc(sizeof(ObPocServerHandleContext) + sizeof(ObRequest)))){
ret = common::OB_ALLOCATE_MEMORY_FAILED;
RPC_LOG(WARN, "pool allocate memory failed", K(tenant_id), K(pcode_label));
if (OB_UNLIKELY(ussl_check_pcode_mismatch_connection(pn_get_fd(resp_id), pcode))) {
ret = OB_UNKNOWN_CONNECTION;
RPC_LOG(WARN, "rpc bypass connection received none bypass pcode", K(pcode), K(ret));
} else {
int64_t resp_expired_abs_us = ObTimeUtility::current_time() + tmp_pkt.get_timeout();
ctx = new(temp)ObPocServerHandleContext(*pool, resp_id, resp_expired_abs_us);
ctx->set_peer_unsafe();
req = new(ctx + 1)ObRequest(ObRequest::OB_RPC, ObRequest::TRANSPORT_PROTO_POC);
auto &set = obrpc::ObRpcPacketSet::instance();
const char* pcode_label = set.label_of_idx(set.idx_of_pcode(pcode));
const int64_t pool_size = sizeof(ObPocServerHandleContext) + sizeof(ObRequest) + sizeof(ObRpcPacket) + alloc_payload_sz;
int64_t tenant_id = tmp_pkt.get_tenant_id();
if (OB_UNLIKELY(tmp_pkt.get_group_id() == OBCG_ELECTION)) {
tenant_id = OB_SERVER_TENANT_ID;
}
timeguard.click();
ObRpcPacket* pkt = (ObRpcPacket*)pool->alloc(sizeof(ObRpcPacket) + alloc_payload_sz);
if (NULL == pkt) {
RPC_LOG(WARN, "pool allocate rpc packet memory failed", K(tenant_id), K(pcode_label));
ret = common::OB_ALLOCATE_MEMORY_FAILED;
} else {
MEMCPY(reinterpret_cast<void *>(pkt), reinterpret_cast<void *>(&tmp_pkt), sizeof(ObRpcPacket));
const char* packet_data = NULL;
if (alloc_payload_sz > 0) {
packet_data = reinterpret_cast<char *>(pkt + 1);
MEMCPY(const_cast<char*>(packet_data), tmp_pkt.get_cdata(), tmp_pkt.get_clen());
} else {
packet_data = tmp_pkt.get_cdata();
}
int64_t receive_ts = ObTimeUtility::current_time();
pkt->set_receive_ts(receive_ts);
pkt->set_content(packet_data, tmp_pkt.get_clen());
req->set_server_handle_context(ctx);
req->set_packet(pkt);
req->set_receive_timestamp(pkt->get_receive_ts());
req->set_request_arrival_time(pkt->get_receive_ts());
req->set_arrival_push_diff(common::ObTimeUtility::current_time());
ObRpcMemPool* pool = ObRpcMemPool::create(tenant_id, pcode_label, pool_size);
void *temp = NULL;
const int64_t fly_ts = receive_ts - pkt->get_timestamp();
if (fly_ts > oceanbase::common::OB_MAX_PACKET_FLY_TS && TC_REACH_TIME_INTERVAL(100 * 1000)) {
RPC_LOG(WARN, "PNIO packet wait too much time between proxy and server_cb", "pcode", pkt->get_pcode(),
"fly_ts", fly_ts, "send_timestamp", pkt->get_timestamp());
#ifdef ERRSIM
THIS_WORKER.set_module_type(tmp_pkt.get_module_type());
#endif
if (OB_ISNULL(pool)) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
RPC_LOG(WARN, "create memory pool failed", K(tenant_id), K(pcode_label));
} else if (OB_ISNULL(temp = pool->alloc(sizeof(ObPocServerHandleContext) + sizeof(ObRequest)))){
ret = common::OB_ALLOCATE_MEMORY_FAILED;
RPC_LOG(WARN, "pool allocate memory failed", K(tenant_id), K(pcode_label));
} else {
int64_t resp_expired_abs_us = ObTimeUtility::current_time() + tmp_pkt.get_timeout();
ctx = new(temp)ObPocServerHandleContext(*pool, resp_id, resp_expired_abs_us);
ctx->set_peer_unsafe();
req = new(ctx + 1)ObRequest(ObRequest::OB_RPC, ObRequest::TRANSPORT_PROTO_POC);
timeguard.click();
ObRpcPacket* pkt = (ObRpcPacket*)pool->alloc(sizeof(ObRpcPacket) + alloc_payload_sz);
if (NULL == pkt) {
RPC_LOG(WARN, "pool allocate rpc packet memory failed", K(tenant_id), K(pcode_label));
ret = common::OB_ALLOCATE_MEMORY_FAILED;
} else {
MEMCPY(reinterpret_cast<void *>(pkt), reinterpret_cast<void *>(&tmp_pkt), sizeof(ObRpcPacket));
const char* packet_data = NULL;
if (alloc_payload_sz > 0) {
packet_data = reinterpret_cast<char *>(pkt + 1);
MEMCPY(const_cast<char*>(packet_data), tmp_pkt.get_cdata(), tmp_pkt.get_clen());
} else {
packet_data = tmp_pkt.get_cdata();
}
int64_t receive_ts = ObTimeUtility::current_time();
pkt->set_receive_ts(receive_ts);
pkt->set_content(packet_data, tmp_pkt.get_clen());
req->set_server_handle_context(ctx);
req->set_packet(pkt);
req->set_receive_timestamp(pkt->get_receive_ts());
req->set_request_arrival_time(pkt->get_receive_ts());
req->set_arrival_push_diff(common::ObTimeUtility::current_time());
const int64_t fly_ts = receive_ts - pkt->get_timestamp();
if (fly_ts > oceanbase::common::OB_MAX_PACKET_FLY_TS && TC_REACH_TIME_INTERVAL(100 * 1000)) {
RPC_LOG(WARN, "PNIO packet wait too much time between proxy and server_cb", "pcode", pkt->get_pcode(),
"fly_ts", fly_ts, "send_timestamp", pkt->get_timestamp());
}
}
}
}

View File

@ -30,6 +30,10 @@ using namespace oceanbase::common;
using namespace oceanbase::common::serialization;
using namespace oceanbase::rpc::frame;
extern "C" {
int ussl_check_pcode_mismatch_connection(int fd, uint32_t pcode);
};
namespace oceanbase
{
namespace obrpc
@ -193,15 +197,22 @@ void *ObRpcNetHandler::decode(easy_message_t *ms)
LOG_ERROR("failed to decode", K(easy_conn), KP(ms), K(is_current_normal_mode), K(ret));
} else {
if (NULL != pkt) {
const int64_t receive_ts = common::ObClockGenerator::getClock();
const int64_t fly_ts = receive_ts - pkt->get_timestamp();
if (!pkt->is_resp() && fly_ts > common::OB_MAX_PACKET_FLY_TS && TC_REACH_TIME_INTERVAL(100 * 1000)) {
LOG_WARN_RET(common::OB_ERR_TOO_MUCH_TIME, "packet fly cost too much time", "pcode", pkt->get_pcode(),
"fly_ts", fly_ts, "send_timestamp", pkt->get_timestamp(), "connection", easy_connection_str(ms->c));
}
pkt->set_receive_ts(receive_ts);
if (receive_ts - start_ts > common::OB_MAX_PACKET_DECODE_TS && TC_REACH_TIME_INTERVAL(100 * 1000)) {
LOG_WARN_RET(OB_ERR_TOO_MUCH_TIME, "packet decode cost too much time", "pcode", pkt->get_pcode(), "connection", easy_connection_str(ms->c));
uint32_t pcode = pkt->get_pcode();
if (OB_UNLIKELY(ussl_check_pcode_mismatch_connection(ms->c->fd, pcode))) {
pkt = NULL;
ms->status = EASY_ERROR;
LOG_WARN_RET(common::OB_BAD_ADDRESS, "enable bypass connection received other RPC, disconnect", K(pcode));
} else {
const int64_t receive_ts = common::ObClockGenerator::getClock();
const int64_t fly_ts = receive_ts - pkt->get_timestamp();
if (!pkt->is_resp() && fly_ts > common::OB_MAX_PACKET_FLY_TS && TC_REACH_TIME_INTERVAL(100 * 1000)) {
LOG_WARN_RET(common::OB_ERR_TOO_MUCH_TIME, "packet fly cost too much time", "pcode", pkt->get_pcode(),
"fly_ts", fly_ts, "send_timestamp", pkt->get_timestamp(), "connection", easy_connection_str(ms->c));
}
pkt->set_receive_ts(receive_ts);
if (receive_ts - start_ts > common::OB_MAX_PACKET_DECODE_TS && TC_REACH_TIME_INTERVAL(100 * 1000)) {
LOG_WARN_RET(OB_ERR_TOO_MUCH_TIME, "packet decode cost too much time", "pcode", pkt->get_pcode(), "connection", easy_connection_str(ms->c));
}
}
} else {
//receive data is not enough

View File

@ -588,3 +588,17 @@ int dispatch_accept_fd_to_certain_group(int fd, uint64_t gid)
}
return ret;
}
PN_API int pn_get_fd(uint64_t req_id)
{
int fd = -1;
pn_resp_ctx_t* ctx = (typeof(ctx))req_id;
if (unlikely(NULL == ctx)) {
rk_warn("invalid arguments, req_id=%p", ctx);
} else {
pkts_t* pkts = &ctx->pn->pkts;
pkts_sk_t* sock = (typeof(sock))idm_get(&pkts->sk_map, ctx->sock_id);
fd = sock->fd;
}
return fd;
}

View File

@ -78,6 +78,7 @@ PN_API uint64_t pn_get_rxbytes(int grp_id);
PN_API int dispatch_accept_fd_to_certain_group(int fd, uint64_t gid);
PN_API void pn_stop(uint64_t gid);
PN_API void pn_wait(uint64_t gid);
PN_API int pn_get_fd(uint64_t req_id);
extern int64_t pnio_keepalive_timeout;
pn_comm_t* get_current_pnio();
void pn_release(pn_comm_t* pn_comm);