Merge branch '2.2' into develop

This commit is contained in:
Markus Mäkelä
2018-05-16 11:46:45 +03:00
10 changed files with 183 additions and 365 deletions

View File

@ -663,6 +663,10 @@ add_test_executable(mxs1743_rconn_bitmask.cpp mxs1743_rconn_bitmask mxs1743_rcon
# https://jira.mariadb.org/browse/MXS-1751
add_test_executable(mxs1751_available_when_donor_crash.cpp mxs1751_available_when_donor_crash mxs1751_available_when_donor_crash LABELS galeramon GALERA_BACKEND)
# MXS-1760: With use_sql_variables_in=master, unknown PS errors are logged
# https://jira.mariadb.org/browse/MXS-1760
add_test_executable(mxs1760_use_sql_variables_in.cpp mxs1760_use_sql_variables_in mxs1760_use_sql_variables_in LABELS readwritesplit REPL_BACKEND)
# MXS-1773: Failing LOAD DATA LOCAL INFILE confuses readwritesplit
# https://jira.mariadb.org/browse/MXS-1773
add_test_executable(mxs1773_failing_ldli.cpp mxs1773_failing_ldli replication LABELS readwritesplit REPL_BACKEND)

View File

@ -1,316 +0,0 @@
#include "cdc_connector.h"
#include <arpa/inet.h>
#include <stdexcept>
#include <unistd.h>
#include <string.h>
#include <openssl/sha.h>
#include <sys/socket.h>
#include <sys/types.h>
#define CDC_CONNECTOR_VERSION "1.0.0"
#define ERRBUF_SIZE 512
#define READBUF_SIZE 1024
static const char OK_RESPONSE[] = "OK\n";
static const char CLOSE_MSG[] = "CLOSE";
static const char REGISTER_MSG[] = "REGISTER UUID=CDC_CONNECTOR-" CDC_CONNECTOR_VERSION ", TYPE=";
static const char REQUEST_MSG[] = "REQUEST-DATA ";
namespace
{
static inline int nointr_read(int fd, void *dest, size_t size)
{
int rc = read(fd, dest, size);
while (rc == -1 && errno == EINTR)
{
rc = read(fd, dest, size);
}
return rc;
}
static inline int nointr_write(int fd, const void *src, size_t size)
{
int rc = write(fd, src, size);
while (rc == -1 && errno == EINTR)
{
rc = write(fd, src, size);
}
return rc;
}
static std::string bin2hex(const uint8_t *data, size_t len)
{
std::string result;
static const char hexconvtab[] = "0123456789abcdef";
for (int i = 0; i < len; i++)
{
result += hexconvtab[data[i] >> 4];
result += hexconvtab[data[i] & 0x0f];
}
return result;
}
std::string generateAuthString(const std::string& user, const std::string& password)
{
uint8_t digest[SHA_DIGEST_LENGTH];
SHA1(reinterpret_cast<const uint8_t*> (password.c_str()), password.length(), digest);
std::string auth_str = user;
auth_str += ":";
std::string part1 = bin2hex((uint8_t*)auth_str.c_str(), auth_str.length());
std::string part2 = bin2hex(digest, sizeof(digest));
return part1 + part2;
}
}
namespace CDC
{
/**
* Public functions
*/
Connection::Connection(const std::string& address,
uint16_t port,
const std::string& user,
const std::string& password,
uint32_t flags) :
m_fd(-1),
m_address(address),
m_port(port),
m_user(user),
m_password(password),
m_flags(flags) { }
Connection::~Connection()
{
closeConnection();
}
bool Connection::createConnection()
{
bool rval = false;
struct sockaddr_in remote = {};
remote.sin_port = htons(m_port);
remote.sin_family = AF_INET;
if (inet_aton(m_address.c_str(), (struct in_addr*)&remote.sin_addr.s_addr) == 0)
{
m_error = "Invalid address: ";
m_error += m_address;
}
else
{
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to create socket: ";
m_error += strerror_r(errno, err, sizeof (err));
}
m_fd = fd;
if (connect(fd, (struct sockaddr*) &remote, sizeof (remote)) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to connect: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else if (doAuth())
{
rval = doRegistration();
}
}
return rval;
}
void Connection::closeConnection()
{
if (m_fd != -1)
{
nointr_write(m_fd, CLOSE_MSG, sizeof (CLOSE_MSG) - 1);
close(m_fd);
m_fd = -1;
}
}
bool Connection::requestData(const std::string& table, const std::string& gtid)
{
bool rval = true;
std::string req_msg(REQUEST_MSG);
req_msg += table;
if (gtid.length())
{
req_msg += " ";
req_msg += gtid;
}
if (nointr_write(m_fd, req_msg.c_str(), req_msg.length()) == -1)
{
rval = false;
char err[ERRBUF_SIZE];
m_error = "Failed to write request: ";
m_error += strerror_r(errno, err, sizeof (err));
}
if (rval)
{
/** Read the Avro schema */
rval = readRow(m_schema);
}
return rval;
}
bool Connection::readRow(std::string& dest)
{
bool rval = true;
while (true)
{
char buf;
int rc = nointr_read(m_fd, &buf, 1);
if (rc == -1)
{
rval = false;
char err[ERRBUF_SIZE];
m_error = "Failed to read row: ";
m_error += strerror_r(errno, err, sizeof (err));
break;
}
if (buf == '\n')
{
break;
}
else
{
dest += buf;
if (dest[0] == 'E' && dest[1] == 'R' & dest[2] == 'R')
{
m_error = "Server responded with an error: ";
m_error += dest;
rval = false;
break;
}
}
}
return rval;
}
/**
* Private functions
*/
bool Connection::doAuth()
{
bool rval = false;
std::string auth_str = generateAuthString(m_user, m_password);
/** Send the auth string */
if (nointr_write(m_fd, auth_str.c_str(), auth_str.length()) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to write authentication data: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else
{
/** Read the response */
char buf[READBUF_SIZE];
int bytes;
if ((bytes = nointr_read(m_fd, buf, sizeof (buf))) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to read authentication response: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else if (memcmp(buf, OK_RESPONSE, sizeof (OK_RESPONSE) - 1) != 0)
{
buf[bytes] = '\0';
m_error = "Authentication failed: ";
m_error += buf;
}
else
{
rval = true;
}
}
return rval;
}
bool Connection::doRegistration()
{
bool rval = false;
std::string reg_msg(REGISTER_MSG);
const char *type = "";
if (m_flags & CDC_REQUEST_TYPE_JSON)
{
type = "JSON";
}
else if (m_flags & CDC_REQUEST_TYPE_AVRO)
{
type = "AVRO";
}
reg_msg += type;
/** Send the registration message */
if (nointr_write(m_fd, reg_msg.c_str(), reg_msg.length()) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to write registration message: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else
{
/** Read the response */
char buf[READBUF_SIZE];
int bytes;
if ((bytes = nointr_read(m_fd, buf, sizeof (buf))) == -1)
{
char err[ERRBUF_SIZE];
m_error = "Failed to read registration response: ";
m_error += strerror_r(errno, err, sizeof (err));
}
else if (memcmp(buf, OK_RESPONSE, sizeof (OK_RESPONSE) - 1) != 0)
{
buf[bytes] = '\0';
m_error = "Registration failed: ";
m_error += buf;
}
else
{
rval = true;
}
}
return rval;
}
}

View File

@ -1,47 +0,0 @@
#include <cstdint>
#include <string>
/** Request format flags */
#define CDC_REQUEST_TYPE_JSON (1 << 0)
#define CDC_REQUEST_TYPE_AVRO (1 << 1)
namespace CDC
{
class Connection
{
public:
Connection(const std::string& address,
uint16_t port,
const std::string& user,
const std::string& password,
uint32_t flags = CDC_REQUEST_TYPE_JSON);
virtual ~Connection();
bool createConnection();
bool requestData(const std::string& table, const std::string& gtid = "");
bool readRow(std::string& dest);
void closeConnection();
const std::string& getSchema() const
{
return m_schema;
}
const std::string& getError() const
{
return m_error;
}
private:
int m_fd;
uint32_t m_flags;
uint16_t m_port;
std::string m_address;
std::string m_user;
std::string m_password;
std::string m_error;
std::string m_schema;
bool doAuth();
bool doRegistration();
};
}

View File

@ -0,0 +1,59 @@
[maxscale]
threads=###threads###
[MySQL-Monitor]
type=monitor
module=mysqlmon
###repl51###
servers=server1,server2,server3,server4
user=maxskysql
password=skysql
monitor_interval=1000
[RW-Split-Router]
type=service
router=readwritesplit
servers=server1,server2,server3,server4
user=maxskysql
password=skysql
use_sql_variables_in=master
[RW-Split-Listener]
type=listener
service=RW-Split-Router
protocol=MySQLClient
port=4006
[CLI]
type=service
router=cli
[CLI-Listener]
type=listener
service=CLI
protocol=maxscaled
socket=default
[server1]
type=server
address=###node_server_IP_1###
port=###node_server_port_1###
protocol=MySQLBackend
[server2]
type=server
address=###node_server_IP_2###
port=###node_server_port_2###
protocol=MySQLBackend
[server3]
type=server
address=###node_server_IP_3###
port=###node_server_port_3###
protocol=MySQLBackend
[server4]
type=server
address=###node_server_IP_4###
port=###node_server_port_4###
protocol=MySQLBackend

View File

@ -0,0 +1,68 @@
/**
* MXS-1760: Adding use_sql_variables_in=master resulted in error "Client requests unknown
* prepared statement ID '0' that does not map to an internal ID"
*
* https://jira.mariadb.org/browse/MXS-1760
*/
#include "testconnections.h"
#include <vector>
#include <iostream>
using namespace std;
const int NUM_STMT = 2000;
int main(int argc, char** argv)
{
TestConnections test(argc, argv);
std::vector<MYSQL_STMT*> stmts;
test.maxscales->connect();
cout << "Setting variable @a to 123" << endl;
mysql_query(test.maxscales->conn_rwsplit[0], "SET @a = 123");
int rc = execute_query_check_one(test.maxscales->conn_rwsplit[0], "SELECT @a", "123");
test.assert(rc == 0, "Text protocol should return 123 as the value of @a");
cout << "Preparing and executing " << NUM_STMT << " prepared statements" << endl;
for (int i = 0; i < NUM_STMT && test.global_result == 0; i++)
{
stmts.push_back(mysql_stmt_init(test.maxscales->conn_rwsplit[0]));
MYSQL_STMT* stmt = stmts.back();
const char* query = "SELECT @a";
test.add_result(mysql_stmt_prepare(stmt, query, strlen(query)), "Failed to prepare: %s",
mysql_stmt_error(stmt));
}
for (auto stmt: stmts)
{
char buffer[100] = "";
my_bool err = false;
my_bool isnull = false;
MYSQL_BIND bind[1] = {};
bind[0].buffer_length = sizeof(buffer);
bind[0].buffer = buffer;
bind[0].error = &err;
bind[0].is_null = &isnull;
// Execute a write, should return the master's server ID
test.add_result(mysql_stmt_execute(stmt), "Failed to execute: %s", mysql_stmt_error(stmt));
test.add_result(mysql_stmt_bind_result(stmt, bind), "Failed to bind result: %s", mysql_stmt_error(stmt));
while (mysql_stmt_fetch(stmt) == 0)
{
;
}
test.add_result(strcmp(buffer, "123"), "Value is '%s', not '123'", buffer);
mysql_stmt_close(stmt);
}
test.maxscales->disconnect();
test.check_log_err(0, "unknown prepared statement", false);
return test.global_result;
}