Merge branch 'develop' into MXS-1075

This commit is contained in:
MassimilianoPinto
2017-03-07 16:11:18 +01:00
29 changed files with 878 additions and 530 deletions

View File

@ -119,9 +119,10 @@ This functionality is similar to the [Multi-Master Monitor](MM-Monitor.md)
functionality. The only difference is that the MySQL monitor will also detect functionality. The only difference is that the MySQL monitor will also detect
traditional Master-Slave topologies. traditional Master-Slave topologies.
### `failover` ### `detect_standalone_master`
Failover mode. This feature takes a boolean parameter is disabled by default. Detect standalone master servers. This feature takes a boolean parameter and is
disabled by default. In MaxScale 2.1.0, this parameter was called `failover`.
This parameter is intended to be used with simple, two node master-slave pairs This parameter is intended to be used with simple, two node master-slave pairs
where the failure of the master can be resolved by "promoting" the slave as the where the failure of the master can be resolved by "promoting" the slave as the
@ -130,22 +131,40 @@ new master. Normally this is done by using an external agent of some sort
[MariaDB Replication Manager](https://github.com/tanji/replication-manager) [MariaDB Replication Manager](https://github.com/tanji/replication-manager)
or [MHA](https://code.google.com/p/mysql-master-ha/). or [MHA](https://code.google.com/p/mysql-master-ha/).
The failover mode in mysqlmon is completely passive in the sense that it does When the number of running servers in the cluster drops down to one, MaxScale
not modify the cluster or any servers in it. It labels a slave server as a cannot be absolutely certain whether the last remaining server is a master or a
master server when there is only one running server. Before a failover can be slave. At this point, MaxScale will try to deduce the type of the server by
initiated, the following conditions must have been met: looking at the system variables of the server in question.
By default, MaxScale will only attempt to deduce if the server can be used as a
slave server (controlled by the `detect_stale_slave` parameter). When the
`detect_standalone_master` mode is enabled, MaxScale will also attempt to deduce
whether the server can be used as a master server. This is done by checking that
the server is not in read-only mode and that it is not configured as a slave.
This mode in mysqlmon is completely passive in the sense that it does not modify
the cluster or any of the servers in it. It only labels the last remaining
server in a cluster as the master server.
Before a server is labeled as a standalone master, the following conditions must
have been met:
- Previous attempts to connect to other servers in the cluster have failed,
controlled by the `failcount` parameter
- The monitor has repeatedly failed to connect to the failed servers
- There is only one running server among the monitored servers - There is only one running server among the monitored servers
- @@read_only is not enabled on the last running server
- The value of the `@@read_only` system variable is set to `OFF`
In 2.1.1, the following additional condition was added:
- The last running server is not configured as a slave - The last running server is not configured as a slave
When these conditions are met, the monitor assigns the last remaining server the If the value of the `allow_cluster_recovery` parameter is set to false, the monitor
master status and puts all other servers into maintenance mode. This is done to sets all other servers into maintenance mode. This is done to prevent accidental
prevent accidental use of the failed servers if they came back online. use of the failed servers if they came back online. If the failed servers come
back up, the maintenance mode needs to be manually cleared once replication has
When the failed servers come back up, the maintenance mode needs to be manually been set up.
cleared once replication has been set up.
**Note**: A failover will cause permanent changes in the data of the promoted **Note**: A failover will cause permanent changes in the data of the promoted
server. Only use this feature if you know that the slave servers are capable server. Only use this feature if you know that the slave servers are capable
@ -153,32 +172,33 @@ cleared once replication has been set up.
### `failcount` ### `failcount`
Number of failures that must occur on all failed servers before a failover is Number of failures that must occur on all failed servers before a standalone
initiated. The default value is 5 failures. server is labeled as a master. The default value is 5 failures.
The monitor will attemt to contact all servers once per monitoring cycle. When The monitor will attempt to contact all servers once per monitoring cycle. When
_failover_ mode is enabled, all of the failed servers must fail _failcount_ `detect_standalone_master` is enabled, all of the failed servers must fail
number of connection attemps before a failover is initiated. _failcount_ number of connection attempts before the last server is labeled as
the master.
The formula for calculating the actual number of milliseconds before failover The formula for calculating the actual number of milliseconds before the server
can start is `monitor_interval * failcount`. This means that to trigger a is labeled as the master is `monitor_interval * failcount`.
failover after 10 seconds of master failure with a _monitor_interval_ of 1000
milliseconds, the value of _failcount_ must be 10.
### `failover_recovery` ### `allow_cluster_recovery`
Allow recovery after failover. This feature takes a boolean parameter is Allow recovery after the cluster has dropped down to one server. This feature
enabled by default. takes a boolean parameter is enabled by default. This parameter requires that
`detect_standalone_master` is set to true. In MaxScale 2.1.0, this parameter was
called `failover_recovery`.
When this parameter is disabled, if a failover has been triggered and the last When this parameter is disabled, if the last remaining server is labeled as the
remaining server is chosen as the master, the monitor will set all of the failed master, the monitor will set all of the failed servers into maintenance
servers into maintenance mode. When this option is enabled, the failed servers mode. When this option is enabled, the failed servers are allowed to rejoin the
are allowed to rejoin the cluster. cluster.
This option should be enabled when failover in MaxScale is used in conjunction This option should be enabled only when MaxScale is used in conjunction with an
with an external agent that resets the slave status for new master servers. One external agent that automatically reintegrates failed servers into the
of these agents is the _replication-manager_ which clears the slave cluster. One of these agents is the _replication-manager_ which automatically
configuration for each new master and removes the read-only mode. configures the failed servers as new slaves of the current master.
## Example 1 - Monitor script ## Example 1 - Monitor script

View File

@ -171,6 +171,11 @@ data block. The default value is 1 transaction.
Controls the number of row events that are grouped into a single Avro Controls the number of row events that are grouped into a single Avro
data block. The default value is 1000 row events. data block. The default value is 1000 row events.
#### `block_size`
The Avro data block size in bytes. The default is 16 kilobytes. Increase this
value if individual events in the binary logs are very large.
## Module commands ## Module commands
Read [Module Commands](../Reference/Module-Commands.md) documentation for details about module commands. Read [Module Commands](../Reference/Module-Commands.md) documentation for details about module commands.

View File

@ -6,7 +6,7 @@
# to the CMakeLists.txt. You don't need to link against the pcre2 library # to the CMakeLists.txt. You don't need to link against the pcre2 library
# because the static symbols will be in MaxScale. # because the static symbols will be in MaxScale.
ExternalProject_Add(pcre2 SOURCE_DIR ${CMAKE_SOURCE_DIR}/pcre2/ ExternalProject_Add(pcre2 SOURCE_DIR ${CMAKE_SOURCE_DIR}/pcre2/
CMAKE_ARGS -DCMAKE_C_FLAGS=-fPIC -DBUILD_SHARED_LIBS=N -DPCRE2_BUILD_PCRE2GREP=N -DPCRE2_BUILD_TESTS=N CMAKE_ARGS -DCMAKE_C_FLAGS=-fPIC -DBUILD_SHARED_LIBS=N -DPCRE2_BUILD_PCRE2GREP=N -DPCRE2_BUILD_TESTS=N -DPCRE2_SUPPORT_JIT=Y
BINARY_DIR ${CMAKE_BINARY_DIR}/pcre2/ BINARY_DIR ${CMAKE_BINARY_DIR}/pcre2/
BUILD_COMMAND make BUILD_COMMAND make
INSTALL_COMMAND "") INSTALL_COMMAND "")

248
include/maxscale/router.hh Normal file
View File

@ -0,0 +1,248 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/router.h>
namespace maxscale
{
/**
* @class RouterSession router.hh <maxscale/router.hh>
*
* RouterSession is a base class for router sessions. A concrete router session
* class should be derived from this class and override all relevant functions.
*
* Note that even though this class is intended to be derived from, no functions
* are virtual. That is by design, as the class will be used in a context where
* the concrete class is known. That is, there is no need for the virtual mechanism.
*/
class RouterSession : public MXS_ROUTER_SESSION
{
public:
/**
* The RouterSession instance will be deleted when a client session
* has terminated. Will be called only after @c close() has been called.
*/
~RouterSession();
/**
* Called when a client session has been closed.
*/
void close();
/**
* Called when a packet being is routed to the backend. The router should
* forward the packet to the appropriate server(s).
*
* @param pPacket A client packet.
*/
int32_t routeQuery(GWBUF* pPacket);
/**
* Called when a packet is routed to the client. The router should
* forward the packet to the client using `MXS_SESSION_ROUTE_REPLY`.
*
* @param pPacket A client packet.
* @param pBackend The backend the packet is coming from.
*/
void clientReply(GWBUF* pPacket, DCB* pBackend);
/**
*
* @param pMessage The rror message.
* @param pProblem The DCB on which the error occurred.
* @param action The context.
* @param pSuccess On output, if false, the session will be terminated.
*/
void handleError(GWBUF* pMessage,
DCB* pProblem,
mxs_error_action_t action,
bool* pSuccess);
protected:
RouterSession(MXS_SESSION* pSession);
protected:
MXS_SESSION* m_pSession; /*< The MXS_SESSION this router session is associated with. */
};
/**
* @class Router router.hh <maxscale/router.hh>
*
* An instantiation of the Router template is used for creating a router.
* Router is an example of the "Curiously recurring template pattern"
* https://en.wikipedia.org/wiki/Curiously_recurring_template_pattern
* that is used for compile time polymorfism.
*
* The typical way for using the template is as follows:
*
* @code
* class MyRouterSession : public maxscale::RouterSession
* {
* // Override the relevant functions.
* };
*
* class MyRouter : public maxscale::Router<MyRouter, MyRouterSession>
* {
* public:
* static MyRouter* create(SERVICE* pService, char** pzOptions);
*
* MyRouterSession* newSession(MXS_SESSION* pSession);
*
* void diagnostics(DCB* pDcb);
* uint64_t getCapabilities();
* };
* @endcode
*
* The concrete router class must implement the methods @c create, @c newSession,
* @c diagnostics and @c getCapabilities, with the prototypes as shown above.
*
* The plugin function @c GetModuleObject is then implemented as follows:
*
* @code
* extern "C" MXS_MODULE* MXS_CREATE_MODULE()
* {
* static MXS_MODULE module_object =
* {
* ...
* &MyRouter::s_object,
* ...
* };
*
* return &module_object;
* }
* @endcode
*/
template<class RouterType, class RouterSessionType>
class Router : public MXS_ROUTER
{
public:
static MXS_ROUTER* createInstance(SERVICE* pService, char** pzOptions)
{
RouterType* pRouter = NULL;
MXS_EXCEPTION_GUARD(pRouter = RouterType::create(pService, pzOptions));
return pRouter;
}
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* pInstance, MXS_SESSION* pSession)
{
RouterType* pRouter = static_cast<RouterType*>(pInstance);
RouterSessionType* pRouter_session;
MXS_EXCEPTION_GUARD(pRouter_session = pRouter->newSession(pSession));
return pRouter_session;
}
static void closeSession(MXS_ROUTER*, MXS_ROUTER_SESSION* pData)
{
RouterSessionType* pRouter_session = static_cast<RouterSessionType*>(pData);
MXS_EXCEPTION_GUARD(pRouter_session->close());
}
static void freeSession(MXS_ROUTER*, MXS_ROUTER_SESSION* pData)
{
RouterSessionType* pRouter_session = static_cast<RouterSessionType*>(pData);
MXS_EXCEPTION_GUARD(delete pRouter_session);
}
static int32_t routeQuery(MXS_ROUTER*, MXS_ROUTER_SESSION* pData, GWBUF* pPacket)
{
RouterSessionType* pRouter_session = static_cast<RouterSessionType*>(pData);
int32_t rv = 0;
MXS_EXCEPTION_GUARD(rv = pRouter_session->routeQuery(pPacket));
return rv;
}
static void diagnostics(MXS_ROUTER* pInstance, DCB* pDcb)
{
RouterType* pRouter = static_cast<RouterType*>(pInstance);
MXS_EXCEPTION_GUARD(pRouter->diagnostics(pDcb));
}
static void clientReply(MXS_ROUTER*, MXS_ROUTER_SESSION* pData, GWBUF* pPacket, DCB* pBackend)
{
RouterSessionType* pRouter_session = static_cast<RouterSessionType*>(pData);
MXS_EXCEPTION_GUARD(pRouter_session->clientReply(pPacket, pBackend));
}
static void handleError(MXS_ROUTER* pInstance,
MXS_ROUTER_SESSION* pData,
GWBUF* pMessage,
DCB* pProblem,
mxs_error_action_t action,
bool* pSuccess)
{
RouterSessionType* pRouter_session = static_cast<RouterSessionType*>(pData);
MXS_EXCEPTION_GUARD(pRouter_session->handleError(pMessage, pProblem, action, pSuccess));
}
static uint64_t getCapabilities(MXS_ROUTER* pInstance)
{
uint64_t rv = 0;
RouterType* pRouter = static_cast<RouterType*>(pInstance);
MXS_EXCEPTION_GUARD(rv = pRouter->getCapabilities());
return rv;
}
static void destroyInstance(MXS_ROUTER* pInstance)
{
RouterType* pRouter = static_cast<RouterType*>(pInstance);
MXS_EXCEPTION_GUARD(delete pRouter);
}
static MXS_ROUTER_OBJECT s_object;
protected:
Router(SERVICE *pService)
: m_pService(pService)
{
}
SERVICE* m_pService;
};
template<class RouterType, class RouterSessionType>
MXS_ROUTER_OBJECT Router<RouterType, RouterSessionType>::s_object =
{
&Router<RouterType, RouterSessionType>::createInstance,
&Router<RouterType, RouterSessionType>::newSession,
&Router<RouterType, RouterSessionType>::closeSession,
&Router<RouterType, RouterSessionType>::freeSession,
&Router<RouterType, RouterSessionType>::routeQuery,
&Router<RouterType, RouterSessionType>::diagnostics,
&Router<RouterType, RouterSessionType>::clientReply,
&Router<RouterType, RouterSessionType>::handleError,
&Router<RouterType, RouterSessionType>::getCapabilities,
&Router<RouterType, RouterSessionType>::destroyInstance,
};
}

View File

@ -1,4 +1,4 @@
add_library(maxscale-common SHARED adminusers.c alloc.c authenticator.c atomic.c buffer.c config.c config_runtime.c dcb.c filter.c filter.cc externcmd.c paths.c hashtable.c hint.c housekeeper.c load_utils.c log_manager.cc maxscale_pcre2.c misc.c mlist.c modutil.c monitor.c queuemanager.c query_classifier.c poll.c random_jkiss.c resultset.c secrets.c server.c service.c session.c spinlock.c thread.c users.c utils.c skygw_utils.cc statistics.c listener.c ssl.c mysql_utils.c mysql_binlog.c modulecmd.c ) add_library(maxscale-common SHARED adminusers.c alloc.c authenticator.c atomic.c buffer.c config.c config_runtime.c dcb.c filter.c filter.cc externcmd.c paths.c hashtable.c hint.c housekeeper.c load_utils.c log_manager.cc maxscale_pcre2.c misc.c mlist.c modutil.c monitor.c queuemanager.c query_classifier.c poll.c random_jkiss.c resultset.c router.cc secrets.c server.c service.c session.c spinlock.c thread.c users.c utils.c skygw_utils.cc statistics.c listener.c ssl.c mysql_utils.c mysql_binlog.c modulecmd.c )
if(WITH_JEMALLOC) if(WITH_JEMALLOC)
target_link_libraries(maxscale-common ${JEMALLOC_LIBRARIES}) target_link_libraries(maxscale-common ${JEMALLOC_LIBRARIES})

View File

@ -97,6 +97,7 @@ const char* column_type_to_string(uint8_t type)
case TABLE_COL_TYPE_GEOMETRY: case TABLE_COL_TYPE_GEOMETRY:
return "GEOMETRY"; return "GEOMETRY";
default: default:
ss_dassert(false);
break; break;
} }
return "UNKNOWN"; return "UNKNOWN";
@ -215,7 +216,6 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
dest->tm_year = *ptr; dest->tm_year = *ptr;
} }
#ifdef USE_OLD_DATETIME
/** /**
* @brief Unpack a DATETIME * @brief Unpack a DATETIME
* *
@ -224,8 +224,10 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
* @param val Value read from the binary log * @param val Value read from the binary log
* @param dest Pointer where the unpacked value is stored * @param dest Pointer where the unpacked value is stored
*/ */
static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest) static void unpack_datetime(uint8_t *ptr, struct tm *dest)
{ {
uint64_t val = 0;
memcpy(&val, ptr, sizeof(val));
uint32_t second = val - ((val / 100) * 100); uint32_t second = val - ((val / 100) * 100);
val /= 100; val /= 100;
uint32_t minute = val - ((val / 100) * 100); uint32_t minute = val - ((val / 100) * 100);
@ -240,13 +242,12 @@ static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest)
memset(dest, 0, sizeof(struct tm)); memset(dest, 0, sizeof(struct tm));
dest->tm_year = year - 1900; dest->tm_year = year - 1900;
dest->tm_mon = month; dest->tm_mon = month - 1;
dest->tm_mday = day; dest->tm_mday = day;
dest->tm_hour = hour; dest->tm_hour = hour;
dest->tm_min = minute; dest->tm_min = minute;
dest->tm_sec = second; dest->tm_sec = second;
} }
#endif
/** /**
* Unpack a 5 byte reverse byte order value * Unpack a 5 byte reverse byte order value
@ -412,6 +413,8 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
return 3 + ((decimals + 1) / 2); return 3 + ((decimals + 1) / 2);
case TABLE_COL_TYPE_DATETIME: case TABLE_COL_TYPE_DATETIME:
return 8;
case TABLE_COL_TYPE_TIMESTAMP: case TABLE_COL_TYPE_TIMESTAMP:
return 4; return 4;
@ -447,8 +450,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
break; break;
case TABLE_COL_TYPE_DATETIME: case TABLE_COL_TYPE_DATETIME:
// This is not used with MariaDB RBR unpack_datetime(ptr, tm);
//unpack_datetime(ptr, *metadata, tm);
break; break;
case TABLE_COL_TYPE_DATETIME2: case TABLE_COL_TYPE_DATETIME2:
@ -467,6 +469,10 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
case TABLE_COL_TYPE_TIMESTAMP2: case TABLE_COL_TYPE_TIMESTAMP2:
unpack_timestamp(ptr, *metadata, tm); unpack_timestamp(ptr, *metadata, tm);
break; break;
default:
ss_dassert(false);
break;
} }
return temporal_field_size(type, *metadata); return temporal_field_size(type, *metadata);
} }
@ -596,6 +602,10 @@ static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes)
((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) |
((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56); ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56);
break; break;
default:
ss_dassert(false);
break;
} }
return val; return val;

51
server/core/router.cc Normal file
View File

@ -0,0 +1,51 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/router.hh>
namespace maxscale
{
//
// RouterSession
//
RouterSession::RouterSession(MXS_SESSION* pSession)
: m_pSession(pSession)
{
}
RouterSession::~RouterSession()
{
}
void RouterSession::close()
{
}
int32_t RouterSession::routeQuery(GWBUF* pPacket)
{
return 0;
}
void RouterSession::clientReply(GWBUF* pPacket, DCB* pBackend)
{
}
void RouterSession::handleError(GWBUF* pMessage,
DCB* pProblem,
mxs_error_action_t action,
bool* pSuccess)
{
}
}

View File

@ -20,6 +20,7 @@
#include <pthread.h> #include <pthread.h>
#include <semaphore.h> #include <semaphore.h>
#include <maxscale/log_manager.h> #include <maxscale/log_manager.h>
#include <maxscale/random_jkiss.h>
using std::cerr; using std::cerr;
using std::cout; using std::cout;
@ -152,7 +153,7 @@ int main(int argc, char* argv[])
int rc; int rc;
std::ios::sync_with_stdio(); std::ios::sync_with_stdio();
random_jkiss_init();
rc = sem_init(&u_semstart, 0, 0); rc = sem_init(&u_semstart, 0, 0);
ensure(rc == 0); ensure(rc == 0);

View File

@ -66,6 +66,7 @@ test1()
int input_counter = 0; int input_counter = 0;
int output_counter = 0; int output_counter = 0;
random_jkiss_init();
hkheartbeat = 0; hkheartbeat = 0;
queue = mxs_queue_alloc(TEST_QUEUE_SIZE, HEARTBEATS_TO_EXPIRE); queue = mxs_queue_alloc(TEST_QUEUE_SIZE, HEARTBEATS_TO_EXPIRE);

View File

@ -27,7 +27,7 @@
%pure-parser %pure-parser
/** Prefix all functions */ /** Prefix all functions */
%name-prefix="dbfw_yy" %name-prefix "dbfw_yy"
/** The pure parser requires one extra parameter */ /** The pure parser requires one extra parameter */
%parse-param {void* scanner} %parse-param {void* scanner}

