MXS-2004 Change tpmfilter to use std::thread instead of THREAD

The bare minimum of changes; there seem to be numerous races in
the thread function.
This commit is contained in:
Johan Wikman 2018-08-10 15:22:25 +03:00
parent 7ef163477a
commit 5d6b05af1f

View File

@ -53,13 +53,13 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <regex.h>
#include <thread>
#include <maxscale/alloc.h>
#include <maxscale/filter.h>
#include <maxscale/modinfo.h>
#include <maxscale/modutil.h>
#include <maxscale/log_manager.h>
#include <maxscale/thread.h>
#include <maxscale/server.h>
#include <maxscale/atomic.h>
#include <maxscale/query_classifier.h>
@ -78,6 +78,8 @@ static const int default_sql_size = 4 * 1024;
/*
* The filter entry points
*/
struct TPM_INSTANCE;
static MXS_FILTER *createInstance(const char *name, MXS_CONFIG_PARAMETER *);
static MXS_FILTER_SESSION *newSession(MXS_FILTER *instance, MXS_SESSION *session);
static void closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session);
@ -88,13 +90,15 @@ static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, GWBUF
static int clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, GWBUF *queue);
static void diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb);
static json_t* diagnostic_json(const MXS_FILTER *instance, const MXS_FILTER_SESSION *fsession);
static uint64_t getCapabilities(MXS_FILTER* instance);
static void checkNamedPipe(void *args);
static uint64_t getCapabilities(MXS_FILTER* instance);
static void destroyInstance(MXS_FILTER *instance);
static void checkNamedPipe(TPM_INSTANCE *args);
/**
* A instance structure, every instance will write to a same file.
*/
typedef struct
struct TPM_INSTANCE
{
int sessions; /* Session count */
char *source; /* The source of the client connection */
@ -108,7 +112,9 @@ typedef struct
int query_delimiter_size; /* the length of the query delimiter */
FILE* fp;
} TPM_INSTANCE;
std::thread thread;
bool shutdown;
};
/**
* The session structure for this TPM filter.
@ -166,7 +172,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
diagnostic,
diagnostic_json,
getCapabilities,
NULL, // No destroyInstance
destroyInstance
};
static MXS_MODULE info =
@ -272,11 +278,17 @@ createInstance(const char *name, MXS_CONFIG_PARAMETER *params)
/*
* Launch a thread that checks the named pipe.
*/
THREAD thread;
if (!error && thread_start(&thread, checkNamedPipe, (void*)my_instance, 0) == NULL)
if (!error)
{
MXS_ERROR("Couldn't create a thread to check the named pipe: %s", strerror(errno));
error = true;
try
{
my_instance->thread = std::thread(checkNamedPipe, my_instance);
}
catch (const std::exception& x)
{
MXS_ERROR("Couldn't create a thread to check the named pipe: %s", x.what());
error = true;
}
}
if (error)
@ -693,19 +705,30 @@ static uint64_t getCapabilities(MXS_FILTER* instance)
return RCAP_TYPE_NONE;
}
static void checkNamedPipe(void *args)
static void destroyInstance(MXS_FILTER* instance)
{
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
my_instance->shutdown = true;
if (my_instance->thread.joinable())
{
my_instance->thread.join();
}
}
static void checkNamedPipe(TPM_INSTANCE *inst)
{
int ret;
char buffer[2];
char buf[4096];
TPM_INSTANCE* inst = (TPM_INSTANCE*) args;
char* named_pipe = inst->named_pipe;
// open named pipe and this will block until middleware opens it.
while ((inst->named_pipe_fd = open(named_pipe, O_RDONLY)) > 0)
while (!inst->shutdown && ((inst->named_pipe_fd = open(named_pipe, O_RDONLY)) > 0))
{
// 1 for start logging, 0 for stopping.
while ((ret = read(inst->named_pipe_fd, buffer, 1)) > 0)
while (!inst->shutdown && ((ret = read(inst->named_pipe_fd, buffer, 1)) > 0))
{
if (buffer[0] == '1')
{
@ -729,7 +752,8 @@ static void checkNamedPipe(void *args)
close(inst->named_pipe_fd);
}
}
if (inst->named_pipe_fd == -1)
if (!inst->shutdown && (inst->named_pipe_fd == -1))
{
MXS_ERROR("Failed to open the named pipe '%s': %s", named_pipe, strerror(errno));
return;