Use Worker::delayed_call in session_delay_routing
The use of the housekeeper is no longer needed as a better mechanism exists.
This commit is contained in:
@ -1351,10 +1351,10 @@ void session_dump_statements(MXS_SESSION* pSession)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class DelayedRoutingTask: public mxs::WorkerDisposableTask
|
class DelayedRoutingTask
|
||||||
{
|
{
|
||||||
DelayedRoutingTask(const DelayedRoutingTask&);
|
DelayedRoutingTask(const DelayedRoutingTask&) = delete;
|
||||||
DelayedRoutingTask& operator=(const DelayedRoutingTask&);
|
DelayedRoutingTask& operator=(const DelayedRoutingTask&) = delete;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
DelayedRoutingTask(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF *buffer):
|
DelayedRoutingTask(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF *buffer):
|
||||||
@ -1370,7 +1370,7 @@ public:
|
|||||||
gwbuf_free(m_buffer);
|
gwbuf_free(m_buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
void execute(Worker& worker)
|
void execute()
|
||||||
{
|
{
|
||||||
if (m_session->state == SESSION_STATE_ROUTER_READY)
|
if (m_session->state == SESSION_STATE_ROUTER_READY)
|
||||||
{
|
{
|
||||||
@ -1391,22 +1391,14 @@ private:
|
|||||||
GWBUF* m_buffer;
|
GWBUF* m_buffer;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct TaskAssignment
|
static bool delayed_routing_cb(Worker::Call::action_t action, DelayedRoutingTask* task)
|
||||||
{
|
{
|
||||||
TaskAssignment(std::auto_ptr<DelayedRoutingTask> task, Worker* worker):
|
if (action == Worker::Call::EXECUTE)
|
||||||
task(task),
|
|
||||||
worker(worker)
|
|
||||||
{}
|
|
||||||
|
|
||||||
std::auto_ptr<DelayedRoutingTask> task;
|
|
||||||
Worker* worker;
|
|
||||||
};
|
|
||||||
|
|
||||||
static bool delayed_routing_cb(void* data)
|
|
||||||
{
|
{
|
||||||
TaskAssignment* job = static_cast<TaskAssignment*>(data);
|
task->execute();
|
||||||
job->worker->post(job->task, mxs::Worker::EXECUTE_QUEUED);
|
}
|
||||||
delete job;
|
|
||||||
|
delete task;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1420,23 +1412,9 @@ bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buf
|
|||||||
ss_dassert(worker == Worker::get(session->client_dcb->poll.thread.id));
|
ss_dassert(worker == Worker::get(session->client_dcb->poll.thread.id));
|
||||||
std::auto_ptr<DelayedRoutingTask> task(new DelayedRoutingTask(session, down, buffer));
|
std::auto_ptr<DelayedRoutingTask> task(new DelayedRoutingTask(session, down, buffer));
|
||||||
|
|
||||||
if (seconds == 0)
|
// Delay the routing for at least a millisecond
|
||||||
{
|
int32_t delay = 1 + seconds * 1000;
|
||||||
/**
|
worker->delayed_call(delay, delayed_routing_cb, task.release());
|
||||||
* No actual delay, just re-route query. The worker task should
|
|
||||||
* never be immediately executed. They must be executed on the next
|
|
||||||
* "tick" of the worker to prevent recursive calls to
|
|
||||||
* e.g. routeQuery in readwritesplit when errors are handled.
|
|
||||||
*/
|
|
||||||
worker->post(task, Worker::EXECUTE_QUEUED);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
std::stringstream name;
|
|
||||||
name << "Session_" << session->ses_id << "_retry";
|
|
||||||
std::auto_ptr<TaskAssignment> job(new TaskAssignment(task, worker));
|
|
||||||
hktask_add(name.str().c_str(), delayed_routing_cb, job.release(), seconds);
|
|
||||||
}
|
|
||||||
|
|
||||||
success = true;
|
success = true;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user