View File

@ -74,10 +74,10 @@ typedef struct
char* script; /*< Script to call when state changes occur on servers */ char* script; /*< Script to call when state changes occur on servers */
uint64_t events; /*< enabled events */ uint64_t events; /*< enabled events */
HASHTABLE *server_info; /**< Contains server specific information */ HASHTABLE *server_info; /**< Contains server specific information */
bool failover; /**< If simple failover is enabled */ bool detect_standalone_master; /**< If standalone master are detected */
int failcount; /**< How many monitoring cycles servers must be int failcount; /**< How many monitoring cycles servers must be
down before failover is initiated */ down before failover is initiated */
bool failover_recovery; /**< Allow servers to rejoin the cluster in failover mode */ bool allow_cluster_recovery; /**< Allow failed servers to rejoin the cluster */
bool warn_failover; /**< Log a warning when failover happens */ bool warn_failover; /**< Log a warning when failover happens */
} MYSQL_MONITOR; } MYSQL_MONITOR;

View File

@ -125,9 +125,9 @@ MXS_MODULE* MXS_CREATE_MODULE()
{"detect_stale_slave", MXS_MODULE_PARAM_BOOL, "true"}, {"detect_stale_slave", MXS_MODULE_PARAM_BOOL, "true"},
{"mysql51_replication", MXS_MODULE_PARAM_BOOL, "false"}, {"mysql51_replication", MXS_MODULE_PARAM_BOOL, "false"},
{"multimaster", MXS_MODULE_PARAM_BOOL, "false"}, {"multimaster", MXS_MODULE_PARAM_BOOL, "false"},
{"failover", MXS_MODULE_PARAM_BOOL, "false"}, {"detect_standalone_master", MXS_MODULE_PARAM_BOOL, "false"},
{"failcount", MXS_MODULE_PARAM_COUNT, "5"}, {"failcount", MXS_MODULE_PARAM_COUNT, "5"},
{"failover_recovery", MXS_MODULE_PARAM_BOOL, "true"}, {"allow_cluster_recovery", MXS_MODULE_PARAM_BOOL, "true"},
{ {
"script", "script",
MXS_MODULE_PARAM_PATH, MXS_MODULE_PARAM_PATH,
@ -279,9 +279,9 @@ startMonitor(MXS_MONITOR *monitor, const MXS_CONFIG_PARAMETER* params)
handle->detectStaleSlave = config_get_bool(params, "detect_stale_slave"); handle->detectStaleSlave = config_get_bool(params, "detect_stale_slave");
handle->replicationHeartbeat = config_get_bool(params, "detect_replication_lag"); handle->replicationHeartbeat = config_get_bool(params, "detect_replication_lag");
handle->multimaster = config_get_bool(params, "multimaster"); handle->multimaster = config_get_bool(params, "multimaster");
handle->failover = config_get_bool(params, "failover"); handle->detect_standalone_master = config_get_bool(params, "detect_standalone_master");
handle->failcount = config_get_integer(params, "failcount"); handle->failcount = config_get_integer(params, "failcount");
handle->failover_recovery = config_get_bool(params, "failover_recovery"); handle->allow_cluster_recovery = config_get_bool(params, "allow_cluster_recovery");
handle->mysql51_replication = config_get_bool(params, "mysql51_replication"); handle->mysql51_replication = config_get_bool(params, "mysql51_replication");
handle->script = config_copy_string(params, "script"); handle->script = config_copy_string(params, "script");
handle->events = config_get_enum(params, "events", mxs_monitor_event_enum_values); handle->events = config_get_enum(params, "events", mxs_monitor_event_enum_values);
@ -1010,7 +1010,7 @@ void do_failover(MYSQL_MONITOR *handle, MXS_MONITOR_SERVERS *db)
{ {
MXS_WARNING("Failover initiated, server '%s' is now the master.%s", MXS_WARNING("Failover initiated, server '%s' is now the master.%s",
db->server->unique_name, db->server->unique_name,
handle->failover_recovery ? handle->allow_cluster_recovery ?
"" : " All other servers are set into maintenance mode."); "" : " All other servers are set into maintenance mode.");
handle->warn_failover = false; handle->warn_failover = false;
} }
@ -1019,7 +1019,7 @@ void do_failover(MYSQL_MONITOR *handle, MXS_MONITOR_SERVERS *db)
monitor_set_pending_status(db, SERVER_MASTER); monitor_set_pending_status(db, SERVER_MASTER);
monitor_clear_pending_status(db, SERVER_SLAVE); monitor_clear_pending_status(db, SERVER_SLAVE);
} }
else if (!handle->failover_recovery) else if (!handle->allow_cluster_recovery)
{ {
server_set_status_nolock(db->server, SERVER_MAINT); server_set_status_nolock(db->server, SERVER_MAINT);
monitor_set_pending_status(db, SERVER_MAINT); monitor_set_pending_status(db, SERVER_MAINT);
@ -1298,7 +1298,7 @@ monitorMain(void *arg)
/** Now that all servers have their status correctly set, we can check /** Now that all servers have their status correctly set, we can check
if we need to do a failover */ if we need to do a failover */
if (handle->failover) if (handle->detect_standalone_master)
{ {
if (failover_required(handle, mon->databases)) if (failover_required(handle, mon->databases))
{ {

View File

@ -602,6 +602,34 @@ gw_read_backend_event(DCB *dcb)
return rc; return rc;
} }
static void do_handle_error(DCB *dcb, mxs_error_action_t action, const char *errmsg)
{
bool succp = false;
MXS_SESSION *session = dcb->session;
if (!dcb->dcb_errhandle_called)
{
GWBUF *errbuf = mysql_create_custom_error(1, 0, errmsg);
void *rsession = session->router_session;
MXS_ROUTER_OBJECT *router = session->service->router;
MXS_ROUTER *router_instance = session->service->router_instance;
router->handleError(router_instance, rsession, errbuf,
dcb, action, &succp);
gwbuf_free(errbuf);
dcb->dcb_errhandle_called = true;
}
/**
* If error handler fails it means that routing session can't continue
* and it must be closed. In success, only this DCB is closed.
*/
if (!succp)
{
session->state = SESSION_STATE_STOPPING;
}
}
/** /**
* @brief Authentication of backend - read the reply, or handle an error * @brief Authentication of backend - read the reply, or handle an error
* *
@ -609,43 +637,21 @@ gw_read_backend_event(DCB *dcb)
* @param local_session The current MySQL session data structure * @param local_session The current MySQL session data structure
* @return * @return
*/ */
static void static void gw_reply_on_error(DCB *dcb, mxs_auth_state_t state)
gw_reply_on_error(DCB *dcb, mxs_auth_state_t state)
{ {
MXS_SESSION *session = dcb->session; MXS_SESSION *session = dcb->session;
CHK_SESSION(session); CHK_SESSION(session);
/* Only reload the users table if authentication failed and the
* client session is not stopping. It is possible that authentication
* fails because the client has closed the connection before all
* backends have done authentication. */
if (state == MXS_AUTH_STATE_FAILED && session->state != SESSION_STATE_STOPPING)
{
service_refresh_users(session->service);
}
GWBUF* errbuf = mysql_create_custom_error(1, 0, "Authentication with backend "
"failed. Session will be closed.");
if (session->router_session) if (session->router_session)
{ {
bool succp = false; do_handle_error(dcb, ERRACT_REPLY_CLIENT,
"Authentication with backend failed. Session will be closed.");
session->service->router->handleError(session->service->router_instance,
session->router_session,
errbuf, dcb, ERRACT_REPLY_CLIENT, &succp);
session->state = SESSION_STATE_STOPPING; session->state = SESSION_STATE_STOPPING;
ss_dassert(dcb->dcb_errhandle_called);
} }
else else
{ {
/** A NULL router_session is valid if a router declares the
* RCAP_TYPE_NO_RSESSION capability flag */
dcb->dcb_errhandle_called = true; dcb->dcb_errhandle_called = true;
} }
gwbuf_free(errbuf);
} }
/** /**
@ -712,28 +718,7 @@ gw_read_and_write(DCB *dcb)
if (return_code < 0) if (return_code < 0)
{ {
GWBUF* errbuf; do_handle_error(dcb, ERRACT_NEW_CONNECTION, "Read from backend failed");
bool succp;
#if defined(SS_DEBUG)
MXS_ERROR("Backend read error handling #2.");
#endif
errbuf = mysql_create_custom_error(1,
0,
"Read from backend failed");
session->service->router->handleError(
session->service->router_instance,
session->router_session,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
gwbuf_free(errbuf);
if (!succp)
{
session->state = SESSION_STATE_STOPPING;
}
return 0; return 0;
} }
@ -1113,18 +1098,11 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
*/ */
static int gw_error_backend_event(DCB *dcb) static int gw_error_backend_event(DCB *dcb)
{ {
MXS_SESSION* session;
void* rsession;
MXS_ROUTER_OBJECT* router;
MXS_ROUTER* router_instance;
GWBUF* errbuf;
bool succp;
mxs_session_state_t ses_state;
CHK_DCB(dcb); CHK_DCB(dcb);
session = dcb->session; MXS_SESSION *session = dcb->session;
CHK_SESSION(session); CHK_SESSION(session);
if (SESSION_STATE_DUMMY == session->state)
if (session->state == SESSION_STATE_DUMMY)
{ {
if (dcb->persistentstart == 0) if (dcb->persistentstart == 0)
{ {
@ -1133,81 +1111,32 @@ static int gw_error_backend_event(DCB *dcb)
"Closing connection."); "Closing connection.");
} }
dcb_close(dcb); dcb_close(dcb);
return 1;
} }
rsession = session->router_session; else if (dcb->state != DCB_STATE_POLLING || session->state != SESSION_STATE_ROUTER_READY)
router = session->service->router; {
router_instance = session->service->router_instance; int error;
int len = sizeof(error);
/** if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *) & len) == 0 && error != 0)
* Avoid running redundant error handling procedure. {
* dcb_close is already called for the DCB. Thus, either connection is char errstring[MXS_STRERROR_BUFLEN];
* closed by router and COM_QUIT sent or there was an error which
* have already been handled.
*/
if (dcb->state != DCB_STATE_POLLING) if (dcb->state != DCB_STATE_POLLING)
{ {
int error, len; MXS_ERROR("DCB in state %s got error '%s'.", STRDCBSTATE(dcb->state),
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *) & len) == 0)
{
if (error != 0)
{
char errstring[MXS_STRERROR_BUFLEN];
MXS_ERROR("DCB in state %s got error '%s'.",
STRDCBSTATE(dcb->state),
strerror_r(error, errstring, sizeof(errstring))); strerror_r(error, errstring, sizeof(errstring)));
} }
} else
return 1;
}
errbuf = mysql_create_custom_error(1,
0,
"Lost connection to backend server.");
ses_state = session->state;
if (ses_state != SESSION_STATE_ROUTER_READY)
{ {
int error, len;
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *) & len) == 0)
{
if (error != 0)
{
char errstring[MXS_STRERROR_BUFLEN];
MXS_ERROR("Error '%s' in session that is not ready for routing.", MXS_ERROR("Error '%s' in session that is not ready for routing.",
strerror_r(error, errstring, sizeof(errstring))); strerror_r(error, errstring, sizeof(errstring)));
} }
} }
gwbuf_free(errbuf);
goto retblock;
} }
else
#if defined(SS_DEBUG)
MXS_INFO("Backend error event handling.");
#endif
router->handleError(router_instance,
rsession,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
gwbuf_free(errbuf);
/**
* If error handler fails it means that routing session can't continue
* and it must be closed. In success, only this DCB is closed.
*/
if (!succp)
{ {
session->state = SESSION_STATE_STOPPING; do_handle_error(dcb, ERRACT_NEW_CONNECTION, "Lost connection to backend server.");
} }
retblock:
return 1; return 1;
} }
@ -1223,47 +1152,21 @@ retblock:
*/ */
static int gw_backend_hangup(DCB *dcb) static int gw_backend_hangup(DCB *dcb)
{ {
MXS_SESSION* session;
void* rsession;
MXS_ROUTER_OBJECT* router;
MXS_ROUTER* router_instance;
bool succp;
GWBUF* errbuf;
mxs_session_state_t ses_state;
CHK_DCB(dcb); CHK_DCB(dcb);
MXS_SESSION *session = dcb->session;
CHK_SESSION(session);
if (dcb->persistentstart) if (dcb->persistentstart)
{ {
dcb->dcb_errhandle_called = true; dcb->dcb_errhandle_called = true;
goto retblock;
} }
session = dcb->session; else if (session->state != SESSION_STATE_ROUTER_READY)
if (session == NULL)
{ {
goto retblock; int error;
} int len = sizeof(error);
CHK_SESSION(session);
rsession = session->router_session;
router = session->service->router;
router_instance = session->service->router_instance;
errbuf = mysql_create_custom_error(1,
0,
"Lost connection to backend server.");
ses_state = session->state;
if (ses_state != SESSION_STATE_ROUTER_READY)
{
int error, len;
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *) & len) == 0) if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *) & len) == 0)
{ {
if (error != 0 && ses_state != SESSION_STATE_STOPPING) if (error != 0 && session->state != SESSION_STATE_STOPPING)
{ {
char errstring[MXS_STRERROR_BUFLEN]; char errstring[MXS_STRERROR_BUFLEN];
MXS_ERROR("Hangup in session that is not ready for routing, " MXS_ERROR("Hangup in session that is not ready for routing, "
@ -1271,31 +1174,12 @@ static int gw_backend_hangup(DCB *dcb)
strerror_r(error, errstring, sizeof(errstring))); strerror_r(error, errstring, sizeof(errstring)));
} }
} }
gwbuf_free(errbuf);
/*
* I'm pretty certain this is best removed and
* causes trouble if present, but have left it
* here just for now as a comment. Martin
*/
/* dcb_close(dcb); */
goto retblock;
} }
else
router->handleError(router_instance,
rsession,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
gwbuf_free(errbuf);
/** There are no required backends available, close session. */
if (!succp)
{ {
session->state = SESSION_STATE_STOPPING; do_handle_error(dcb, ERRACT_NEW_CONNECTION, "Lost connection to backend server.");
} }
retblock:
return 1; return 1;
} }
@ -1400,29 +1284,7 @@ static int backend_write_delayqueue(DCB *dcb, GWBUF *buffer)
if (rc == 0) if (rc == 0)
{ {
MXS_SESSION *session = dcb->session; do_handle_error(dcb, ERRACT_NEW_CONNECTION, "Lost connection to backend server.");
CHK_SESSION(session);
MXS_ROUTER_OBJECT *router = session->service->router;
MXS_ROUTER *router_instance = session->service->router_instance;
void *rsession = session->router_session;
bool succp = false;
GWBUF* errbuf = mysql_create_custom_error(
1, 0, "Failed to write buffered data to back-end server. "
"Buffer was empty or back-end was disconnected during "
"operation. Attempting to find a new backend.");
router->handleError(router_instance,
rsession,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
gwbuf_free(errbuf);
if (!succp)
{
session->state = SESSION_STATE_STOPPING;
}
} }
return rc; return rc;

View File

@ -1019,36 +1019,11 @@ gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities)
/* else return_code is still 0 from when it was originally set */ /* else return_code is still 0 from when it was originally set */
/* Note that read_buffer has been freed or transferred by this point */ /* Note that read_buffer has been freed or transferred by this point */
/** Routing failed */
if (return_code != 0) if (return_code != 0)
{ {
bool router_can_continue; /** Routing failed, close the client connection */
GWBUF* errbuf; dcb_close(dcb);
/** MXS_ERROR("Routing the query failed. Session will be closed.");
* Create error to be sent to client if session
* can't be continued.
*/
errbuf = mysql_create_custom_error(1, 0,
"Routing failed. Session is closed.");
/**
* Ensure that there are enough backends
* available for router to continue operation.
*/
session->service->router->handleError(session->service->router_instance,
session->router_session,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&router_can_continue);
gwbuf_free(errbuf);
/**
* If the router cannot continue, close session
*/
if (!router_can_continue)
{
MXS_ERROR("Routing the query failed. "
"Session will be closed.");
}
} }
if (proto->current_command == MYSQL_COM_QUIT) if (proto->current_command == MYSQL_COM_QUIT)

