Merge branch '2.2' into develop

This commit is contained in:
Johan Wikman 2018-05-22 16:18:36 +03:00
commit 0d92b0bc8f
6 changed files with 179 additions and 45 deletions

View File

@ -225,6 +225,18 @@ were used, they need to be recreated once MaxScale is stopped. After stopping
MaxScale and optionally creating the schema files, the conversion process can be
started by starting MaxScale.
# Stopping the Avrorouter
The safest way to stop the avrorouter when used with the binlogrouter is to
follow the following steps:
* Issue `STOP SLAVE` on the binlogrouter
* Wait for the avrorouter to process all files
* Stop MaxScale with `systemctl stop maxscale`
This guarantees that the conversion process halts at a known good position in
the latest binlog file.
# Example Client
The avrorouter comes with an example client program, _cdc.py_, written in Python 3.

View File

@ -5,7 +5,7 @@
set(MAXSCALE_VERSION_MAJOR "2" CACHE STRING "Major version")
set(MAXSCALE_VERSION_MINOR "2" CACHE STRING "Minor version")
set(MAXSCALE_VERSION_PATCH "6" CACHE STRING "Patch version")
set(MAXSCALE_VERSION_PATCH "7" CACHE STRING "Patch version")
# This should only be incremented if a package is rebuilt
set(MAXSCALE_BUILD_NUMBER 1 CACHE STRING "Release number")

View File

@ -235,13 +235,6 @@ void dcb_close(DCB *);
*/
void dcb_close_in_owning_thread(DCB *dcb);
/**
* Add a DCB to the owner's list
*
* @param dcb DCB to add
*/
void dcb_add_to_list(DCB *dcb);
void printAllDCBs(); /* Debug to print all DCB in the system */
void printDCB(DCB *); /* Debug print routine */
void dprintDCBList(DCB *); /* Debug print DCB list statistics */

View File

@ -642,8 +642,8 @@ void TestConnections::init_maxscale(int m)
"chmod a+x %s;"
"%s"
"iptables -F INPUT;"
"rm -f %s/maxscale.log;"
"rm -f %s/maxscale1.log;"
"truncate -s 0 %s/maxscale.log;"
"truncate -s 0 %s/maxscale1.log;"
"rm -rf /tmp/core* /dev/shm/* /var/lib/maxscale/maxscale.cnf.d/ /var/lib/maxscale/*;"
"%s",
maxscales->access_homedir[m],

View File

