diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 1c58dde220..81fd1e28c5 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -67,7 +67,17 @@ OLAPStatus DeltaWriter::init() { << "schema_hash: " << _req.schema_hash << " not found"; return OLAP_ERR_TABLE_NOT_FOUND; } + OLAPStatus lock_status = _table->try_migration_rdlock(); + if (lock_status != OLAP_SUCCESS) { + return lock_status; + } else { + OLAPStatus res = _init(); + _table->release_migration_lock(); + return res; + } +} +OLAPStatus DeltaWriter::_init() { { MutexLock push_lock(_table->get_push_lock()); RETURN_NOT_OK(OLAPEngine::get_instance()->add_transaction( diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 4778893fa2..856c362dca 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -62,7 +62,8 @@ public: int64_t partition_id() const { return _req.partition_id; } private: void _garbage_collection(); - + OLAPStatus _init(); + bool _is_init = false; WriteRequest _req; OLAPTablePtr _table; diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp index e4057c8ad0..90c5e44422 100644 --- a/be/src/olap/olap_engine.cpp +++ b/be/src/olap/olap_engine.cpp @@ -2553,10 +2553,17 @@ OLAPStatus OLAPEngine::cancel_delete(const TCancelDeleteDataReq& request) { // 2. Remove delete conditions from each tablet. DeleteConditionHandler cond_handler; for (OLAPTablePtr temp_table : table_list) { + OLAPStatus lock_status = temp_table->try_migration_rdlock(); + if (lock_status != OLAP_SUCCESS) { + OLAP_LOG_WARNING("cancel delete failed. could not get migration lock [res=%d table=%s]", + res, temp_table->full_name().c_str()); + break; + } temp_table->obtain_header_wrlock(); res = cond_handler.delete_cond(temp_table, request.version, false); if (res != OLAP_SUCCESS) { temp_table->release_header_lock(); + temp_table->release_migration_lock(); OLAP_LOG_WARNING("cancel delete failed. [res=%d table=%s]", res, temp_table->full_name().c_str()); break; @@ -2565,11 +2572,13 @@ OLAPStatus OLAPEngine::cancel_delete(const TCancelDeleteDataReq& request) { res = temp_table->save_header(); if (res != OLAP_SUCCESS) { temp_table->release_header_lock(); + temp_table->release_migration_lock(); OLAP_LOG_WARNING("fail to save header. [res=%d table=%s]", res, temp_table->full_name().c_str()); break; } temp_table->release_header_lock(); + temp_table->release_migration_lock(); } // Show delete conditions in tablet header. @@ -2627,9 +2636,15 @@ OLAPStatus OLAPEngine::recover_tablet_until_specfic_version( OLAPTablePtr table = get_table(recover_tablet_req.tablet_id, recover_tablet_req.schema_hash); if (table == nullptr) { return OLAP_ERR_TABLE_NOT_FOUND; } - RETURN_NOT_OK(table->recover_tablet_until_specfic_version(recover_tablet_req.version, - recover_tablet_req.version_hash)); - return OLAP_SUCCESS; + OLAPStatus lock_status = table->try_migration_rdlock(); + if (lock_status != OLAP_SUCCESS) { + return lock_status; + } + OLAPStatus res = OLAP_SUCCESS; + res = table->recover_tablet_until_specfic_version(recover_tablet_req.version, + recover_tablet_req.version_hash); + table->release_migration_lock(); + return res; } string OLAPEngine::get_info_before_incremental_clone(OLAPTablePtr tablet, @@ -2954,16 +2969,22 @@ OLAPStatus OLAPEngine::push( int64_t duration_ns = 0; PushHandler push_handler; - if (request.__isset.transaction_id) { - { - SCOPED_RAW_TIMER(&duration_ns); - res = push_handler.process_realtime_push(olap_table, request, type, tablet_info_vec); - } + OLAPStatus lock_status = olap_table->try_migration_rdlock(); + if (lock_status != OLAP_SUCCESS) { + res = lock_status; } else { - { - SCOPED_RAW_TIMER(&duration_ns); - res = push_handler.process(olap_table, request, type, tablet_info_vec); + if (request.__isset.transaction_id) { + { + SCOPED_RAW_TIMER(&duration_ns); + res = push_handler.process_realtime_push(olap_table, request, type, tablet_info_vec); + } + } else { + { + SCOPED_RAW_TIMER(&duration_ns); + res = push_handler.process(olap_table, request, type, tablet_info_vec); + } } + olap_table->release_migration_lock(); } if (res != OLAP_SUCCESS) { diff --git a/be/src/olap/olap_snapshot.cpp b/be/src/olap/olap_snapshot.cpp index 9d5e7e9395..9ae32ef635 100644 --- a/be/src/olap/olap_snapshot.cpp +++ b/be/src/olap/olap_snapshot.cpp @@ -657,11 +657,20 @@ OLAPStatus OLAPEngine::storage_medium_migrate( } vector olap_data_sources; + OLAPStatus lock_status = tablet->try_migration_wrlock(); + if (lock_status != OLAP_SUCCESS) { + return lock_status; + } tablet->obtain_push_lock(); do { // get all versions to be migrate tablet->obtain_header_rdlock(); + if (tablet->has_pending_data()) { + OLAP_LOG_WARNING("could not migration because has pending data [tablet='%s' ]", + tablet->full_name().c_str()); + break; + } const PDelta* lastest_version = tablet->lastest_version(); if (lastest_version == NULL) { tablet->release_header_lock(); @@ -720,7 +729,8 @@ OLAPStatus OLAPEngine::storage_medium_migrate( OLAPHeader* new_olap_header = new(std::nothrow) OLAPHeader(); if (new_olap_header == NULL) { OLAP_LOG_WARNING("new olap header failed"); - return OLAP_ERR_BUFFER_OVERFLOW; + res = OLAP_ERR_BUFFER_OVERFLOW; + break; } res = _generate_new_header(stores[0], shard, tablet, version_entity_vec, new_olap_header); if (res != OLAP_SUCCESS) { @@ -747,7 +757,8 @@ OLAPStatus OLAPEngine::storage_medium_migrate( if (new_tablet.get() == NULL) { OLAP_LOG_WARNING("get null olap table. [tablet_id=%ld schema_hash=%d]", tablet_id, schema_hash); - return OLAP_ERR_TABLE_NOT_FOUND; + res = OLAP_ERR_TABLE_NOT_FOUND; + break; } SchemaChangeStatus tablet_status = tablet->schema_change_status(); if (tablet->schema_change_status().status == AlterTableStatus::ALTER_TABLE_FINISHED) { @@ -763,7 +774,7 @@ OLAPStatus OLAPEngine::storage_medium_migrate( tablet->release_push_lock(); tablet->release_data_sources(&olap_data_sources); - + tablet->release_migration_lock(); return res; } diff --git a/be/src/olap/olap_table.cpp b/be/src/olap/olap_table.cpp index ecdd944873..397713a34b 100644 --- a/be/src/olap/olap_table.cpp +++ b/be/src/olap/olap_table.cpp @@ -729,6 +729,11 @@ bool OLAPTable::has_pending_data(int64_t transaction_id) { return _pending_data_sources.find(transaction_id) != _pending_data_sources.end(); } +bool OLAPTable::has_pending_data() { + ReadLock rdlock(&_header_lock); + return !_pending_data_sources.empty(); +} + void OLAPTable::delete_pending_data(int64_t transaction_id) { obtain_header_wrlock(); @@ -872,6 +877,18 @@ void OLAPTable::load_pending_data() { // 3. move pending data to incremental data, it won't be merged, so we can do incremental clone OLAPStatus OLAPTable::publish_version(int64_t transaction_id, Version version, VersionHash version_hash) { + OLAPStatus lock_status = _migration_lock.tryrdlock(); + if (lock_status != OLAP_SUCCESS) { + return lock_status; + } else { + OLAPStatus publish_status = _publish_version(transaction_id, version, version_hash); + _migration_lock.unlock(); + return publish_status; + } +} + +OLAPStatus OLAPTable::_publish_version(int64_t transaction_id, Version version, + VersionHash version_hash) { WriteLock wrlock(&_header_lock); if (_pending_data_sources.find(transaction_id) == _pending_data_sources.end()) { LOG(WARNING) << "pending data not exists in tablet, not finished or deleted." diff --git a/be/src/olap/olap_table.h b/be/src/olap/olap_table.h index b1621957a0..4f4e769675 100644 --- a/be/src/olap/olap_table.h +++ b/be/src/olap/olap_table.h @@ -176,6 +176,8 @@ public: bool has_pending_data(int64_t transaction_id); + bool has_pending_data(); + void delete_pending_data(int64_t transaction_id); // check the pending data that still not publish version @@ -262,6 +264,18 @@ public: RWMutex* get_header_lock_ptr() { return &_header_lock; } + + OLAPStatus try_migration_rdlock() { + return _migration_lock.tryrdlock(); + } + + OLAPStatus try_migration_wrlock() { + return _migration_lock.trywrlock(); + } + + void release_migration_lock() { + _migration_lock.unlock(); + } // Prevent push operations execute concurrently. void obtain_push_lock() { @@ -675,6 +689,8 @@ private: void _list_files_with_suffix(const std::string& file_suffix, std::set* file_names) const; + OLAPStatus _publish_version(int64_t transaction_id, Version version, VersionHash version_hash); + // 获取最大的index(只看大小) SegmentGroup* _get_largest_index(); @@ -721,7 +737,8 @@ private: // A series of status SchemaChangeStatus _schema_change_status; // related locks to ensure that commands are executed correctly. - RWMutex _header_lock; + RWMutex _header_lock; + RWMutex _migration_lock; Mutex _push_lock; Mutex _cumulative_lock; Mutex _base_compaction_lock; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 2e9c615104..365f1fb27a 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1388,7 +1388,13 @@ OLAPStatus SchemaChangeHandler::process_alter_table( if (new_tablet.get() != NULL) { res = OLAP_SUCCESS; } else { - res = _do_alter_table(type, ref_olap_table, request); + OLAPStatus lock_status = ref_olap_table->try_migration_rdlock(); + if (lock_status != OLAP_SUCCESS) { + res = lock_status; + } else { + res = _do_alter_table(type, ref_olap_table, request); + ref_olap_table->release_migration_lock(); + } } OLAPEngine::get_instance()->release_schema_change_lock(request.base_tablet_id); diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index 1398bf3bb3..fd95b503f2 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -247,8 +247,7 @@ public class ReportHandler extends Daemon { deleteFromBackend(backendTablets, foundTabletsWithValidSchema, foundTabletsWithInvalidSchema, backendId); // 5. migration (ssd <-> hdd) - // disable migration because stream load does not support migration - // handleMigration(tabletMigrationMap, backendId); + handleMigration(tabletMigrationMap, backendId); // 6. send clear transactions to be handleClearTransactions(transactionsToClear, backendId);