Remove redundant or unused avrorouter code
The code in avrorouter that returned the current transaction was not very useful and it can be acquired via the REST API in a more convenient format. The number of created sessions is tracked on the service level so there is no need to track it in the avrorouter. Removed declarations for functions that do not exist and moved code around to reduce the scope.
This commit is contained in:
@ -37,17 +37,16 @@
|
||||
#include <maxscale/buffer.hh>
|
||||
#include <maxscale/utils.hh>
|
||||
|
||||
extern char *blr_extract_column(GWBUF *buf, int col);
|
||||
extern uint32_t extract_field(uint8_t *src, int bits);
|
||||
|
||||
/* AVRO */
|
||||
int avro_client_callback(DCB *dcb, DCB_REASON reason, void *data);
|
||||
void avro_notify_client(AvroSession *client);
|
||||
void poll_fake_write_event(DCB *dcb);
|
||||
GWBUF* read_avro_json_schema(const char *avrofile, const char* dir);
|
||||
GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir);
|
||||
std::pair<std::string, std::string> get_avrofile_and_gtid(std::string file);
|
||||
|
||||
enum
|
||||
{
|
||||
AVRO_CLIENT_UNREGISTERED,
|
||||
AVRO_CLIENT_REGISTERED,
|
||||
AVRO_CLIENT_REQUEST_DATA,
|
||||
AVRO_CLIENT_ERRORED,
|
||||
};
|
||||
|
||||
int AvroSession::routeQuery(GWBUF *queue)
|
||||
{
|
||||
int rval = 1;
|
||||
@ -220,183 +219,6 @@ void extract_gtid_request(gtid_pos_t *gtid, const char *start, int len)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback for GTID retrieval
|
||||
* @param data User data
|
||||
* @param ncolumns Number of columns
|
||||
* @param values Row data
|
||||
* @param names Field names
|
||||
* @return 0 on success
|
||||
*/
|
||||
int gtid_query_cb(void* data, int ncolumns, char** values, char** names)
|
||||
{
|
||||
json_t *arr = (json_t*)data;
|
||||
|
||||
if (values[0])
|
||||
{
|
||||
json_array_append_new(arr, json_string(values[0]));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback for GTID retrieval
|
||||
* @param data User data
|
||||
* @param ncolumns Number of columns
|
||||
* @param values Row data
|
||||
* @param names Field names
|
||||
* @return 0 on success
|
||||
*/
|
||||
int gtid_query_cb_plain(void* data, int ncolumns, char** values, char** names)
|
||||
{
|
||||
DCB *dcb = (DCB *)data;
|
||||
if (values[0])
|
||||
{
|
||||
dcb_printf(dcb, "%s ", values[0]);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the tables involved in the latest transaction to a JSON object
|
||||
*
|
||||
* @param handle SQLite3 handle
|
||||
* @param obj JSON object to add values to
|
||||
* @param gtid GTID of the last transaction
|
||||
*/
|
||||
void add_used_tables(sqlite3 *handle, json_t* obj, gtid_pos_t* gtid)
|
||||
{
|
||||
char sql[AVRO_SQL_BUFFER_SIZE];
|
||||
snprintf(sql, sizeof(sql), "SELECT table_name FROM " USED_TABLES_TABLE_NAME
|
||||
" WHERE domain = %lu AND server_id = %lu AND sequence = %lu",
|
||||
gtid->domain, gtid->server_id, gtid->seq);
|
||||
|
||||
char* errmsg;
|
||||
json_t *arr = json_array();
|
||||
|
||||
if (sqlite3_exec(handle, sql, gtid_query_cb, arr,
|
||||
&errmsg) != SQLITE_OK)
|
||||
{
|
||||
json_decref(arr);
|
||||
MXS_ERROR("Failed to execute query: %s", errmsg);
|
||||
}
|
||||
else
|
||||
{
|
||||
json_object_set_new(obj, "tables", arr);
|
||||
}
|
||||
sqlite3_free(errmsg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the tables involved in the latest transaction.
|
||||
*
|
||||
* Sqlite3 callback routine calls dcb_printf()
|
||||
*
|
||||
* @param router The AVRO router instance
|
||||
* @param dcb The dcb to write data
|
||||
*/
|
||||
void avro_get_used_tables(Avro *router, DCB* dcb)
|
||||
{
|
||||
sqlite3 *handle = router->sqlite_handle;
|
||||
char sql[AVRO_SQL_BUFFER_SIZE];
|
||||
snprintf(sql, sizeof(sql), "SELECT table_name FROM " USED_TABLES_TABLE_NAME
|
||||
" WHERE domain = %lu AND server_id = %lu AND sequence = %lu",
|
||||
router->gtid.domain, router->gtid.server_id, router->gtid.seq);
|
||||
|
||||
char* errmsg;
|
||||
|
||||
/* call dcb_printf via callback */
|
||||
if (sqlite3_exec(handle, sql, gtid_query_cb_plain, dcb,
|
||||
&errmsg) != SQLITE_OK)
|
||||
{
|
||||
MXS_ERROR("Failed to execute query: %s", errmsg);
|
||||
}
|
||||
sqlite3_free(errmsg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback for timestamp retrieval
|
||||
*
|
||||
* @param data User data
|
||||
* @param ncolumns Number of columns
|
||||
* @param values Row data
|
||||
* @param names Field names
|
||||
* @return 0 on success
|
||||
*/
|
||||
int timestamp_query_cb(void* data, int ncolumns, char** values, char** names)
|
||||
{
|
||||
long *val = (long*)data;
|
||||
|
||||
if (values[0])
|
||||
{
|
||||
*val = strtol(values[0], NULL, 10);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the GTID timestamp to a JSON object
|
||||
*
|
||||
* @param handle
|
||||
* @param obj
|
||||
* @param gtid
|
||||
*/
|
||||
void add_timestamp(sqlite3 *handle, json_t* obj, gtid_pos_t* gtid)
|
||||
{
|
||||
char sql[AVRO_SQL_BUFFER_SIZE];
|
||||
snprintf(sql, sizeof(sql), "SELECT DISTINCT binlog_timestamp FROM " USED_TABLES_TABLE_NAME
|
||||
" WHERE domain = %lu AND server_id = %lu AND sequence = %lu",
|
||||
gtid->domain, gtid->server_id, gtid->seq);
|
||||
|
||||
char* errmsg;
|
||||
long ts = 0;
|
||||
|
||||
if (sqlite3_exec(handle, sql, timestamp_query_cb, &ts,
|
||||
&errmsg) == SQLITE_OK)
|
||||
{
|
||||
json_object_set_new(obj, "timestamp", json_integer(ts));
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to execute query: %s", errmsg);
|
||||
}
|
||||
sqlite3_free(errmsg);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Send information about the current GTID being processed
|
||||
* @param router Router instance
|
||||
* @param dcb Client DCB
|
||||
*/
|
||||
void AvroSession::send_gtid_info(gtid_pos_t* gtid_pos)
|
||||
{
|
||||
json_t *obj = json_object();
|
||||
|
||||
if (obj)
|
||||
{
|
||||
char gtid[256];
|
||||
snprintf(gtid, sizeof(gtid), "%lu-%lu-%lu", gtid_pos->domain,
|
||||
gtid_pos->server_id, gtid_pos->seq);
|
||||
json_object_set_new(obj, "GTID", json_string(gtid));
|
||||
|
||||
// TODO: Store number of events in the database
|
||||
json_object_set_new(obj, "events", json_integer(gtid_pos->event_num));
|
||||
|
||||
add_timestamp(router->sqlite_handle, obj, gtid_pos);
|
||||
add_used_tables(router->sqlite_handle, obj, gtid_pos);
|
||||
|
||||
char *js = json_dumps(obj, 0);
|
||||
size_t size = strlen(js);
|
||||
GWBUF *buffer = gwbuf_alloc_and_load(size, js);
|
||||
MXS_FREE(js);
|
||||
dcb->func.write(dcb, buffer);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check if a file exists in a directory
|
||||
*
|
||||
@ -414,79 +236,22 @@ bool file_in_dir(const char *dir, const char *file)
|
||||
}
|
||||
|
||||
/**
|
||||
* Process command from client
|
||||
*
|
||||
* @param data Buffer containing the command
|
||||
* @brief The client callback for sending data
|
||||
*
|
||||
* @param dcb Client DCB
|
||||
* @param reason Why the callback was called
|
||||
* @param userdata Data provided when the callback was added
|
||||
* @return Always 0
|
||||
*/
|
||||
void AvroSession::process_command(GWBUF *queue)
|
||||
int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata)
|
||||
{
|
||||
const char req_data[] = "REQUEST-DATA";
|
||||
const char req_last_gtid[] = "QUERY-LAST-TRANSACTION";
|
||||
const char req_gtid[] = "QUERY-TRANSACTION";
|
||||
const size_t req_data_len = sizeof(req_data) - 1;
|
||||
size_t buflen = gwbuf_length(queue);
|
||||
uint8_t data[buflen + 1];
|
||||
gwbuf_copy_data(queue, 0, buflen, data);
|
||||
data[buflen] = '\0';
|
||||
char *command_ptr = strstr((char *)data, req_data);
|
||||
|
||||
if (command_ptr != NULL)
|
||||
if (reason == DCB_REASON_DRAINED)
|
||||
{
|
||||
char *file_ptr = command_ptr + req_data_len;
|
||||
int data_len = GWBUF_LENGTH(queue) - req_data_len;
|
||||
|
||||
if (data_len > 1)
|
||||
{
|
||||
auto file_and_gtid = get_avrofile_and_gtid(file_ptr);
|
||||
|
||||
if (!file_and_gtid.second.empty())
|
||||
{
|
||||
requested_gtid = true;
|
||||
extract_gtid_request(>id, file_and_gtid.second.c_str(), file_and_gtid.second.size());
|
||||
memcpy(>id_start, >id, sizeof(gtid_start));
|
||||
}
|
||||
|
||||
avro_binfile = file_and_gtid.first;
|
||||
|
||||
if (file_in_dir(router->avrodir.c_str(), avro_binfile.c_str()))
|
||||
{
|
||||
/* set callback routine for data sending */
|
||||
dcb_add_callback(dcb, DCB_REASON_DRAINED, avro_client_callback, this);
|
||||
|
||||
/* Add fake event that will call the avro_client_callback() routine */
|
||||
poll_fake_write_event(dcb);
|
||||
}
|
||||
else
|
||||
{
|
||||
dcb_printf(dcb, "ERR NO-FILE File '%s' not found.", avro_binfile.c_str());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
dcb_printf(dcb, "ERR REQUEST-DATA with no data");
|
||||
}
|
||||
}
|
||||
/* Return last GTID info */
|
||||
else if (strstr((char *)data, req_last_gtid))
|
||||
{
|
||||
send_gtid_info(&router->gtid);
|
||||
}
|
||||
/** Return requested GTID */
|
||||
else if (strstr((char *)data, req_gtid))
|
||||
{
|
||||
gtid_pos_t gtid;
|
||||
extract_gtid_request(>id, (char*)data + sizeof(req_gtid),
|
||||
GWBUF_LENGTH(queue) - sizeof(req_gtid));
|
||||
send_gtid_info(>id);
|
||||
}
|
||||
else
|
||||
{
|
||||
GWBUF *reply = gwbuf_alloc(5);
|
||||
memcpy(GWBUF_DATA(reply), "ECHO:", 5);
|
||||
reply = gwbuf_append(reply, gwbuf_clone(queue));
|
||||
dcb->func.write(dcb, reply);
|
||||
AvroSession *client = static_cast<AvroSession*>(userdata);
|
||||
client->client_callback();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -534,6 +299,66 @@ std::pair<std::string, std::string> get_avrofile_and_gtid(std::string file)
|
||||
return std::make_pair(filename, gtid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process command from client
|
||||
*
|
||||
* @param data Buffer containing the command
|
||||
*
|
||||
*/
|
||||
void AvroSession::process_command(GWBUF *queue)
|
||||
{
|
||||
const char req_data[] = "REQUEST-DATA";
|
||||
const size_t req_data_len = sizeof(req_data) - 1;
|
||||
size_t buflen = gwbuf_length(queue);
|
||||
uint8_t data[buflen + 1];
|
||||
gwbuf_copy_data(queue, 0, buflen, data);
|
||||
data[buflen] = '\0';
|
||||
char *command_ptr = strstr((char *)data, req_data);
|
||||
|
||||
if (command_ptr != NULL)
|
||||
{
|
||||
char *file_ptr = command_ptr + req_data_len;
|
||||
int data_len = GWBUF_LENGTH(queue) - req_data_len;
|
||||
|
||||
if (data_len > 1)
|
||||
{
|
||||
auto file_and_gtid = get_avrofile_and_gtid(file_ptr);
|
||||
|
||||
if (!file_and_gtid.second.empty())
|
||||
{
|
||||
requested_gtid = true;
|
||||
extract_gtid_request(>id, file_and_gtid.second.c_str(), file_and_gtid.second.size());
|
||||
memcpy(>id_start, >id, sizeof(gtid_start));
|
||||
}
|
||||
|
||||
avro_binfile = file_and_gtid.first;
|
||||
|
||||
if (file_in_dir(router->avrodir.c_str(), avro_binfile.c_str()))
|
||||
{
|
||||
/* set callback routine for data sending */
|
||||
dcb_add_callback(dcb, DCB_REASON_DRAINED, avro_client_callback, this);
|
||||
|
||||
/* Add fake event that will call the avro_client_callback() routine */
|
||||
poll_fake_write_event(dcb);
|
||||
}
|
||||
else
|
||||
{
|
||||
dcb_printf(dcb, "ERR NO-FILE File '%s' not found.", avro_binfile.c_str());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
dcb_printf(dcb, "ERR REQUEST-DATA with no data");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
const char err[] = "ERR: Unknown command";
|
||||
GWBUF *reply = gwbuf_alloc_and_load(sizeof(err), err);
|
||||
dcb->func.write(dcb, reply);
|
||||
}
|
||||
}
|
||||
|
||||
static int send_row(DCB *dcb, json_t* row)
|
||||
{
|
||||
char *json = json_dumps(row, JSON_PRESERVE_ORDER);
|
||||
@ -883,25 +708,6 @@ static std::string get_next_filename(std::string file, std::string dir)
|
||||
return std::string(outbuf);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief The client callback for sending data
|
||||
*
|
||||
* @param dcb Client DCB
|
||||
* @param reason Why the callback was called
|
||||
* @param userdata Data provided when the callback was added
|
||||
* @return Always 0
|
||||
*/
|
||||
int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata)
|
||||
{
|
||||
if (reason == DCB_REASON_DRAINED)
|
||||
{
|
||||
AvroSession *client = static_cast<AvroSession*>(userdata);
|
||||
client->client_callback();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void AvroSession::client_callback()
|
||||
{
|
||||
if (last_sent_pos == 0)
|
||||
@ -969,10 +775,6 @@ AvroSession* AvroSession::create(Avro* inst, MXS_SESSION* session)
|
||||
MXS_OOM();
|
||||
sqlite3_close_v2(handle);
|
||||
}
|
||||
else
|
||||
{
|
||||
atomic_add(&inst->stats.n_clients, 1);
|
||||
}
|
||||
|
||||
return client;
|
||||
}
|
||||
@ -993,9 +795,6 @@ AvroSession::AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle):
|
||||
|
||||
AvroSession::~AvroSession()
|
||||
{
|
||||
ss_debug(int prev_val = )atomic_add(&router->stats.n_clients, -1);
|
||||
ss_dassert(prev_val > 0);
|
||||
|
||||
maxavro_file_close(file_handle);
|
||||
sqlite3_close_v2(sqlite_handle);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user