Move thread initialization into Worker::run
By moving the initialization into Worker::run, all threads, including the main thread, are properly initialized. This was not noticed before as qc_sqlite initialized the main thread in the process initialization callback.
This commit is contained in:
@ -4413,8 +4413,6 @@ static int32_t qc_sqlite_process_init(void)
|
|||||||
|
|
||||||
this_unit.initialized = true;
|
this_unit.initialized = true;
|
||||||
|
|
||||||
if (qc_sqlite_thread_init() == 0)
|
|
||||||
{
|
|
||||||
if (this_unit.log_level != QC_LOG_NOTHING)
|
if (this_unit.log_level != QC_LOG_NOTHING)
|
||||||
{
|
{
|
||||||
const char* message = NULL;
|
const char* message = NULL;
|
||||||
@ -4441,13 +4439,6 @@ static int32_t qc_sqlite_process_init(void)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
|
||||||
this_unit.initialized = false;
|
|
||||||
|
|
||||||
sqlite3_shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to initialize sqlite3.");
|
MXS_ERROR("Failed to initialize sqlite3.");
|
||||||
}
|
}
|
||||||
|
@ -176,8 +176,9 @@ QUERY_CLASSIFIER* get_classifier(const char* zName, qc_sql_mode_t sql_mode, cons
|
|||||||
|
|
||||||
if (pClassifier)
|
if (pClassifier)
|
||||||
{
|
{
|
||||||
if ((pClassifier->qc_setup(sql_mode, zArgs) != QC_RESULT_OK) ||
|
if (pClassifier->qc_setup(sql_mode, zArgs) != QC_RESULT_OK ||
|
||||||
((pClassifier->qc_process_init() != QC_RESULT_OK)))
|
pClassifier->qc_process_init() != QC_RESULT_OK ||
|
||||||
|
pClassifier->qc_thread_init() != QC_RESULT_OK)
|
||||||
{
|
{
|
||||||
cerr << "error: Could not setup or init classifier " << zName << "." << endl;
|
cerr << "error: Could not setup or init classifier " << zName << "." << endl;
|
||||||
qc_unload(pClassifier);
|
qc_unload(pClassifier);
|
||||||
|
@ -856,12 +856,20 @@ void Worker::delete_zombies()
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Worker::run()
|
void Worker::run()
|
||||||
|
{
|
||||||
|
if (modules_thread_init())
|
||||||
{
|
{
|
||||||
this_thread.current_worker_id = m_id;
|
this_thread.current_worker_id = m_id;
|
||||||
poll_waitevents();
|
poll_waitevents();
|
||||||
this_thread.current_worker_id = WORKER_ABSENT_ID;
|
this_thread.current_worker_id = WORKER_ABSENT_ID;
|
||||||
|
|
||||||
MXS_NOTICE("Worker %d has shut down.", m_id);
|
MXS_NOTICE("Worker %d has shut down.", m_id);
|
||||||
|
modules_thread_finish();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_ERROR("Could not perform thread initialization for all modules. Thread exits.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Worker::start(size_t stack_size)
|
bool Worker::start(size_t stack_size)
|
||||||
@ -1068,19 +1076,9 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms
|
|||||||
*/
|
*/
|
||||||
//static
|
//static
|
||||||
void Worker::thread_main(void* pArg)
|
void Worker::thread_main(void* pArg)
|
||||||
{
|
|
||||||
if (modules_thread_init())
|
|
||||||
{
|
{
|
||||||
Worker* pWorker = static_cast<Worker*>(pArg);
|
Worker* pWorker = static_cast<Worker*>(pArg);
|
||||||
|
|
||||||
pWorker->run();
|
pWorker->run();
|
||||||
|
|
||||||
modules_thread_finish();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_ERROR("Could not perform thread initialization for all modules. Thread exits.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -92,6 +92,9 @@ int dbfw_yyparse(void*);
|
|||||||
#endif
|
#endif
|
||||||
MXS_END_DECLS
|
MXS_END_DECLS
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
|
||||||
/** The rules and users for each thread */
|
/** The rules and users for each thread */
|
||||||
struct DbfwThread
|
struct DbfwThread
|
||||||
{
|
{
|
||||||
@ -107,6 +110,8 @@ struct DbfwThread
|
|||||||
|
|
||||||
thread_local DbfwThread* this_thread = NULL;
|
thread_local DbfwThread* this_thread = NULL;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
bool parse_at_times(const char** tok, char** saveptr, Rule* ruledef);
|
bool parse_at_times(const char** tok, char** saveptr, Rule* ruledef);
|
||||||
bool parse_limit_queries(Dbfw* instance, Rule* ruledef, const char* rule, char** saveptr);
|
bool parse_limit_queries(Dbfw* instance, Rule* ruledef, const char* rule, char** saveptr);
|
||||||
static void rule_free_all(Rule* rule);
|
static void rule_free_all(Rule* rule);
|
||||||
@ -450,6 +455,7 @@ bool dbfw_show_rules_json(const MODULECMD_ARG *argv, json_t** output)
|
|||||||
|
|
||||||
static int dbfw_thr_init()
|
static int dbfw_thr_init()
|
||||||
{
|
{
|
||||||
|
ss_dassert(this_thread == NULL);
|
||||||
int rval = 0;
|
int rval = 0;
|
||||||
|
|
||||||
if ((this_thread = new (std::nothrow) DbfwThread) == NULL)
|
if ((this_thread = new (std::nothrow) DbfwThread) == NULL)
|
||||||
|
Reference in New Issue
Block a user