Move client functionality into AvroSession
Most of the code is unchanged. Some of the code that relied on modifiable char pointers was modified to use std::string.
This commit is contained in:
parent
0d73530ff3
commit
e35d9dfc10
@ -539,6 +539,8 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERV
|
||||
avrodir(config_get_string(params, "avrodir")),
|
||||
current_pos(4),
|
||||
binlog_fd(-1),
|
||||
event_types(0),
|
||||
event_type_hdr_lens{0},
|
||||
trx_count(0),
|
||||
trx_target(config_get_integer(params, "group_trx")),
|
||||
row_count(0),
|
||||
@ -644,10 +646,9 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
|
||||
static int
|
||||
routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
|
||||
{
|
||||
Avro *router = (Avro *) instance;
|
||||
AvroSession *client = (AvroSession *) router_session;
|
||||
|
||||
return avro_client_handle_request(router, client, queue);
|
||||
return client->routeQuery(queue);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -21,6 +21,9 @@
|
||||
#include <stdlib.h>
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <maxavro.h>
|
||||
#include <fstream>
|
||||
#include <sstream>
|
||||
#include <maxscale/service.h>
|
||||
#include <maxscale/server.h>
|
||||
#include <maxscale/router.h>
|
||||
@ -30,75 +33,64 @@
|
||||
#include <maxscale/spinlock.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <maxscale/version.h>
|
||||
#include <maxavro.h>
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/buffer.hh>
|
||||
#include <maxscale/utils.hh>
|
||||
|
||||
extern char *blr_extract_column(GWBUF *buf, int col);
|
||||
extern uint32_t extract_field(uint8_t *src, int bits);
|
||||
|
||||
/* AVRO */
|
||||
static int avro_client_do_registration(Avro *, AvroSession *, GWBUF *);
|
||||
int avro_client_callback(DCB *dcb, DCB_REASON reason, void *data);
|
||||
static void avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue);
|
||||
static bool avro_client_stream_data(AvroSession *client);
|
||||
void avro_notify_client(AvroSession *client);
|
||||
void poll_fake_write_event(DCB *dcb);
|
||||
GWBUF* read_avro_json_schema(const char *avrofile, const char* dir);
|
||||
GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir);
|
||||
const char* get_avrofile_name(const char *file_ptr, int data_len, char *dest);
|
||||
std::pair<std::string, std::string> get_avrofile_and_gtid(std::string file);
|
||||
|
||||
/**
|
||||
* Process a request packet from the slave server.
|
||||
*
|
||||
* @param router The router instance this defines the master for this replication chain
|
||||
* @param client The client specific data
|
||||
* @param queue The incoming request packet
|
||||
* @return 1 on success, 0 on error or failure
|
||||
*/
|
||||
int
|
||||
avro_client_handle_request(Avro *router, AvroSession *client, GWBUF *queue)
|
||||
int AvroSession::routeQuery(GWBUF *queue)
|
||||
{
|
||||
int rval = 1;
|
||||
|
||||
switch (client->state)
|
||||
switch (state)
|
||||
{
|
||||
case AVRO_CLIENT_ERRORED:
|
||||
/* force disconnection */
|
||||
return 0;
|
||||
break;
|
||||
case AVRO_CLIENT_UNREGISTERED:
|
||||
if (avro_client_do_registration(router, client, queue) == 0)
|
||||
if (do_registration(queue) == 0)
|
||||
{
|
||||
client->state = AVRO_CLIENT_ERRORED;
|
||||
dcb_printf(client->dcb, "ERR, code 12, msg: Registration failed\n");
|
||||
state = AVRO_CLIENT_ERRORED;
|
||||
dcb_printf(dcb, "ERR, code 12, msg: Registration failed\n");
|
||||
/* force disconnection */
|
||||
dcb_close(client->dcb);
|
||||
dcb_close(dcb);
|
||||
rval = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Send OK ack to client */
|
||||
dcb_printf(client->dcb, "OK\n");
|
||||
dcb_printf(dcb, "OK\n");
|
||||
|
||||
client->state = AVRO_CLIENT_REGISTERED;
|
||||
state = AVRO_CLIENT_REGISTERED;
|
||||
MXS_INFO("%s: Client [%s] has completed REGISTRATION action",
|
||||
client->dcb->service->name,
|
||||
client->dcb->remote != NULL ? client->dcb->remote : "");
|
||||
dcb->service->name,
|
||||
dcb->remote != NULL ? dcb->remote : "");
|
||||
}
|
||||
break;
|
||||
case AVRO_CLIENT_REGISTERED:
|
||||
case AVRO_CLIENT_REQUEST_DATA:
|
||||
if (client->state == AVRO_CLIENT_REGISTERED)
|
||||
if (state == AVRO_CLIENT_REGISTERED)
|
||||
{
|
||||
client->state = AVRO_CLIENT_REQUEST_DATA;
|
||||
state = AVRO_CLIENT_REQUEST_DATA;
|
||||
}
|
||||
|
||||
/* Process command from client */
|
||||
avro_client_process_command(router, client, queue);
|
||||
process_command(queue);
|
||||
|
||||
break;
|
||||
default:
|
||||
client->state = AVRO_CLIENT_ERRORED;
|
||||
state = AVRO_CLIENT_ERRORED;
|
||||
rval = 0;
|
||||
break;
|
||||
}
|
||||
@ -109,15 +101,13 @@ avro_client_handle_request(Avro *router, AvroSession *client, GWBUF *queue)
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the REGISTRATION command
|
||||
* Handle client registration
|
||||
*
|
||||
* @param dcb DCB with allocateid protocol
|
||||
* @param data GWBUF with registration message
|
||||
* @return 1 for successful registration 0 otherwise
|
||||
* @param data Buffer with registration message
|
||||
*
|
||||
* @return 1 on successful registration, 0 on error
|
||||
*/
|
||||
static int
|
||||
avro_client_do_registration(Avro *router, AvroSession *client, GWBUF *data)
|
||||
int AvroSession::do_registration(GWBUF *data)
|
||||
{
|
||||
const char reg_uuid[] = "REGISTER UUID=";
|
||||
const int reg_uuid_len = strlen(reg_uuid);
|
||||
@ -130,31 +120,31 @@ avro_client_do_registration(Avro *router, AvroSession *client, GWBUF *data)
|
||||
char *sep_ptr;
|
||||
int uuid_len = (data_len > CDC_UUID_LEN) ? CDC_UUID_LEN : data_len;
|
||||
/* 36 +1 */
|
||||
char uuid[uuid_len + 1];
|
||||
memcpy(uuid, request + reg_uuid_len, uuid_len);
|
||||
uuid[uuid_len] = '\0';
|
||||
char client_uuid[uuid_len + 1];
|
||||
memcpy(client_uuid, request + reg_uuid_len, uuid_len);
|
||||
client_uuid[uuid_len] = '\0';
|
||||
|
||||
if ((sep_ptr = strchr(uuid, ',')) != NULL)
|
||||
if ((sep_ptr = strchr(client_uuid, ',')) != NULL)
|
||||
{
|
||||
*sep_ptr = '\0';
|
||||
}
|
||||
if ((sep_ptr = strchr(uuid + strlen(uuid), ' ')) != NULL)
|
||||
if ((sep_ptr = strchr(client_uuid + strlen(client_uuid), ' ')) != NULL)
|
||||
{
|
||||
*sep_ptr = '\0';
|
||||
}
|
||||
if ((sep_ptr = strchr(uuid, ' ')) != NULL)
|
||||
if ((sep_ptr = strchr(client_uuid, ' ')) != NULL)
|
||||
{
|
||||
*sep_ptr = '\0';
|
||||
}
|
||||
|
||||
if (strlen(uuid) < static_cast<size_t>(uuid_len))
|
||||
if (strlen(client_uuid) < static_cast<size_t>(uuid_len))
|
||||
{
|
||||
data_len -= (uuid_len - strlen(uuid));
|
||||
data_len -= (uuid_len - strlen(client_uuid));
|
||||
}
|
||||
|
||||
uuid_len = strlen(uuid);
|
||||
uuid_len = strlen(client_uuid);
|
||||
|
||||
client->uuid = MXS_STRDUP_A(uuid);
|
||||
uuid = client_uuid;
|
||||
|
||||
if (data_len > 0)
|
||||
{
|
||||
@ -165,14 +155,14 @@ avro_client_do_registration(Avro *router, AvroSession *client, GWBUF *data)
|
||||
if (memcmp(tmp_ptr + 5, "AVRO", 4) == 0)
|
||||
{
|
||||
ret = 1;
|
||||
client->state = AVRO_CLIENT_REGISTERED;
|
||||
client->format = AVRO_FORMAT_AVRO;
|
||||
state = AVRO_CLIENT_REGISTERED;
|
||||
format = AVRO_FORMAT_AVRO;
|
||||
}
|
||||
else if (memcmp(tmp_ptr + 5, "JSON", 4) == 0)
|
||||
{
|
||||
ret = 1;
|
||||
client->state = AVRO_CLIENT_REGISTERED;
|
||||
client->format = AVRO_FORMAT_JSON;
|
||||
state = AVRO_CLIENT_REGISTERED;
|
||||
format = AVRO_FORMAT_JSON;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -382,7 +372,7 @@ void add_timestamp(sqlite3 *handle, json_t* obj, gtid_pos_t* gtid)
|
||||
* @param router Router instance
|
||||
* @param dcb Client DCB
|
||||
*/
|
||||
void send_gtid_info(Avro *router, gtid_pos_t *gtid_pos, DCB *dcb)
|
||||
void AvroSession::send_gtid_info(gtid_pos_t* gtid_pos)
|
||||
{
|
||||
json_t *obj = json_object();
|
||||
|
||||
@ -426,13 +416,10 @@ bool file_in_dir(const char *dir, const char *file)
|
||||
/**
|
||||
* Process command from client
|
||||
*
|
||||
* @param router The router instance
|
||||
* @param client The specific client data
|
||||
* @param data GWBUF with command
|
||||
* @param data Buffer containing the command
|
||||
*
|
||||
*/
|
||||
static void
|
||||
avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue)
|
||||
void AvroSession::process_command(GWBUF *queue)
|
||||
{
|
||||
const char req_data[] = "REQUEST-DATA";
|
||||
const char req_last_gtid[] = "QUERY-LAST-TRANSACTION";
|
||||
@ -451,37 +438,39 @@ avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue)
|
||||
|
||||
if (data_len > 1)
|
||||
{
|
||||
const char *gtid_ptr = get_avrofile_name(file_ptr, data_len, client->avro_binfile);
|
||||
auto file_and_gtid = get_avrofile_and_gtid(file_ptr);
|
||||
|
||||
if (gtid_ptr)
|
||||
if (!file_and_gtid.second.empty())
|
||||
{
|
||||
client->requested_gtid = true;
|
||||
extract_gtid_request(&client->gtid, gtid_ptr, data_len - (gtid_ptr - file_ptr));
|
||||
memcpy(&client->gtid_start, &client->gtid, sizeof(client->gtid_start));
|
||||
requested_gtid = true;
|
||||
extract_gtid_request(>id, file_and_gtid.second.c_str(), file_and_gtid.second.size());
|
||||
memcpy(>id_start, >id, sizeof(gtid_start));
|
||||
}
|
||||
|
||||
if (file_in_dir(router->avrodir.c_str(), client->avro_binfile))
|
||||
avro_binfile = file_and_gtid.first;
|
||||
|
||||
if (file_in_dir(router->avrodir.c_str(), avro_binfile.c_str()))
|
||||
{
|
||||
/* set callback routine for data sending */
|
||||
dcb_add_callback(client->dcb, DCB_REASON_DRAINED, avro_client_callback, client);
|
||||
dcb_add_callback(dcb, DCB_REASON_DRAINED, avro_client_callback, this);
|
||||
|
||||
/* Add fake event that will call the avro_client_callback() routine */
|
||||
poll_fake_write_event(client->dcb);
|
||||
poll_fake_write_event(dcb);
|
||||
}
|
||||
else
|
||||
{
|
||||
dcb_printf(client->dcb, "ERR NO-FILE File '%s' not found.", client->avro_binfile);
|
||||
dcb_printf(dcb, "ERR NO-FILE File '%s' not found.", avro_binfile.c_str());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
dcb_printf(client->dcb, "ERR REQUEST-DATA with no data");
|
||||
dcb_printf(dcb, "ERR REQUEST-DATA with no data");
|
||||
}
|
||||
}
|
||||
/* Return last GTID info */
|
||||
else if (strstr((char *)data, req_last_gtid))
|
||||
{
|
||||
send_gtid_info(router, &router->gtid, client->dcb);
|
||||
send_gtid_info(&router->gtid);
|
||||
}
|
||||
/** Return requested GTID */
|
||||
else if (strstr((char *)data, req_gtid))
|
||||
@ -489,14 +478,14 @@ avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue)
|
||||
gtid_pos_t gtid;
|
||||
extract_gtid_request(>id, (char*)data + sizeof(req_gtid),
|
||||
GWBUF_LENGTH(queue) - sizeof(req_gtid));
|
||||
send_gtid_info(router, >id, client->dcb);
|
||||
send_gtid_info(>id);
|
||||
}
|
||||
else
|
||||
{
|
||||
GWBUF *reply = gwbuf_alloc(5);
|
||||
memcpy(GWBUF_DATA(reply), "ECHO:", 5);
|
||||
reply = gwbuf_append(reply, gwbuf_clone(queue));
|
||||
client->dcb->func.write(client->dcb, reply);
|
||||
dcb->func.write(dcb, reply);
|
||||
}
|
||||
}
|
||||
|
||||
@ -508,41 +497,41 @@ avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue)
|
||||
* @param dest Destination where the file name is stored. Must be at least
|
||||
* @p data_len + 1 bytes.
|
||||
*/
|
||||
const char* get_avrofile_name(const char *file_ptr, int data_len, char *dest)
|
||||
std::pair<std::string, std::string> get_avrofile_and_gtid(std::string file)
|
||||
{
|
||||
while (isspace(*file_ptr))
|
||||
mxs::ltrim(file);
|
||||
auto pos = file.find_first_of(' ');
|
||||
std::string filename;
|
||||
std::string gtid;
|
||||
|
||||
if (pos != std::string::npos)
|
||||
{
|
||||
file_ptr++;
|
||||
data_len--;
|
||||
// Client requests a specific GTID
|
||||
filename = file.substr(0, pos);
|
||||
gtid = file.substr(pos + 1);
|
||||
}
|
||||
|
||||
char avro_file[data_len + 1];
|
||||
memcpy(avro_file, file_ptr, data_len);
|
||||
avro_file[data_len] = '\0';
|
||||
|
||||
char *cmd_sep = strchr(avro_file, ' ');
|
||||
const char *rval = NULL;
|
||||
|
||||
if (cmd_sep)
|
||||
{
|
||||
*cmd_sep++ = '\0';
|
||||
rval = file_ptr + (cmd_sep - avro_file);
|
||||
ss_dassert(rval < file_ptr + data_len);
|
||||
}
|
||||
|
||||
/** Exact file version specified */
|
||||
if ((cmd_sep = strchr(avro_file, '.')) && (cmd_sep = strchr(cmd_sep + 1, '.')) &&
|
||||
strlen(cmd_sep + 1) > 0)
|
||||
{
|
||||
snprintf(dest, AVRO_MAX_FILENAME_LEN, "%s.avro", avro_file);
|
||||
}
|
||||
/** No version specified, send all files */
|
||||
else
|
||||
{
|
||||
snprintf(dest, AVRO_MAX_FILENAME_LEN, "%s.000001.avro", avro_file);
|
||||
filename = file;
|
||||
}
|
||||
|
||||
return rval;
|
||||
auto first_dot = filename.find_first_of('.');
|
||||
auto last_dot = filename.find_last_of('.');
|
||||
|
||||
if (first_dot != std::string::npos &&
|
||||
last_dot != std::string::npos &&
|
||||
first_dot != last_dot)
|
||||
{
|
||||
// Exact file version specified e.g. test.t1.000002
|
||||
filename += ".avro";
|
||||
}
|
||||
else
|
||||
{
|
||||
// No version specified, send first file
|
||||
filename += ".000001.avro";
|
||||
}
|
||||
|
||||
return std::make_pair(filename, gtid);
|
||||
}
|
||||
|
||||
static int send_row(DCB *dcb, json_t* row)
|
||||
@ -568,19 +557,19 @@ static int send_row(DCB *dcb, json_t* row)
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void set_current_gtid(AvroSession *client, json_t *row)
|
||||
void AvroSession::set_current_gtid(json_t *row)
|
||||
{
|
||||
json_t *obj = json_object_get(row, avro_sequence);
|
||||
ss_dassert(json_is_integer(obj));
|
||||
client->gtid.seq = json_integer_value(obj);
|
||||
gtid.seq = json_integer_value(obj);
|
||||
|
||||
obj = json_object_get(row, avro_server_id);
|
||||
ss_dassert(json_is_integer(obj));
|
||||
client->gtid.server_id = json_integer_value(obj);
|
||||
gtid.server_id = json_integer_value(obj);
|
||||
|
||||
obj = json_object_get(row, avro_domain);
|
||||
ss_dassert(json_is_integer(obj));
|
||||
client->gtid.domain = json_integer_value(obj);
|
||||
gtid.domain = json_integer_value(obj);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -590,25 +579,23 @@ static void set_current_gtid(AvroSession *client, json_t *row)
|
||||
* @param dcb DCB to stream to
|
||||
* @return True if more data is readable, false if all data was sent
|
||||
*/
|
||||
static bool stream_json(AvroSession *client)
|
||||
bool AvroSession::stream_json()
|
||||
{
|
||||
int bytes = 0;
|
||||
MAXAVRO_FILE *file = client->file_handle;
|
||||
DCB *dcb = client->dcb;
|
||||
|
||||
do
|
||||
{
|
||||
json_t *row;
|
||||
int rc = 1;
|
||||
while (rc > 0 && (row = maxavro_record_read_json(file)))
|
||||
while (rc > 0 && (row = maxavro_record_read_json(file_handle)))
|
||||
{
|
||||
rc = send_row(dcb, row);
|
||||
set_current_gtid(client, row);
|
||||
set_current_gtid(row);
|
||||
json_decref(row);
|
||||
}
|
||||
bytes += file->buffer_size;
|
||||
bytes += file_handle->buffer_size;
|
||||
}
|
||||
while (maxavro_next_block(file) && bytes < AVRO_DATA_BURST_SIZE);
|
||||
while (maxavro_next_block(file_handle) && bytes < AVRO_DATA_BURST_SIZE);
|
||||
|
||||
return bytes >= AVRO_DATA_BURST_SIZE;
|
||||
}
|
||||
@ -620,18 +607,16 @@ static bool stream_json(AvroSession *client)
|
||||
* @param dcb DCB to stream to
|
||||
* @return True if streaming was successful, false if an error occurred
|
||||
*/
|
||||
static bool stream_binary(AvroSession *client)
|
||||
bool AvroSession::stream_binary()
|
||||
{
|
||||
GWBUF *buffer;
|
||||
uint64_t bytes = 0;
|
||||
int rc = 1;
|
||||
MAXAVRO_FILE *file = client->file_handle;
|
||||
DCB *dcb = client->dcb;
|
||||
|
||||
while (rc > 0 && bytes < AVRO_DATA_BURST_SIZE)
|
||||
{
|
||||
bytes += file->buffer_size;
|
||||
if ((buffer = maxavro_record_read_binary(file)))
|
||||
bytes += file_handle->buffer_size;
|
||||
if ((buffer = maxavro_record_read_binary(file_handle)))
|
||||
{
|
||||
rc = dcb->func.write(dcb, buffer);
|
||||
}
|
||||
@ -660,24 +645,23 @@ static int sqlite_cb(void* data, int rows, char** values, char** names)
|
||||
static const char select_template[] = "SELECT max(position) FROM gtid WHERE domain=%lu "
|
||||
"AND server_id=%lu AND sequence <= %lu AND avrofile=\"%s\";";
|
||||
|
||||
static bool seek_to_index_pos(AvroSession *client, MAXAVRO_FILE* file)
|
||||
bool AvroSession::seek_to_index_pos()
|
||||
{
|
||||
char *name = strrchr(client->file_handle->filename, '/');
|
||||
char *name = strrchr(file_handle->filename, '/');
|
||||
ss_dassert(name);
|
||||
name++;
|
||||
|
||||
char sql[sizeof(select_template) + NAME_MAX + 80];
|
||||
snprintf(sql, sizeof(sql), select_template, client->gtid.domain,
|
||||
client->gtid.server_id, client->gtid.seq, name);
|
||||
snprintf(sql, sizeof(sql), select_template, gtid.domain, gtid.server_id, gtid.seq, name);
|
||||
|
||||
long offset = -1;
|
||||
char *errmsg = NULL;
|
||||
bool rval = false;
|
||||
|
||||
if (sqlite3_exec(client->sqlite_handle, sql, sqlite_cb, &offset, &errmsg) == SQLITE_OK)
|
||||
if (sqlite3_exec(sqlite_handle, sql, sqlite_cb, &offset, &errmsg) == SQLITE_OK)
|
||||
{
|
||||
rval = true;
|
||||
if (offset > 0 && !maxavro_record_set_pos(file, offset))
|
||||
if (offset > 0 && !maxavro_record_set_pos(file_handle, offset))
|
||||
{
|
||||
rval = false;
|
||||
}
|
||||
@ -685,48 +669,42 @@ static bool seek_to_index_pos(AvroSession *client, MAXAVRO_FILE* file)
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to query index position for GTID %lu-%lu-%lu: %s",
|
||||
client->gtid.domain, client->gtid.server_id, client->gtid.seq, errmsg);
|
||||
gtid.domain, gtid.server_id, gtid.seq, errmsg);
|
||||
}
|
||||
sqlite3_free(errmsg);
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param client
|
||||
* @param file
|
||||
*/
|
||||
static bool seek_to_gtid(AvroSession *client, MAXAVRO_FILE* file)
|
||||
bool AvroSession::seek_to_gtid()
|
||||
{
|
||||
bool seeking = true;
|
||||
|
||||
do
|
||||
{
|
||||
json_t *row;
|
||||
while ((row = maxavro_record_read_json(file)))
|
||||
while ((row = maxavro_record_read_json(file_handle)))
|
||||
{
|
||||
json_t *obj = json_object_get(row, avro_sequence);
|
||||
ss_dassert(json_is_integer(obj));
|
||||
uint64_t value = json_integer_value(obj);
|
||||
|
||||
/** If a larger GTID is found, use that */
|
||||
if (value >= client->gtid.seq)
|
||||
if (value >= gtid.seq)
|
||||
{
|
||||
obj = json_object_get(row, avro_server_id);
|
||||
ss_dassert(json_is_integer(obj));
|
||||
value = json_integer_value(obj);
|
||||
|
||||
if (value == client->gtid.server_id)
|
||||
if (value == gtid.server_id)
|
||||
{
|
||||
obj = json_object_get(row, avro_domain);
|
||||
ss_dassert(json_is_integer(obj));
|
||||
value = json_integer_value(obj);
|
||||
|
||||
if (value == client->gtid.domain)
|
||||
if (value == gtid.domain)
|
||||
{
|
||||
MXS_INFO("Found GTID %lu-%lu-%lu for %s@%s",
|
||||
client->gtid.domain, client->gtid.server_id,
|
||||
client->gtid.seq, client->dcb->user, client->dcb->remote);
|
||||
MXS_INFO("Found GTID %lu-%lu-%lu for %s@%s", gtid.domain,
|
||||
gtid.server_id, gtid.seq, dcb->user, dcb->remote);
|
||||
seeking = false;
|
||||
}
|
||||
}
|
||||
@ -736,13 +714,13 @@ static bool seek_to_gtid(AvroSession *client, MAXAVRO_FILE* file)
|
||||
* read the row into memory */
|
||||
if (!seeking)
|
||||
{
|
||||
send_row(client->dcb, row);
|
||||
send_row(dcb, row);
|
||||
}
|
||||
|
||||
json_decref(row);
|
||||
}
|
||||
}
|
||||
while (seeking && maxavro_next_block(file));
|
||||
while (seeking && maxavro_next_block(file_handle));
|
||||
|
||||
return !seeking;
|
||||
}
|
||||
@ -750,122 +728,94 @@ static bool seek_to_gtid(AvroSession *client, MAXAVRO_FILE* file)
|
||||
/**
|
||||
* Print JSON output from selected AVRO file
|
||||
*
|
||||
* @param router The router instance
|
||||
* @param client The specific client data
|
||||
* @param avro_file The requested AVRO file
|
||||
* @return True if more data needs to be read
|
||||
*/
|
||||
static bool avro_client_stream_data(AvroSession *client)
|
||||
bool AvroSession::stream_data()
|
||||
{
|
||||
bool read_more = false;
|
||||
Avro *router = client->router;
|
||||
|
||||
if (strnlen(client->avro_binfile, 1))
|
||||
if (!avro_binfile.empty())
|
||||
{
|
||||
char filename[PATH_MAX + 1];
|
||||
snprintf(filename, PATH_MAX, "%s/%s", router->avrodir.c_str(), client->avro_binfile);
|
||||
|
||||
bool ok = true;
|
||||
std::string filename = router->avrodir + '/' + avro_binfile;
|
||||
|
||||
if (client->file_handle == NULL &&
|
||||
(client->file_handle = maxavro_file_open(filename)) == NULL)
|
||||
if (!file_handle && !(file_handle = maxavro_file_open(filename.c_str())))
|
||||
{
|
||||
ok = false;
|
||||
}
|
||||
|
||||
if (ok)
|
||||
{
|
||||
switch (client->format)
|
||||
switch (format)
|
||||
{
|
||||
case AVRO_FORMAT_JSON:
|
||||
/** Currently only JSON format supports seeking to a GTID */
|
||||
if (client->requested_gtid &&
|
||||
seek_to_index_pos(client, client->file_handle) &&
|
||||
seek_to_gtid(client, client->file_handle))
|
||||
if (requested_gtid && seek_to_index_pos() && seek_to_gtid())
|
||||
{
|
||||
client->requested_gtid = false;
|
||||
requested_gtid = false;
|
||||
}
|
||||
|
||||
read_more = stream_json(client);
|
||||
read_more = stream_json();
|
||||
break;
|
||||
|
||||
case AVRO_FORMAT_AVRO:
|
||||
read_more = stream_binary(client);
|
||||
read_more = stream_binary();
|
||||
break;
|
||||
|
||||
default:
|
||||
MXS_ERROR("Unexpected format: %d", client->format);
|
||||
MXS_ERROR("Unexpected format: %d", format);
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
if (maxavro_get_error(client->file_handle) != MAXAVRO_ERR_NONE)
|
||||
if (maxavro_get_error(file_handle) != MAXAVRO_ERR_NONE)
|
||||
{
|
||||
MXS_ERROR("Reading Avro file failed with error '%s'.",
|
||||
maxavro_get_error_string(client->file_handle));
|
||||
maxavro_get_error_string(file_handle));
|
||||
}
|
||||
|
||||
client->last_sent_pos = client->file_handle->records_read;
|
||||
last_sent_pos = file_handle->records_read;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "No file specified\n");
|
||||
dcb_printf(client->dcb, "ERR avro file not specified");
|
||||
dcb_printf(dcb, "ERR avro file not specified");
|
||||
}
|
||||
|
||||
return read_more;
|
||||
}
|
||||
|
||||
GWBUF* read_avro_json_schema(const char *avrofile, const char* dir)
|
||||
GWBUF* read_avro_json_schema(std::string avrofile, std::string dir)
|
||||
{
|
||||
GWBUF* rval = NULL;
|
||||
const char *suffix = strrchr(avrofile, '.');
|
||||
|
||||
if (suffix)
|
||||
// Copy the name and swap the suffix from .avro to .avsc
|
||||
std::string schemafile = dir + "/" + avrofile.substr(0, avrofile.length() - 2) + "sc";
|
||||
std::ifstream file(schemafile);
|
||||
|
||||
if (file.good())
|
||||
{
|
||||
char buffer[PATH_MAX + 1];
|
||||
snprintf(buffer, sizeof(buffer), "%s/%.*s.avsc", dir, (int)(suffix - avrofile),
|
||||
avrofile);
|
||||
FILE *file = fopen(buffer, "rb");
|
||||
|
||||
if (file)
|
||||
{
|
||||
int nread;
|
||||
while ((nread = fread(buffer, 1, sizeof(buffer) - 1, file)) > 0)
|
||||
{
|
||||
while (isspace(buffer[nread - 1]))
|
||||
{
|
||||
nread--;
|
||||
}
|
||||
|
||||
buffer[nread++] = '\n';
|
||||
|
||||
GWBUF * newbuf = gwbuf_alloc_and_load(nread, buffer);
|
||||
|
||||
if (newbuf)
|
||||
{
|
||||
rval = gwbuf_append(rval, newbuf);
|
||||
}
|
||||
}
|
||||
|
||||
fclose(file);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to open file '%s': %d, %s", buffer, errno,
|
||||
mxs_strerror(errno));
|
||||
}
|
||||
std::stringstream ss;
|
||||
ss << file.rdbuf();
|
||||
std::string text = ss.str();
|
||||
mxs::Buffer buffer(std::vector<uint8_t>(text.begin(), text.end()));
|
||||
rval = buffer.release();
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to open file '%s': %d, %s", schemafile.c_str(), errno,
|
||||
mxs_strerror(errno));
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir)
|
||||
GWBUF* read_avro_binary_schema(std::string avrofile, std::string dir)
|
||||
{
|
||||
GWBUF* rval = NULL;
|
||||
char buffer[PATH_MAX + 1];
|
||||
snprintf(buffer, sizeof(buffer), "%s/%s", dir, avrofile);
|
||||
MAXAVRO_FILE *file = maxavro_file_open(buffer);
|
||||
std::string filename = dir + '/' + avrofile;
|
||||
MAXAVRO_FILE *file = maxavro_file_open(filename.c_str());
|
||||
|
||||
if (file)
|
||||
{
|
||||
@ -874,7 +824,7 @@ GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir)
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to open file '%s'.", buffer);
|
||||
MXS_ERROR("Failed to open file '%s'.", filename.c_str());
|
||||
}
|
||||
|
||||
return rval;
|
||||
@ -885,34 +835,23 @@ GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir)
|
||||
* @param client Avro client session
|
||||
* @param fullname Absolute path to the file to rotate to
|
||||
*/
|
||||
static void rotate_avro_file(AvroSession *client, char *fullname)
|
||||
void AvroSession::rotate_avro_file(std::string fullname)
|
||||
{
|
||||
char *filename = strrchr(fullname, '/') + 1;
|
||||
size_t len = strlen(filename);
|
||||
if (len > AVRO_MAX_FILENAME_LEN)
|
||||
{
|
||||
// TODO: This function is in need of a return value. It would
|
||||
// TODO: be better to abort if the name is too long and also
|
||||
// TODO: if the opening of the file fails.
|
||||
MXS_ERROR("Filename %s of length %lu is longer than maximum allowed "
|
||||
"length %d. Trailing data will be cut.",
|
||||
filename, len, AVRO_MAX_FILENAME_LEN);
|
||||
len = AVRO_MAX_FILENAME_LEN;
|
||||
}
|
||||
strncpy(client->avro_binfile, filename, len);
|
||||
client->avro_binfile[len] = 0;
|
||||
client->last_sent_pos = 0;
|
||||
auto pos = fullname.find_last_of('/');
|
||||
ss_dassert(pos != std::string::npos);
|
||||
avro_binfile = fullname.substr(pos + 1);
|
||||
last_sent_pos = 0;
|
||||
|
||||
maxavro_file_close(client->file_handle);
|
||||
maxavro_file_close(file_handle);
|
||||
|
||||
if ((client->file_handle = maxavro_file_open(fullname)) == NULL)
|
||||
if ((file_handle = maxavro_file_open(fullname.c_str())) == NULL)
|
||||
{
|
||||
MXS_ERROR("Failed to open file: %s", filename);
|
||||
MXS_ERROR("Failed to open file: %s", fullname.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_INFO("Rotated '%s'@'%s' to file: %s", client->dcb->user,
|
||||
client->dcb->remote, fullname);
|
||||
MXS_INFO("Rotated '%s'@'%s' to file: %s", dcb->user,
|
||||
dcb->remote, fullname.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
@ -923,25 +862,25 @@ static void rotate_avro_file(AvroSession *client, char *fullname)
|
||||
* @param dest Destination where the full path to the file is printed
|
||||
* @param len Size of @p dest
|
||||
*/
|
||||
static void print_next_filename(const char *file, const char *dir, char *dest, size_t len)
|
||||
static std::string get_next_filename(std::string file, std::string dir)
|
||||
{
|
||||
char buffer[strlen(file) + 1];
|
||||
strcpy(buffer, file);
|
||||
char *ptr = strrchr(buffer, '.');
|
||||
// Find the last and second to last dot
|
||||
auto last = file.find_last_of('.');
|
||||
auto almost_last = file.find_last_of('.', last);
|
||||
ss_dassert(last != std::string::npos && almost_last != std::string::npos);
|
||||
|
||||
if (ptr)
|
||||
{
|
||||
ptr--;
|
||||
while (ptr > buffer && *(ptr) != '.')
|
||||
{
|
||||
ptr--;
|
||||
}
|
||||
// Extract the number between the dots
|
||||
std::string number_part = file.substr(almost_last + 1, last);
|
||||
int filenum = strtol(number_part.c_str(), NULL, 10);
|
||||
|
||||
int filenum = strtol(ptr + 1, NULL, 10);
|
||||
*ptr = '\0';
|
||||
snprintf(dest, len, "%s/%s.%06d.avro",
|
||||
dir, buffer, filenum + 1);
|
||||
}
|
||||
std::string file_part = file.substr(0, almost_last);
|
||||
|
||||
// Print it out the new filename with the file number incremented by one
|
||||
char outbuf[PATH_MAX + 1];
|
||||
snprintf(outbuf, sizeof(outbuf), "%s/%s.%06d.avro",
|
||||
dir.c_str(), file_part.c_str(), filenum + 1);
|
||||
|
||||
return std::string(outbuf);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -956,89 +895,59 @@ int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata)
|
||||
{
|
||||
if (reason == DCB_REASON_DRAINED)
|
||||
{
|
||||
AvroSession *client = (AvroSession*)userdata;
|
||||
|
||||
spinlock_acquire(&client->catch_lock);
|
||||
if (client->cstate & AVRO_CS_BUSY)
|
||||
{
|
||||
spinlock_release(&client->catch_lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
client->cstate |= AVRO_CS_BUSY;
|
||||
spinlock_release(&client->catch_lock);
|
||||
|
||||
if (client->last_sent_pos == 0)
|
||||
{
|
||||
/** Send the schema of the current file */
|
||||
GWBUF *schema = NULL;
|
||||
|
||||
switch (client->format)
|
||||
{
|
||||
case AVRO_FORMAT_JSON:
|
||||
schema = read_avro_json_schema(client->avro_binfile, client->router->avrodir.c_str());
|
||||
break;
|
||||
|
||||
case AVRO_FORMAT_AVRO:
|
||||
schema = read_avro_binary_schema(client->avro_binfile, client->router->avrodir.c_str());
|
||||
break;
|
||||
|
||||
default:
|
||||
MXS_ERROR("Unknown client format: %d", client->format);
|
||||
}
|
||||
|
||||
if (schema)
|
||||
{
|
||||
client->dcb->func.write(client->dcb, schema);
|
||||
}
|
||||
}
|
||||
|
||||
/** Stream the data to the client */
|
||||
bool read_more = avro_client_stream_data(client);
|
||||
|
||||
char filename[PATH_MAX + 1];
|
||||
print_next_filename(client->avro_binfile, client->router->avrodir.c_str(),
|
||||
filename, sizeof(filename));
|
||||
|
||||
bool next_file;
|
||||
/** If the next file is available, send it to the client */
|
||||
if ((next_file = (access(filename, R_OK) == 0)))
|
||||
{
|
||||
rotate_avro_file(client, filename);
|
||||
}
|
||||
|
||||
spinlock_acquire(&client->catch_lock);
|
||||
client->cstate &= ~AVRO_CS_BUSY;
|
||||
client->cstate |= AVRO_WAIT_DATA;
|
||||
|
||||
if (next_file || read_more)
|
||||
{
|
||||
#ifdef SS_DEBUG
|
||||
if (read_more)
|
||||
{
|
||||
MXS_DEBUG("Burst limit hit, need to read more data.");
|
||||
}
|
||||
#endif
|
||||
avro_notify_client(client);
|
||||
}
|
||||
spinlock_release(&client->catch_lock);
|
||||
AvroSession *client = static_cast<AvroSession*>(userdata);
|
||||
client->client_callback();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Notify a client that new data is available
|
||||
*
|
||||
* The client catch_lock must be held when calling this function.
|
||||
*
|
||||
* @param client Client to notify
|
||||
*/
|
||||
void avro_notify_client(AvroSession *client)
|
||||
void AvroSession::client_callback()
|
||||
{
|
||||
/* Add fake event that will call the avro_client_callback() routine */
|
||||
poll_fake_write_event(client->dcb);
|
||||
client->cstate &= ~AVRO_WAIT_DATA;
|
||||
if (last_sent_pos == 0)
|
||||
{
|
||||
// TODO: Don't use DCB callbacks to stream the data
|
||||
last_sent_pos = 1;
|
||||
|
||||
/** Send the schema of the current file */
|
||||
GWBUF *schema = NULL;
|
||||
|
||||
switch (format)
|
||||
{
|
||||
case AVRO_FORMAT_JSON:
|
||||
schema = read_avro_json_schema(avro_binfile, router->avrodir);
|
||||
break;
|
||||
|
||||
case AVRO_FORMAT_AVRO:
|
||||
schema = read_avro_binary_schema(avro_binfile, router->avrodir);
|
||||
break;
|
||||
|
||||
default:
|
||||
MXS_ERROR("Unknown client format: %d", format);
|
||||
break;
|
||||
}
|
||||
|
||||
if (schema)
|
||||
{
|
||||
dcb->func.write(dcb, schema);
|
||||
}
|
||||
}
|
||||
|
||||
/** Stream the data to the client */
|
||||
bool read_more = stream_data();
|
||||
std::string filename = get_next_filename(avro_binfile, router->avrodir);
|
||||
|
||||
bool next_file;
|
||||
/** If the next file is available, send it to the client */
|
||||
if ((next_file = (access(filename.c_str(), R_OK) == 0)))
|
||||
{
|
||||
rotate_avro_file(filename);
|
||||
}
|
||||
|
||||
if (next_file || read_more)
|
||||
{
|
||||
poll_fake_write_event(dcb);
|
||||
}
|
||||
}
|
||||
|
||||
// static
|
||||
@ -1072,15 +981,12 @@ AvroSession::AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle):
|
||||
dcb(session->client_dcb),
|
||||
state(AVRO_CLIENT_UNREGISTERED),
|
||||
format(AVRO_FORMAT_UNDEFINED),
|
||||
uuid(NULL),
|
||||
catch_lock(SPINLOCK_INIT),
|
||||
router(instance),
|
||||
file_handle(NULL),
|
||||
last_sent_pos(0),
|
||||
connect_time(time(NULL)),
|
||||
avro_binfile{0},
|
||||
requested_gtid(false),
|
||||
cstate(0),
|
||||
sqlite_handle(handle)
|
||||
{
|
||||
}
|
||||
@ -1090,7 +996,6 @@ AvroSession::~AvroSession()
|
||||
ss_debug(int prev_val = )atomic_add(&router->stats.n_clients, -1);
|
||||
ss_dassert(prev_val > 0);
|
||||
|
||||
free(uuid);
|
||||
maxavro_file_close(file_handle);
|
||||
sqlite3_close_v2(sqlite_handle);
|
||||
}
|
||||
|
@ -432,8 +432,7 @@ bool notify_cb(DCB* dcb, void* data)
|
||||
|
||||
if (dcb->service == service && dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
|
||||
{
|
||||
AvroSession* ses = reinterpret_cast<AvroSession*>(dcb->session->router_session);
|
||||
avro_notify_client(ses);
|
||||
poll_fake_write_event(dcb);
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -307,21 +307,45 @@ public:
|
||||
DCB* dcb; /*< The client DCB */
|
||||
int state; /*< The state of this client */
|
||||
enum avro_data_format format; /*< Stream JSON or Avro data */
|
||||
char* uuid; /*< Client UUID */
|
||||
std::string uuid; /*< Client UUID */
|
||||
SPINLOCK catch_lock; /*< Event catchup lock */
|
||||
Avro* router; /*< Pointer to the owning router */
|
||||
MAXAVRO_FILE* file_handle; /*< Current open file handle */
|
||||
uint64_t last_sent_pos; /*< The last record we sent */
|
||||
time_t connect_time; /*< Connect time of slave */
|
||||
char avro_binfile[AVRO_MAX_FILENAME_LEN + 1];
|
||||
std::string avro_binfile;
|
||||
bool requested_gtid; /*< If the client requested */
|
||||
gtid_pos_t gtid; /*< Current/requested GTID */
|
||||
gtid_pos_t gtid_start; /*< First sent GTID */
|
||||
unsigned int cstate; /*< Catch up state */
|
||||
sqlite3* sqlite_handle;
|
||||
|
||||
/**
|
||||
* Process a client request
|
||||
*
|
||||
* @param Buffer The incoming request packet
|
||||
*
|
||||
* @return 1 on success, 0 on error
|
||||
*/
|
||||
int routeQuery(GWBUF* buffer);
|
||||
|
||||
/**
|
||||
* Handler for the EPOLLOUT event
|
||||
*/
|
||||
void client_callback();
|
||||
|
||||
private:
|
||||
AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle);
|
||||
|
||||
int do_registration(GWBUF *data);
|
||||
void process_command(GWBUF *queue);
|
||||
void send_gtid_info(gtid_pos_t *gtid_pos);
|
||||
void set_current_gtid(json_t *row);
|
||||
bool stream_json();
|
||||
bool stream_binary();
|
||||
bool seek_to_index_pos();
|
||||
bool seek_to_gtid();
|
||||
bool stream_data();
|
||||
void rotate_avro_file(std::string fullname);
|
||||
};
|
||||
|
||||
extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id,
|
||||
|
Loading…
x
Reference in New Issue
Block a user