Fixed the concurrency control issue with the PS cache
This commit is contained in:
		@ -316,7 +316,8 @@ ObPsStmtInfo::ObPsStmtInfo(ObIAllocator *inner_allocator,
 | 
				
			|||||||
    raw_sql_(),
 | 
					    raw_sql_(),
 | 
				
			||||||
    raw_params_(inner_allocator),
 | 
					    raw_params_(inner_allocator),
 | 
				
			||||||
    raw_params_idx_(inner_allocator),
 | 
					    raw_params_idx_(inner_allocator),
 | 
				
			||||||
    literal_stmt_type_(stmt::T_NONE)
 | 
					    literal_stmt_type_(stmt::T_NONE),
 | 
				
			||||||
 | 
					    erased_(false)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -462,6 +463,7 @@ int ObPsStmtInfo::deep_copy(const ObPsStmtInfo &other)
 | 
				
			|||||||
    is_expired_ = other.is_expired_;
 | 
					    is_expired_ = other.is_expired_;
 | 
				
			||||||
    is_expired_evicted_ = other.is_expired_evicted_;
 | 
					    is_expired_evicted_ = other.is_expired_evicted_;
 | 
				
			||||||
    literal_stmt_type_ = other.literal_stmt_type_;
 | 
					    literal_stmt_type_ = other.literal_stmt_type_;
 | 
				
			||||||
 | 
					    erased_ = other.erased_;
 | 
				
			||||||
    if (other.get_dep_objs_cnt() > 0) {
 | 
					    if (other.get_dep_objs_cnt() > 0) {
 | 
				
			||||||
      dep_objs_cnt_ = other.get_dep_objs_cnt();
 | 
					      dep_objs_cnt_ = other.get_dep_objs_cnt();
 | 
				
			||||||
      if (NULL == (dep_objs_ = reinterpret_cast<ObSchemaObjVersion *>
 | 
					      if (NULL == (dep_objs_ = reinterpret_cast<ObSchemaObjVersion *>
 | 
				
			||||||
 | 
				
			|||||||
@ -251,6 +251,7 @@ public:
 | 
				
			|||||||
  void set_is_expired() { ATOMIC_STORE(&is_expired_, true); }
 | 
					  void set_is_expired() { ATOMIC_STORE(&is_expired_, true); }
 | 
				
			||||||
  bool is_expired() { return ATOMIC_LOAD(&is_expired_); }
 | 
					  bool is_expired() { return ATOMIC_LOAD(&is_expired_); }
 | 
				
			||||||
  bool *get_is_expired_evicted_ptr() { return &is_expired_evicted_; }
 | 
					  bool *get_is_expired_evicted_ptr() { return &is_expired_evicted_; }
 | 
				
			||||||
 | 
					  bool try_set_erase_flag() { return ATOMIC_BCAS(&erased_, false, true); }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  DECLARE_VIRTUAL_TO_STRING;
 | 
					  DECLARE_VIRTUAL_TO_STRING;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -292,6 +293,7 @@ private:
 | 
				
			|||||||
  ObFixedArray<ObPCParam *, common::ObIAllocator> raw_params_;
 | 
					  ObFixedArray<ObPCParam *, common::ObIAllocator> raw_params_;
 | 
				
			||||||
  ObFixedArray<int64_t, common::ObIAllocator> raw_params_idx_;
 | 
					  ObFixedArray<int64_t, common::ObIAllocator> raw_params_idx_;
 | 
				
			||||||
  stmt::StmtType literal_stmt_type_;
 | 
					  stmt::StmtType literal_stmt_type_;
 | 
				
			||||||
 | 
					  volatile bool erased_;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct TypeInfo {
 | 
					struct TypeInfo {
 | 
				
			||||||
 | 
				
			|||||||
@ -152,6 +152,9 @@ int ObPsCache::deref_ps_stmt(const ObPsStmtId stmt_id, bool erase_item/*=false*/
 | 
				
			|||||||
    if (erase_item) { // dec cached ref
 | 
					    if (erase_item) { // dec cached ref
 | 
				
			||||||
      if (OB_FAIL(erase_stmt_item(stmt_id, ps_sql_key))) {
 | 
					      if (OB_FAIL(erase_stmt_item(stmt_id, ps_sql_key))) {
 | 
				
			||||||
        LOG_WARN("fail to erase stmt", K(ret));
 | 
					        LOG_WARN("fail to erase stmt", K(ret));
 | 
				
			||||||
 | 
					      } else if (ps_info->try_set_erase_flag() && OB_SUCCESS != (tmp_ret = deref_stmt_info(stmt_id))) {
 | 
				
			||||||
 | 
					        ret = tmp_ret;
 | 
				
			||||||
 | 
					        LOG_WARN("deref stmt info failed", K(ret), K(stmt_id), K(ps_sql_key));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    } else { // dec session ref
 | 
					    } else { // dec session ref
 | 
				
			||||||
      if (OB_ISNULL(ps_info->get_ps_item())) {
 | 
					      if (OB_ISNULL(ps_info->get_ps_item())) {
 | 
				
			||||||
@ -160,13 +163,12 @@ int ObPsCache::deref_ps_stmt(const ObPsStmtId stmt_id, bool erase_item/*=false*/
 | 
				
			|||||||
      } else {
 | 
					      } else {
 | 
				
			||||||
        ps_info->get_ps_item()->dec_ref_count_check_erase();
 | 
					        ps_info->get_ps_item()->dec_ref_count_check_erase();
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					      if (OB_SUCCESS != (tmp_ret = deref_stmt_info(stmt_id))) {
 | 
				
			||||||
 | 
					        ret = tmp_ret; //previous ret ignore
 | 
				
			||||||
    if (OB_SUCCESS != (tmp_ret = deref_stmt_info(stmt_id))) {
 | 
					        LOG_WARN("deref stmt info failed", K(ret), K(stmt_id), K(ps_sql_key));
 | 
				
			||||||
      ret = tmp_ret; //previous ret ignore
 | 
					      } else {
 | 
				
			||||||
      LOG_WARN("deref stmt info failed", K(ret), K(stmt_id), K(ps_sql_key));
 | 
					        LOG_TRACE("deref stmt info success", K(stmt_id), K(ps_sql_key), K(ret));
 | 
				
			||||||
    } else {
 | 
					      }
 | 
				
			||||||
      LOG_TRACE("deref stmt info success", K(stmt_id), K(ps_sql_key), K(ret));
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user