View File

@ -7,6 +7,7 @@ endif()
add_subdirectory(cli) add_subdirectory(cli)
add_subdirectory(debugcli) add_subdirectory(debugcli)
add_subdirectory(hintrouter)
add_subdirectory(maxinfo) add_subdirectory(maxinfo)
add_subdirectory(readconnroute) add_subdirectory(readconnroute)
add_subdirectory(readwritesplit) add_subdirectory(readwritesplit)

View File

@ -193,6 +193,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
{"group_rows", MXS_MODULE_PARAM_COUNT, "1000"}, {"group_rows", MXS_MODULE_PARAM_COUNT, "1000"},
{"group_trx", MXS_MODULE_PARAM_COUNT, "1"}, {"group_trx", MXS_MODULE_PARAM_COUNT, "1"},
{"start_index", MXS_MODULE_PARAM_COUNT, "1"}, {"start_index", MXS_MODULE_PARAM_COUNT, "1"},
{"block_size", MXS_MODULE_PARAM_COUNT, "0"},
{"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values}, {"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values},
{MXS_END_MODULE_PARAMS} {MXS_END_MODULE_PARAMS}
} }
@ -416,6 +417,7 @@ createInstance(SERVICE *service, char **options)
inst->trx_target = config_get_integer(params, "group_trx"); inst->trx_target = config_get_integer(params, "group_trx");
inst->codec = config_get_enum(params, "codec", codec_values); inst->codec = config_get_enum(params, "codec", codec_values);
int first_file = config_get_integer(params, "start_index"); int first_file = config_get_integer(params, "start_index");
inst->block_size = config_get_integer(params, "block_size");
MXS_CONFIG_PARAMETER *param = config_get_param(params, "source"); MXS_CONFIG_PARAMETER *param = config_get_param(params, "source");
bool err = false; bool err = false;
@ -490,6 +492,10 @@ createInstance(SERVICE *service, char **options)
{ {
first_file = MXS_MAX(1, atoi(value)); first_file = MXS_MAX(1, atoi(value));
} }
else if (strcmp(options[i], "block_size") == 0)
{
inst->block_size = atoi(value);
}
else else
{ {
MXS_WARNING("Unknown router option: '%s'", options[i]); MXS_WARNING("Unknown router option: '%s'", options[i]);
@ -1065,14 +1071,20 @@ void converter_func(void* data)
while (!router->service->svc_do_shutdown && ok && binlog_end == AVRO_OK) while (!router->service->svc_do_shutdown && ok && binlog_end == AVRO_OK)
{ {
uint64_t start_pos = router->current_pos; uint64_t start_pos = router->current_pos;
char binlog_name[BINLOG_FNAMELEN + 1];
strcpy(binlog_name, router->binlog_name);
if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd)) if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd))
{ {
binlog_end = avro_read_all_events(router); binlog_end = avro_read_all_events(router);
if (router->current_pos != start_pos) if (router->current_pos != start_pos || strcmp(binlog_name, router->binlog_name) != 0)
{ {
/** We processed some data, reset the conversion task delay */ /** We processed some data, reset the conversion task delay */
router->task_delay = 1; router->task_delay = 1;
/** Update the GTID index */
avro_update_index(router);
} }
avro_close_binlog(router->binlog_fd); avro_close_binlog(router->binlog_fd);

View File

@ -106,7 +106,7 @@ void avro_close_binlog(int fd)
* @param filepath Path to the created file * @param filepath Path to the created file
* @param json_schema The schema of the table in JSON format * @param json_schema The schema of the table in JSON format
*/ */
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec) AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec, size_t block_size)
{ {
AVRO_TABLE *table = MXS_CALLOC(1, sizeof(AVRO_TABLE)); AVRO_TABLE *table = MXS_CALLOC(1, sizeof(AVRO_TABLE));
if (table) if (table)
@ -128,7 +128,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, cons
else else
{ {
rc = avro_file_writer_create_with_codec(filepath, table->avro_schema, rc = avro_file_writer_create_with_codec(filepath, table->avro_schema,
&table->avro_file, codec, 0); &table->avro_file, codec, block_size);
} }
if (rc) if (rc)
@ -884,12 +884,6 @@ void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush)
} }
hashtable_iterator_free(iter); hashtable_iterator_free(iter);
} }
/** Update the GTID index */
if (flush == AVROROUTER_FLUSH)
{
avro_update_index(router);
}
} }
/** /**

View File

@ -122,7 +122,8 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
/** Close the file and open a new one */ /** Close the file and open a new one */
hashtable_delete(router->open_tables, table_ident); hashtable_delete(router->open_tables, table_ident);
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema,
codec_to_string(router->codec)); codec_to_string(router->codec),
router->block_size);
if (avro_table) if (avro_table)
{ {
@ -306,14 +307,19 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
* beforehand so we must continue processing them until we reach the end * beforehand so we must continue processing them until we reach the end
* of the event. */ * of the event. */
int rows = 0; int rows = 0;
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
{ {
/** Add the current GTID and timestamp */ /** Add the current GTID and timestamp */
uint8_t *end = ptr + hdr->event_size; uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
int event_type = get_event_type(hdr->event_type); int event_type = get_event_type(hdr->event_type);
prepare_record(router, hdr, event_type, &record); prepare_record(router, hdr, event_type, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present, end); ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
avro_file_writer_append_value(table->avro_file, &record); if (avro_file_writer_append_value(table->avro_file, &record))
{
MXS_ERROR("Failed to write value at position %ld: %s",
router->current_pos, avro_strerror());
}
/** Update rows events have the before and after images of the /** Update rows events have the before and after images of the
* affected rows so we'll process them as another record with * affected rows so we'll process them as another record with
@ -322,7 +328,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
{ {
prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record); prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present, end); ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
avro_file_writer_append_value(table->avro_file, &record); if (avro_file_writer_append_value(table->avro_file, &record))
{
MXS_ERROR("Failed to write value at position %ld: %s",
router->current_pos, avro_strerror());
}
} }
rows++; rows++;
@ -518,15 +528,24 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
for (long i = 0; i < map->columns && npresent < ncolumns; i++) for (long i = 0; i < map->columns && npresent < ncolumns; i++)
{ {
ss_dassert(create->columns == map->columns); ss_dassert(create->columns == map->columns);
avro_value_get_by_name(record, create->column_names[i], &field, NULL); ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
ss_dassert(rc == 0);
if (bit_is_set(columns_present, ncolumns, i)) if (bit_is_set(columns_present, ncolumns, i))
{ {
npresent++; npresent++;
if (bit_is_set(null_bitmap, ncolumns, i)) if (bit_is_set(null_bitmap, ncolumns, i))
{
if (column_is_blob(map->column_types[i]))
{
uint8_t nullvalue = 0;
avro_value_set_bytes(&field, &nullvalue, 1);
}
else
{ {
avro_value_set_null(&field); avro_value_set_null(&field);
} }
}
else if (column_is_fixed_string(map->column_types[i])) else if (column_is_fixed_string(map->column_types[i]))
{ {
/** ENUM and SET are stored as STRING types with the type stored /** ENUM and SET are stored as STRING types with the type stored
@ -614,8 +633,16 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
uint64_t len = 0; uint64_t len = 0;
memcpy(&len, ptr, bytes); memcpy(&len, ptr, bytes);
ptr += bytes; ptr += bytes;
if (len)
{
avro_value_set_bytes(&field, ptr, len); avro_value_set_bytes(&field, ptr, len);
ptr += len; ptr += len;
}
else
{
uint8_t nullvalue = 0;
avro_value_set_bytes(&field, &nullvalue, 1);
}
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
else if (column_is_temporal(map->column_types[i])) else if (column_is_temporal(map->column_types[i]))

View File

@ -281,6 +281,7 @@ typedef struct avro_instance
uint64_t row_count; /*< Row events processed */ uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Minimum about of row events that will trigger uint64_t row_target; /*< Minimum about of row events that will trigger
* a flush of all tables */ * a flush of all tables */
uint64_t block_size; /**< Avro datablock size */
enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */ enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */
struct avro_instance *next; struct avro_instance *next;
} AVRO_INSTANCE; } AVRO_INSTANCE;
@ -299,7 +300,8 @@ extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
extern void avro_close_binlog(int fd); extern void avro_close_binlog(int fd);
extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router); extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router);
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec); extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema,
const char *codec, size_t block_size);
extern void avro_table_free(AVRO_TABLE *table); extern void avro_table_free(AVRO_TABLE *table);
extern char* json_new_schema_from_table(TABLE_MAP *map); extern char* json_new_schema_from_table(TABLE_MAP *map);
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map); extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);

