diff --git a/server/core/session.cc b/server/core/session.cc index 17021f1dd..fbb46fb65 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -1351,10 +1351,10 @@ void session_dump_statements(MXS_SESSION* pSession) } } -class DelayedRoutingTask: public mxs::WorkerDisposableTask +class DelayedRoutingTask { - DelayedRoutingTask(const DelayedRoutingTask&); - DelayedRoutingTask& operator=(const DelayedRoutingTask&); + DelayedRoutingTask(const DelayedRoutingTask&) = delete; + DelayedRoutingTask& operator=(const DelayedRoutingTask&) = delete; public: DelayedRoutingTask(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF *buffer): @@ -1370,7 +1370,7 @@ public: gwbuf_free(m_buffer); } - void execute(Worker& worker) + void execute() { if (m_session->state == SESSION_STATE_ROUTER_READY) { @@ -1391,22 +1391,14 @@ private: GWBUF* m_buffer; }; -struct TaskAssignment +static bool delayed_routing_cb(Worker::Call::action_t action, DelayedRoutingTask* task) { - TaskAssignment(std::auto_ptr task, Worker* worker): - task(task), - worker(worker) - {} + if (action == Worker::Call::EXECUTE) + { + task->execute(); + } - std::auto_ptr task; - Worker* worker; -}; - -static bool delayed_routing_cb(void* data) -{ - TaskAssignment* job = static_cast(data); - job->worker->post(job->task, mxs::Worker::EXECUTE_QUEUED); - delete job; + delete task; return false; } @@ -1420,23 +1412,9 @@ bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buf ss_dassert(worker == Worker::get(session->client_dcb->poll.thread.id)); std::auto_ptr task(new DelayedRoutingTask(session, down, buffer)); - if (seconds == 0) - { - /** - * No actual delay, just re-route query. The worker task should - * never be immediately executed. They must be executed on the next - * "tick" of the worker to prevent recursive calls to - * e.g. routeQuery in readwritesplit when errors are handled. - */ - worker->post(task, Worker::EXECUTE_QUEUED); - } - else - { - std::stringstream name; - name << "Session_" << session->ses_id << "_retry"; - std::auto_ptr job(new TaskAssignment(task, worker)); - hktask_add(name.str().c_str(), delayed_routing_cb, job.release(), seconds); - } + // Delay the routing for at least a millisecond + int32_t delay = 1 + seconds * 1000; + worker->delayed_call(delay, delayed_routing_cb, task.release()); success = true; }