@ -115,6 +115,8 @@ static int dcb_listen_create_socket_inet(const char *host, uint16_t port);
static int dcb_listen_create_socket_unix(const char *path);
static int dcb_set_socket_option(int sockfd, int level, int optname, void *optval, socklen_t optlen);
static void dcb_add_to_all_list(DCB *dcb);
static void dcb_add_to_list(DCB *dcb);
static bool dcb_add_to_worker(int worker_id, DCB *dcb, uint32_t events);
static DCB *dcb_find_free();
static void dcb_remove_from_list(DCB *dcb);
@ -2034,7 +2036,8 @@ static void dcb_hangup_foreach_worker(int thread_id, struct server* server)
if (dcb->state == DCB_STATE_POLLING && dcb->server &&
dcb->server == server)
{
poll_fake_hangup_event(dcb);
dcb->flags |= DCBF_HUNG;
dcb->func.hangup(dcb);
}
}
}
@ -2792,7 +2795,7 @@ dcb_role_name(DCB *dcb)
return name;
}
static void dcb_add_to_worker_list(int thread_id, void* data)
static void dcb_add_to_list_cb(int thread_id, void* data)
{
DCB *dcb = (DCB*)data;
@ -2801,7 +2804,7 @@ static void dcb_add_to_worker_list(int thread_id, void* data)
dcb_add_to_list(dcb);
}
void dcb_add_to_list(DCB *dcb)
static void dcb_add_to_list(DCB *dcb)
{
if (dcb->dcb_role != DCB_ROLE_SERVICE_LISTENER ||
(dcb->thread.next == NULL && dcb->thread.tail == NULL))
@ -2811,33 +2814,17 @@ void dcb_add_to_list(DCB *dcb)
* is not in the list. Stopped listeners are not removed from the list.
*/
int worker_id = Worker::get_current_id();
ss_dassert(dcb->poll.thread.id == Worker::get_current_id());
if (worker_id == dcb->poll.thread.id)
if (this_unit.all_dcbs[dcb->poll.thread.id] == NULL)
{
if (this_unit.all_dcbs[dcb->poll.thread.id] == NULL)
{
this_unit.all_dcbs[dcb->poll.thread.id] = dcb;
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
else
{
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb;
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
this_unit.all_dcbs[dcb->poll.thread.id] = dcb;
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
else
{
Worker* worker = Worker::get(dcb->poll.thread.id);
ss_dassert(worker);
intptr_t arg1 = (intptr_t)dcb_add_to_worker_list;
intptr_t arg2 = (intptr_t)dcb;
if (!worker->post_message(MXS_WORKER_MSG_CALL, arg1, arg2))
{
MXS_ERROR("Could not post DCB to worker.");
}
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb;
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
}
}
@ -2863,8 +2850,11 @@ static void dcb_remove_from_list(DCB *dcb)
}
else
{
DCB *current = this_unit.all_dcbs[dcb->poll.thread.id]->thread.next;
// If the creation of the DCB failed, it will not have been added
// to the list at all. And if it happened to be the first DCB to be
// created, then `prev` is NULL at this point.
DCB *prev = this_unit.all_dcbs[dcb->poll.thread.id];
DCB *current = prev ? prev->thread.next : NULL;
while (current)
{
@ -3430,6 +3420,122 @@ static inline void dcb_sanity_check(DCB* dcb)
}
}
namespace
{
class AddDcbToWorker: public mxs::WorkerDisposableTask
{
public:
AddDcbToWorker(const AddDcbToWorker&) = delete;
AddDcbToWorker& operator=(const AddDcbToWorker&) = delete;
AddDcbToWorker(DCB* dcb, uint32_t events)
: m_dcb(dcb)
, m_events(events)
{
}
void execute(Worker& worker)
{
ss_dassert(worker.id() == m_dcb->poll.thread.id);
bool added = dcb_add_to_worker(worker.id(), m_dcb, m_events);
ss_dassert(added);
if (!added)
{
dcb_close(m_dcb);
}
}
private:
DCB* m_dcb;
uint32_t m_events;
};
}
static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events)
{
bool rv = false;
if (worker_id == MXS_WORKER_ALL)
{
// A listening DCB, we add it immediately (poll_add_fd_to_worker() is thread-safe).
if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb))
{
// If this takes place on the main thread (all listening DCBs are
// stored on the main thread),
if (dcb->poll.thread.id == Worker::get_current_id())
{
// we'll add it immediately to the list,
dcb_add_to_list(dcb);
}
else
{
// otherwise we must move the adding to the main thread.
// TODO: Separate listening and other DCBs, as this is a mess.
Worker* worker = Worker::get(dcb->poll.thread.id);
ss_dassert(worker);
intptr_t arg1 = (intptr_t)dcb_add_to_list_cb;
intptr_t arg2 = (intptr_t)dcb;
if (!worker->post_message(MXS_WORKER_MSG_CALL, arg1, arg2))
{
MXS_ERROR("Could not post listening DCB for book-keeping to worker.");
}
}
rv = true;
}
}
else
{
ss_dassert(worker_id == dcb->poll.thread.id);
if (worker_id == Worker::get_current_id())
{
// If the DCB should end up on the current thread, we can both add it
// to the epoll-instance and to the DCB book-keeping immediately.
if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb))
{
dcb_add_to_list(dcb);
rv = true;
}
}
else
{
// Otherwise we'll move the whole operation to the correct worker.
// This will only happen for "cli" and "maxinfo" services that must
// be served by one thread as there otherwise deadlocks can occur.
AddDcbToWorker* task = new (std::nothrow) AddDcbToWorker(dcb, events);
ss_dassert(task);
if (task)
{
Worker* worker = Worker::get(dcb->poll.thread.id);
ss_dassert(worker);
if (worker->post(std::auto_ptr<AddDcbToWorker>(task), mxs::Worker::EXECUTE_QUEUED))
{
rv = true;
}
else
{
MXS_ERROR("Could not post task to add DCB to worker.");
}
}
else
{
MXS_OOM();
}
}
}
return rv;
}
int poll_add_dcb(DCB *dcb)
{
dcb_sanity_check(dcb);
@ -3490,19 +3596,16 @@ int poll_add_dcb(DCB *dcb)
int rc = 0;
if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb))
{
dcb_add_to_list(dcb);
MXS_DEBUG("%lu [poll_add_dcb] Added dcb %p in state %s to poll set.",
thread_self(), dcb, STRDCBSTATE(dcb->state));
}
else
if (!dcb_add_to_worker(worker_id, dcb, events))
{
/**
* We failed to add the DCB to a worker. Revert the state so that it
* will be treated as a DCB in the correct state.
* will be treated as a DCB in the correct state. As this will involve
* cleanup, ensure that the current thread is the owner, as otherwise
* debug asserts will be triggered.
*/
dcb->state = old_state;
dcb->poll.thread.id = Worker::get_current_id();
rc = -1;
}

View File

@ -16,6 +16,7 @@
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <fstream>
#include <maxscale/debug.h>
#include <maxscale/log_manager.h>
#include "internal/routingworker.hh"
@ -26,11 +27,25 @@ namespace
static struct
{
bool initialized;
int pipe_max_size;
} this_unit =
{
false
};
int get_pipe_max_size()
{
int size = 65536; // Default value from pipe(7)
std::ifstream file("/proc/sys/fs/pipe-max-size");
if (file.good())
{
file >> size;
}
return size;
}
}
namespace maxscale
@ -65,6 +80,7 @@ bool MessageQueue::init()
ss_dassert(!this_unit.initialized);
this_unit.initialized = true;
this_unit.pipe_max_size = get_pipe_max_size();
return this_unit.initialized;
}
@ -123,7 +139,17 @@ MessageQueue* MessageQueue::create(Handler* pHandler)
{
int read_fd = fds[0];
int write_fd = fds[1];
#ifdef F_SETPIPE_SZ
/**
* Increase the pipe buffer size on systems that support it. Modifying
* the buffer size of one fd will also increase it for the other.
*/
if (fcntl(fds[0], F_SETPIPE_SZ, this_unit.pipe_max_size) == -1)
{
MXS_WARNING("Failed to increase pipe buffer size to '%d': %d, %s",
this_unit.pipe_max_size, errno, mxs_strerror(errno));
}
#endif
pThis = new (std::nothrow) MessageQueue(pHandler, read_fd, write_fd);
if (!pThis)