Merge branch '2.3' into develop

This commit is contained in:
Markus Mäkelä
2019-05-10 09:28:39 +03:00
7 changed files with 76 additions and 18 deletions

View File

@ -427,12 +427,17 @@ encryption_key_file=/var/binlogs/enc_key.txt
From MaxScale 2.3 onwards it is possible to specify secondary masters that From MaxScale 2.3 onwards it is possible to specify secondary masters that
the binlog router can use in case the connection to the default master fails. the binlog router can use in case the connection to the default master fails.
**Note:** This is _only_ supported in conjunction with a Galera cluster and **Note:** This is _only_ supported in a Galera Cluster environment in which:
provided the following holds:
* `@@log_slave_updates` is enabled on all servers, * _Wsrep GTID mode_ is enabled in the cluster.
* all nodes in the Galera cluster have the *same* `server_id`, and * _All_ of the requirements for wsrep GTID mode are met by the cluster.
* all nodes in the Galera cluster use the *same* basename for the
binlog files (specified in the server config file with `log_bin=basename`). _Wsrep GTID mode_ is also imperfect, so this secondary master functionality is
only guaranteed to work if GTIDs have not become inconsistent within the cluster.
See
[Wsrep GTID Mode](https://mariadb.com/kb/en/library/using-mariadb-gtids-with-mariadb-galera-cluster/#wsrep-gtid-mode)
for more information.
The initial setup is performed exactly like when there is but one default master. The initial setup is performed exactly like when there is but one default master.
``` ```

View File

@ -6,6 +6,3 @@ insert into masking_mxs1733 values ("john@example.com");
select * from masking_mxs1733; select * from masking_mxs1733;
email email
**************** ****************
select * from masking_mxs1733 UNION select * from masking_mxs1733;
email
****************

View File

@ -14,4 +14,3 @@ create table masking_mxs1733 (email TEXT);
insert into masking_mxs1733 values ("john@example.com"); insert into masking_mxs1733 values ("john@example.com");
select * from masking_mxs1733; select * from masking_mxs1733;
select * from masking_mxs1733 UNION select * from masking_mxs1733;

View File

@ -70,5 +70,55 @@ void store(T* t, R v, int mode = SEQ_CST)
{ {
__atomic_store_n(t, v, mode); __atomic_store_n(t, v, mode);
} }
/**
* Perform atomic compare and exchange operation
*
* @param ptr Variable where the value is stored
* @param expected Expected value of the variable
* @param desired The desired new value of the variable
* @param success_model On success, this memory model is used
* @param fail_model On failure, this memory model is used
*
* @return True if value was exchanged, false if exchange failed
*/
template<class T>
bool compare_exchange(T* ptr, T* expected, T desired, int success_model = ACQ_REL, int fail_model = ACQUIRE)
{
return __atomic_compare_exchange_n(ptr, expected, desired, true, success_model, fail_model);
}
/**
* Add to a value if it doesn't exceed a limit
*
* If the value of `ptr` + `value` is less than or equal to `limit`, the value is atomically added.
*
* @param ptr Pointer to value to add to
* @param value Value to add
* @param limit Upper limit that is not exceeded
*
* @return True if value was modified, false if the addition failed.
*/
template<class T>
bool add_limited(T* ptr, T value, T limit)
{
T expected;
T next_value;
do
{
expected = mxb::atomic::load(ptr, mxb::atomic::ACQUIRE);
if (limit < expected + value)
{
return false;
}
next_value = expected + value;
}
while (!mxb::atomic::compare_exchange(ptr, &expected, next_value));
return true;
}
} }
} }

View File

@ -1188,12 +1188,13 @@ void dcb_final_close(DCB* dcb)
dcb_stop_polling_and_shutdown(dcb); dcb_stop_polling_and_shutdown(dcb);
} }
if (dcb->server) if (dcb->server && dcb->persistentstart == 0)
{ {
// This is now a DCB::Role::BACKEND_HANDLER. // This is now a DCB::Role::BACKEND_HANDLER.
// TODO: Make decisions according to the role and assert // TODO: Make decisions according to the role and assert
// TODO: that what the role implies is preset. // TODO: that what the role implies is preset.
mxb::atomic::add(&dcb->server->stats.n_current, -1, mxb::atomic::RELAXED); MXB_AT_DEBUG(int rc = ) mxb::atomic::add(&dcb->server->stats.n_current, -1, mxb::atomic::RELAXED);
mxb_assert(rc > 0);
} }
if (dcb->fd != DCBFD_CLOSED) if (dcb->fd != DCBFD_CLOSED)
@ -1249,9 +1250,13 @@ static bool dcb_maybe_add_persistent(DCB* dcb)
&& server->persistpoolmax() && server->persistpoolmax()
&& (server->status & SERVER_RUNNING) && (server->status & SERVER_RUNNING)
&& !dcb->dcb_errhandle_called && !dcb->dcb_errhandle_called
&& dcb_persistent_clean_count(dcb, owner->id(), false) < server->persistpoolmax() && dcb_persistent_clean_count(dcb, owner->id(), false) < server->persistpoolmax())
&& mxb::atomic::load(&server->stats.n_persistent) < server->persistpoolmax())
{ {
if (!mxb::atomic::add_limited(&server->stats.n_persistent, 1, (int)server->persistpoolmax()))
{
return false;
}
DCB_CALLBACK* loopcallback; DCB_CALLBACK* loopcallback;
MXS_DEBUG("Adding DCB to persistent pool, user %s.", dcb->user); MXS_DEBUG("Adding DCB to persistent pool, user %s.", dcb->user);
dcb->was_persistent = false; dcb->was_persistent = false;
@ -1277,8 +1282,8 @@ static bool dcb_maybe_add_persistent(DCB* dcb)
dcb->nextpersistent = server->persistent[owner->id()]; dcb->nextpersistent = server->persistent[owner->id()];
server->persistent[owner->id()] = dcb; server->persistent[owner->id()] = dcb;
mxb::atomic::add(&dcb->server->stats.n_persistent, 1); MXB_AT_DEBUG(int rc = ) mxb::atomic::add(&server->stats.n_current, -1, mxb::atomic::RELAXED);
mxb::atomic::add(&dcb->server->stats.n_current, -1, mxb::atomic::RELAXED); mxb_assert(rc > 0);
return true; return true;
} }

View File

@ -376,7 +376,8 @@ bool notify_cb(DCB* dcb, void* data)
if (dcb->service == service && dcb->role == DCB::Role::CLIENT) if (dcb->service == service && dcb->role == DCB::Role::CLIENT)
{ {
poll_fake_write_event(dcb); auto session = (AvroSession*)dcb->session->router_session;
session->queue_client_callback();
} }
return true; return true;

View File

@ -173,6 +173,8 @@ public:
*/ */
int routeQuery(GWBUF* buffer); int routeQuery(GWBUF* buffer);
void queue_client_callback();
private: private:
AvroSession(Avro* instance, MXS_SESSION* session); AvroSession(Avro* instance, MXS_SESSION* session);
@ -186,7 +188,6 @@ private:
bool stream_data(); bool stream_data();
void rotate_avro_file(std::string fullname); void rotate_avro_file(std::string fullname);
void client_callback(); void client_callback();
void queue_client_callback();
}; };
void read_table_info(uint8_t* ptr, void read_table_info(uint8_t* ptr,