Support storage migration (#534)

Add a migration lock to lock the data when doing storage migration.
This commit is contained in:
yiguolei
2019-01-15 12:53:24 +08:00
committed by Mingyu Chen
parent b3b86731cb
commit f20c99fd09
8 changed files with 101 additions and 19 deletions

View File

@ -67,7 +67,17 @@ OLAPStatus DeltaWriter::init() {
<< "schema_hash: " << _req.schema_hash << " not found";
return OLAP_ERR_TABLE_NOT_FOUND;
}
OLAPStatus lock_status = _table->try_migration_rdlock();
if (lock_status != OLAP_SUCCESS) {
return lock_status;
} else {
OLAPStatus res = _init();
_table->release_migration_lock();
return res;
}
}
OLAPStatus DeltaWriter::_init() {
{
MutexLock push_lock(_table->get_push_lock());
RETURN_NOT_OK(OLAPEngine::get_instance()->add_transaction(

View File

@ -62,7 +62,8 @@ public:
int64_t partition_id() const { return _req.partition_id; }
private:
void _garbage_collection();
OLAPStatus _init();
bool _is_init = false;
WriteRequest _req;
OLAPTablePtr _table;

View File

@ -2553,10 +2553,17 @@ OLAPStatus OLAPEngine::cancel_delete(const TCancelDeleteDataReq& request) {
// 2. Remove delete conditions from each tablet.
DeleteConditionHandler cond_handler;
for (OLAPTablePtr temp_table : table_list) {
OLAPStatus lock_status = temp_table->try_migration_rdlock();
if (lock_status != OLAP_SUCCESS) {
OLAP_LOG_WARNING("cancel delete failed. could not get migration lock [res=%d table=%s]",
res, temp_table->full_name().c_str());
break;
}
temp_table->obtain_header_wrlock();
res = cond_handler.delete_cond(temp_table, request.version, false);
if (res != OLAP_SUCCESS) {
temp_table->release_header_lock();
temp_table->release_migration_lock();
OLAP_LOG_WARNING("cancel delete failed. [res=%d table=%s]",
res, temp_table->full_name().c_str());
break;
@ -2565,11 +2572,13 @@ OLAPStatus OLAPEngine::cancel_delete(const TCancelDeleteDataReq& request) {
res = temp_table->save_header();
if (res != OLAP_SUCCESS) {
temp_table->release_header_lock();
temp_table->release_migration_lock();
OLAP_LOG_WARNING("fail to save header. [res=%d table=%s]",
res, temp_table->full_name().c_str());
break;
}
temp_table->release_header_lock();
temp_table->release_migration_lock();
}
// Show delete conditions in tablet header.
@ -2627,9 +2636,15 @@ OLAPStatus OLAPEngine::recover_tablet_until_specfic_version(
OLAPTablePtr table = get_table(recover_tablet_req.tablet_id,
recover_tablet_req.schema_hash);
if (table == nullptr) { return OLAP_ERR_TABLE_NOT_FOUND; }
RETURN_NOT_OK(table->recover_tablet_until_specfic_version(recover_tablet_req.version,
recover_tablet_req.version_hash));
return OLAP_SUCCESS;
OLAPStatus lock_status = table->try_migration_rdlock();
if (lock_status != OLAP_SUCCESS) {
return lock_status;
}
OLAPStatus res = OLAP_SUCCESS;
res = table->recover_tablet_until_specfic_version(recover_tablet_req.version,
recover_tablet_req.version_hash);
table->release_migration_lock();
return res;
}
string OLAPEngine::get_info_before_incremental_clone(OLAPTablePtr tablet,
@ -2954,16 +2969,22 @@ OLAPStatus OLAPEngine::push(
int64_t duration_ns = 0;
PushHandler push_handler;
if (request.__isset.transaction_id) {
{
SCOPED_RAW_TIMER(&duration_ns);
res = push_handler.process_realtime_push(olap_table, request, type, tablet_info_vec);
}
OLAPStatus lock_status = olap_table->try_migration_rdlock();
if (lock_status != OLAP_SUCCESS) {
res = lock_status;
} else {
{
SCOPED_RAW_TIMER(&duration_ns);
res = push_handler.process(olap_table, request, type, tablet_info_vec);
if (request.__isset.transaction_id) {
{
SCOPED_RAW_TIMER(&duration_ns);
res = push_handler.process_realtime_push(olap_table, request, type, tablet_info_vec);
}
} else {
{
SCOPED_RAW_TIMER(&duration_ns);
res = push_handler.process(olap_table, request, type, tablet_info_vec);
}
}
olap_table->release_migration_lock();
}
if (res != OLAP_SUCCESS) {

View File

@ -657,11 +657,20 @@ OLAPStatus OLAPEngine::storage_medium_migrate(
}
vector<ColumnData*> olap_data_sources;
OLAPStatus lock_status = tablet->try_migration_wrlock();
if (lock_status != OLAP_SUCCESS) {
return lock_status;
}
tablet->obtain_push_lock();
do {
// get all versions to be migrate
tablet->obtain_header_rdlock();
if (tablet->has_pending_data()) {
OLAP_LOG_WARNING("could not migration because has pending data [tablet='%s' ]",
tablet->full_name().c_str());
break;
}
const PDelta* lastest_version = tablet->lastest_version();
if (lastest_version == NULL) {
tablet->release_header_lock();
@ -720,7 +729,8 @@ OLAPStatus OLAPEngine::storage_medium_migrate(
OLAPHeader* new_olap_header = new(std::nothrow) OLAPHeader();
if (new_olap_header == NULL) {
OLAP_LOG_WARNING("new olap header failed");
return OLAP_ERR_BUFFER_OVERFLOW;
res = OLAP_ERR_BUFFER_OVERFLOW;
break;
}
res = _generate_new_header(stores[0], shard, tablet, version_entity_vec, new_olap_header);
if (res != OLAP_SUCCESS) {
@ -747,7 +757,8 @@ OLAPStatus OLAPEngine::storage_medium_migrate(
if (new_tablet.get() == NULL) {
OLAP_LOG_WARNING("get null olap table. [tablet_id=%ld schema_hash=%d]",
tablet_id, schema_hash);
return OLAP_ERR_TABLE_NOT_FOUND;
res = OLAP_ERR_TABLE_NOT_FOUND;
break;
}
SchemaChangeStatus tablet_status = tablet->schema_change_status();
if (tablet->schema_change_status().status == AlterTableStatus::ALTER_TABLE_FINISHED) {
@ -763,7 +774,7 @@ OLAPStatus OLAPEngine::storage_medium_migrate(
tablet->release_push_lock();
tablet->release_data_sources(&olap_data_sources);
tablet->release_migration_lock();
return res;
}

View File

@ -729,6 +729,11 @@ bool OLAPTable::has_pending_data(int64_t transaction_id) {
return _pending_data_sources.find(transaction_id) != _pending_data_sources.end();
}
bool OLAPTable::has_pending_data() {
ReadLock rdlock(&_header_lock);
return !_pending_data_sources.empty();
}
void OLAPTable::delete_pending_data(int64_t transaction_id) {
obtain_header_wrlock();
@ -872,6 +877,18 @@ void OLAPTable::load_pending_data() {
// 3. move pending data to incremental data, it won't be merged, so we can do incremental clone
OLAPStatus OLAPTable::publish_version(int64_t transaction_id, Version version,
VersionHash version_hash) {
OLAPStatus lock_status = _migration_lock.tryrdlock();
if (lock_status != OLAP_SUCCESS) {
return lock_status;
} else {
OLAPStatus publish_status = _publish_version(transaction_id, version, version_hash);
_migration_lock.unlock();
return publish_status;
}
}
OLAPStatus OLAPTable::_publish_version(int64_t transaction_id, Version version,
VersionHash version_hash) {
WriteLock wrlock(&_header_lock);
if (_pending_data_sources.find(transaction_id) == _pending_data_sources.end()) {
LOG(WARNING) << "pending data not exists in tablet, not finished or deleted."

View File

@ -176,6 +176,8 @@ public:
bool has_pending_data(int64_t transaction_id);
bool has_pending_data();
void delete_pending_data(int64_t transaction_id);
// check the pending data that still not publish version
@ -262,6 +264,18 @@ public:
RWMutex* get_header_lock_ptr() {
return &_header_lock;
}
OLAPStatus try_migration_rdlock() {
return _migration_lock.tryrdlock();
}
OLAPStatus try_migration_wrlock() {
return _migration_lock.trywrlock();
}
void release_migration_lock() {
_migration_lock.unlock();
}
// Prevent push operations execute concurrently.
void obtain_push_lock() {
@ -675,6 +689,8 @@ private:
void _list_files_with_suffix(const std::string& file_suffix,
std::set<std::string>* file_names) const;
OLAPStatus _publish_version(int64_t transaction_id, Version version, VersionHash version_hash);
// 获取最大的index(只看大小)
SegmentGroup* _get_largest_index();
@ -721,7 +737,8 @@ private:
// A series of status
SchemaChangeStatus _schema_change_status;
// related locks to ensure that commands are executed correctly.
RWMutex _header_lock;
RWMutex _header_lock;
RWMutex _migration_lock;
Mutex _push_lock;
Mutex _cumulative_lock;
Mutex _base_compaction_lock;

View File

@ -1388,7 +1388,13 @@ OLAPStatus SchemaChangeHandler::process_alter_table(
if (new_tablet.get() != NULL) {
res = OLAP_SUCCESS;
} else {
res = _do_alter_table(type, ref_olap_table, request);
OLAPStatus lock_status = ref_olap_table->try_migration_rdlock();
if (lock_status != OLAP_SUCCESS) {
res = lock_status;
} else {
res = _do_alter_table(type, ref_olap_table, request);
ref_olap_table->release_migration_lock();
}
}
OLAPEngine::get_instance()->release_schema_change_lock(request.base_tablet_id);

View File

@ -247,8 +247,7 @@ public class ReportHandler extends Daemon {
deleteFromBackend(backendTablets, foundTabletsWithValidSchema, foundTabletsWithInvalidSchema, backendId);
// 5. migration (ssd <-> hdd)
// disable migration because stream load does not support migration
// handleMigration(tabletMigrationMap, backendId);
handleMigration(tabletMigrationMap, backendId);
// 6. send clear transactions to be
handleClearTransactions(transactionsToClear, backendId);