View File

@ -1784,6 +1784,7 @@ errorReply(MXS_ROUTER *instance,
mxs_error_action_t action, mxs_error_action_t action,
bool *succp) bool *succp)
{ {
ss_dassert(backend_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int error; int error;
socklen_t len; socklen_t len;
@ -1794,9 +1795,6 @@ errorReply(MXS_ROUTER *instance,
mysql_errno = (unsigned long) extract_field(((uint8_t *)GWBUF_DATA(message) + 5), 16); mysql_errno = (unsigned long) extract_field(((uint8_t *)GWBUF_DATA(message) + 5), 16);
errmsg = extract_message(message); errmsg = extract_message(message);
/** Don't handle same error twice on same DCB */
if (backend_dcb->dcb_errhandle_called)
{
/** Check router state and set errno an message */ /** Check router state and set errno an message */
if (router->master_state < BLRM_BINLOGDUMP || router->master_state != BLRM_SLAVE_STOPPED) if (router->master_state < BLRM_BINLOGDUMP || router->master_state != BLRM_SLAVE_STOPPED)
{ {
@ -1836,11 +1834,6 @@ errorReply(MXS_ROUTER *instance,
/** we optimistically assume that previous call succeed */ /** we optimistically assume that previous call succeed */
*succp = true; *succp = true;
return; return;
}
else
{
backend_dcb->dcb_errhandle_called = true;
}
len = sizeof(error); len = sizeof(error);
if (router->master && if (router->master &&

View File

@ -0,0 +1,8 @@
add_library(hintrouter SHARED
hintrouter.cc
hintroutersession.cc
)
target_link_libraries(hintrouter maxscale-common)
set_target_properties(hintrouter PROPERTIES VERSION "1.0.0")
install_module(hintrouter core)

View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "hintrouter"
#include "hintrouter.hh"
#include <maxscale/log_manager.h>
HintRouter::HintRouter(SERVICE* pService)
: maxscale::Router<HintRouter, HintRouterSession>(pService)
{
MXS_NOTICE("Hint router [%s] created.", pService->name);
}
//static
HintRouter* HintRouter::create(SERVICE* pService, char** pzOptions)
{
return new HintRouter(pService);
}
HintRouterSession* HintRouter::newSession(MXS_SESSION *pSession)
{
return new HintRouterSession(pSession, this);
}
void HintRouter::diagnostics(DCB* pOut)
{
}
uint64_t HintRouter::getCapabilities()
{
return 0;
}
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
static MXS_MODULE module =
{
MXS_MODULE_API_ROUTER, /* Module type */
MXS_MODULE_BETA_RELEASE, /* Release status */
MXS_ROUTER_VERSION, /* Implemented module API version */
"A hint router", /* Description */
"V1.0.0", /* Module version */
&HintRouter::s_object,
NULL, /* Process init, can be null */
NULL, /* Process finish, can be null */
NULL, /* Thread init */
NULL, /* Thread finish */
{
{MXS_END_MODULE_PARAMS}
}
};
return &module;
}

View File

@ -0,0 +1,36 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/router.hh>
#include "hintroutersession.hh"
class HintRouter : public maxscale::Router<HintRouter, HintRouterSession>
{
public:
static HintRouter* create(SERVICE* pService, char** pzOptions);
HintRouterSession* newSession(MXS_SESSION *pSession);
void diagnostics(DCB* pOut);
uint64_t getCapabilities();
private:
HintRouter(SERVICE* pService);
private:
HintRouter(const HintRouter&);
HintRouter& operator = (const HintRouter&);
};

View File

@ -0,0 +1,56 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "hintrouter"
#include "hintroutersession.hh"
#include <maxscale/log_manager.h>
HintRouterSession::HintRouterSession(MXS_SESSION* pSession, HintRouter* pRouter)
: maxscale::RouterSession(pSession)
, m_pRouter(pRouter)
{
}
HintRouterSession::~HintRouterSession()
{
}
void HintRouterSession::close()
{
}
int32_t HintRouterSession::routeQuery(GWBUF* pPacket)
{
MXS_ERROR("routeQuery not implemented yet.");
return 0;
}
void HintRouterSession::clientReply(GWBUF* pPacket, DCB* pBackend)
{
MXS_ERROR("clientReply not implemented yet.");
}
void HintRouterSession::handleError(GWBUF* pMessage,
DCB* pProblem,
mxs_error_action_t action,
bool* pSuccess)
{
ss_dassert(pProblem->dcb_role == DCB_ROLE_BACKEND_HANDLER);
MXS_ERROR("handleError not implemented yet.");
}

View File

@ -0,0 +1,45 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/router.hh>
class HintRouter;
class HintRouterSession : public maxscale::RouterSession
{
public:
HintRouterSession(MXS_SESSION* pSession,
HintRouter* pRouter);
~HintRouterSession();
void close();
int32_t routeQuery(GWBUF* pPacket);
void clientReply(GWBUF* pPacket, DCB* pBackend);
void handleError(GWBUF* pMessage,
DCB* pProblem,
mxs_error_action_t action,
bool* pSuccess);
private:
HintRouterSession(const HintRouterSession&);
HintRouterSession& operator = (const HintRouterSession&);
private:
HintRouter* m_pRouter;
};

View File

@ -276,26 +276,13 @@ static void handleError(MXS_ROUTER *instance,
bool *succp) bool *succp)
{ {
ss_dassert(backend_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
DCB *client_dcb; DCB *client_dcb;
MXS_SESSION *session = backend_dcb->session; MXS_SESSION *session = backend_dcb->session;
mxs_session_state_t sesstate;
/** Don't handle same error twice on same DCB */
if (backend_dcb->dcb_errhandle_called)
{
/** we optimistically assume that previous call succeed */
*succp = true;
return;
}
else
{
backend_dcb->dcb_errhandle_called = true;
}
sesstate = session->state;
client_dcb = session->client_dcb; client_dcb = session->client_dcb;
if (sesstate == SESSION_STATE_ROUTER_READY) if (session->state == SESSION_STATE_ROUTER_READY)
{ {
CHK_DCB(client_dcb); CHK_DCB(client_dcb);
client_dcb->func.write(client_dcb, gwbuf_clone(errbuf)); client_dcb->func.write(client_dcb, gwbuf_clone(errbuf));

View File

@ -679,23 +679,12 @@ static void handleError(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session
DCB *problem_dcb, mxs_error_action_t action, bool *succp) DCB *problem_dcb, mxs_error_action_t action, bool *succp)
{ {
ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
DCB *client_dcb; DCB *client_dcb;
MXS_SESSION *session = problem_dcb->session; MXS_SESSION *session = problem_dcb->session;
mxs_session_state_t sesstate; mxs_session_state_t sesstate;
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session; ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
/** Don't handle same error twice on same DCB */
if (problem_dcb->dcb_errhandle_called)
{
/** we optimistically assume that previous call succeed */
*succp = true;
return;
}
else
{
problem_dcb->dcb_errhandle_called = true;
}
sesstate = session->state; sesstate = session->state;
client_dcb = session->client_dcb; client_dcb = session->client_dcb;
@ -706,11 +695,7 @@ static void handleError(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session
client_dcb->func.write(client_dcb, gwbuf_clone(errbuf)); client_dcb->func.write(client_dcb, gwbuf_clone(errbuf));
} }
if (DCB_ROLE_CLIENT_HANDLER == problem_dcb->dcb_role) if (router_cli_ses && problem_dcb == router_cli_ses->backend_dcb)
{
dcb_close(problem_dcb);
}
else if (router_cli_ses && problem_dcb == router_cli_ses->backend_dcb)
{ {
router_cli_ses->backend_dcb = NULL; router_cli_ses->backend_dcb = NULL;
dcb_close(problem_dcb); dcb_close(problem_dcb);

View File

@ -1180,6 +1180,7 @@ static void handleError(MXS_ROUTER *instance,
mxs_error_action_t action, mxs_error_action_t action,
bool *succp) bool *succp)
{ {
ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session; ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
@ -1187,38 +1188,13 @@ static void handleError(MXS_ROUTER *instance,
if (rses->rses_closed) if (rses->rses_closed)
{ {
/** Session is already closed */
problem_dcb->dcb_errhandle_called = true;
*succp = false; *succp = false;
return; return;
} }
/** Don't handle same error twice on same DCB */
if (problem_dcb->dcb_errhandle_called)
{
/** we optimistically assume that previous call succeed */
/*
* The return of true is potentially misleading, but appears to
* be safe with the code as it stands on 9 Sept 2015 - MNB
*/
*succp = true;
return;
}
else
{
problem_dcb->dcb_errhandle_called = true;
}
MXS_SESSION *session = problem_dcb->session; MXS_SESSION *session = problem_dcb->session;
ss_dassert(session); ss_dassert(session);
if (problem_dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
{
dcb_close(problem_dcb);
*succp = false;
}
else
{
backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb); backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb);
switch (action) switch (action)
@ -1332,7 +1308,6 @@ static void handleError(MXS_ROUTER *instance,
break; break;
} }
} }
}
/** /**
* @brief Handle an error reply for a client * @brief Handle an error reply for a client

View File

@ -3557,33 +3557,19 @@ static void handleError(MXS_ROUTER* instance,
mxs_error_action_t action, mxs_error_action_t action,
bool* succp) bool* succp)
{ {
ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
MXS_SESSION* session; MXS_SESSION* session;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
CHK_DCB(problem_dcb); CHK_DCB(problem_dcb);
/** Don't handle same error twice on same DCB */
if (problem_dcb->dcb_errhandle_called)
{
/** we optimistically assume that previous call succeed */
*succp = true;
return;
}
else
{
problem_dcb->dcb_errhandle_called = true;
}
session = problem_dcb->session; session = problem_dcb->session;
if (session == NULL || rses == NULL) if (session == NULL || rses == NULL)
{ {
*succp = false; *succp = false;
} }
else if (DCB_ROLE_CLIENT_HANDLER == problem_dcb->dcb_role)
{
*succp = false;
}
else else
{ {
CHK_SESSION(session); CHK_SESSION(session);