From e35d9dfc102d8d0ec64925464f3bfcf583750029 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 28 May 2018 12:49:17 +0300 Subject: [PATCH] Move client functionality into AvroSession Most of the code is unchanged. Some of the code that relied on modifiable char pointers was modified to use std::string. --- server/modules/routing/avrorouter/avro.cc | 5 +- .../modules/routing/avrorouter/avro_client.cc | 533 +++++++----------- .../modules/routing/avrorouter/avro_file.cc | 3 +- .../modules/routing/avrorouter/avrorouter.hh | 30 +- 4 files changed, 250 insertions(+), 321 deletions(-) diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index 80c29cea1..cec7d5694 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -539,6 +539,8 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERV avrodir(config_get_string(params, "avrodir")), current_pos(4), binlog_fd(-1), + event_types(0), + event_type_hdr_lens{0}, trx_count(0), trx_target(config_get_integer(params, "group_trx")), row_count(0), @@ -644,10 +646,9 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue) { - Avro *router = (Avro *) instance; AvroSession *client = (AvroSession *) router_session; - return avro_client_handle_request(router, client, queue); + return client->routeQuery(queue); } /** diff --git a/server/modules/routing/avrorouter/avro_client.cc b/server/modules/routing/avrorouter/avro_client.cc index a6aec06c6..d84bae44d 100644 --- a/server/modules/routing/avrorouter/avro_client.cc +++ b/server/modules/routing/avrorouter/avro_client.cc @@ -21,6 +21,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -30,75 +33,64 @@ #include #include #include -#include #include +#include +#include extern char *blr_extract_column(GWBUF *buf, int col); extern uint32_t extract_field(uint8_t *src, int bits); /* AVRO */ -static int avro_client_do_registration(Avro *, AvroSession *, GWBUF *); int avro_client_callback(DCB *dcb, DCB_REASON reason, void *data); -static void avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue); -static bool avro_client_stream_data(AvroSession *client); void avro_notify_client(AvroSession *client); void poll_fake_write_event(DCB *dcb); GWBUF* read_avro_json_schema(const char *avrofile, const char* dir); GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir); -const char* get_avrofile_name(const char *file_ptr, int data_len, char *dest); +std::pair get_avrofile_and_gtid(std::string file); -/** - * Process a request packet from the slave server. - * - * @param router The router instance this defines the master for this replication chain - * @param client The client specific data - * @param queue The incoming request packet - * @return 1 on success, 0 on error or failure - */ -int -avro_client_handle_request(Avro *router, AvroSession *client, GWBUF *queue) +int AvroSession::routeQuery(GWBUF *queue) { int rval = 1; - switch (client->state) + switch (state) { case AVRO_CLIENT_ERRORED: /* force disconnection */ return 0; break; case AVRO_CLIENT_UNREGISTERED: - if (avro_client_do_registration(router, client, queue) == 0) + if (do_registration(queue) == 0) { - client->state = AVRO_CLIENT_ERRORED; - dcb_printf(client->dcb, "ERR, code 12, msg: Registration failed\n"); + state = AVRO_CLIENT_ERRORED; + dcb_printf(dcb, "ERR, code 12, msg: Registration failed\n"); /* force disconnection */ - dcb_close(client->dcb); + dcb_close(dcb); rval = 0; } else { /* Send OK ack to client */ - dcb_printf(client->dcb, "OK\n"); + dcb_printf(dcb, "OK\n"); - client->state = AVRO_CLIENT_REGISTERED; + state = AVRO_CLIENT_REGISTERED; MXS_INFO("%s: Client [%s] has completed REGISTRATION action", - client->dcb->service->name, - client->dcb->remote != NULL ? client->dcb->remote : ""); + dcb->service->name, + dcb->remote != NULL ? dcb->remote : ""); } break; case AVRO_CLIENT_REGISTERED: case AVRO_CLIENT_REQUEST_DATA: - if (client->state == AVRO_CLIENT_REGISTERED) + if (state == AVRO_CLIENT_REGISTERED) { - client->state = AVRO_CLIENT_REQUEST_DATA; + state = AVRO_CLIENT_REQUEST_DATA; } /* Process command from client */ - avro_client_process_command(router, client, queue); + process_command(queue); break; default: - client->state = AVRO_CLIENT_ERRORED; + state = AVRO_CLIENT_ERRORED; rval = 0; break; } @@ -109,15 +101,13 @@ avro_client_handle_request(Avro *router, AvroSession *client, GWBUF *queue) } /** - * Handle the REGISTRATION command + * Handle client registration * - * @param dcb DCB with allocateid protocol - * @param data GWBUF with registration message - * @return 1 for successful registration 0 otherwise + * @param data Buffer with registration message * + * @return 1 on successful registration, 0 on error */ -static int -avro_client_do_registration(Avro *router, AvroSession *client, GWBUF *data) +int AvroSession::do_registration(GWBUF *data) { const char reg_uuid[] = "REGISTER UUID="; const int reg_uuid_len = strlen(reg_uuid); @@ -130,31 +120,31 @@ avro_client_do_registration(Avro *router, AvroSession *client, GWBUF *data) char *sep_ptr; int uuid_len = (data_len > CDC_UUID_LEN) ? CDC_UUID_LEN : data_len; /* 36 +1 */ - char uuid[uuid_len + 1]; - memcpy(uuid, request + reg_uuid_len, uuid_len); - uuid[uuid_len] = '\0'; + char client_uuid[uuid_len + 1]; + memcpy(client_uuid, request + reg_uuid_len, uuid_len); + client_uuid[uuid_len] = '\0'; - if ((sep_ptr = strchr(uuid, ',')) != NULL) + if ((sep_ptr = strchr(client_uuid, ',')) != NULL) { *sep_ptr = '\0'; } - if ((sep_ptr = strchr(uuid + strlen(uuid), ' ')) != NULL) + if ((sep_ptr = strchr(client_uuid + strlen(client_uuid), ' ')) != NULL) { *sep_ptr = '\0'; } - if ((sep_ptr = strchr(uuid, ' ')) != NULL) + if ((sep_ptr = strchr(client_uuid, ' ')) != NULL) { *sep_ptr = '\0'; } - if (strlen(uuid) < static_cast(uuid_len)) + if (strlen(client_uuid) < static_cast(uuid_len)) { - data_len -= (uuid_len - strlen(uuid)); + data_len -= (uuid_len - strlen(client_uuid)); } - uuid_len = strlen(uuid); + uuid_len = strlen(client_uuid); - client->uuid = MXS_STRDUP_A(uuid); + uuid = client_uuid; if (data_len > 0) { @@ -165,14 +155,14 @@ avro_client_do_registration(Avro *router, AvroSession *client, GWBUF *data) if (memcmp(tmp_ptr + 5, "AVRO", 4) == 0) { ret = 1; - client->state = AVRO_CLIENT_REGISTERED; - client->format = AVRO_FORMAT_AVRO; + state = AVRO_CLIENT_REGISTERED; + format = AVRO_FORMAT_AVRO; } else if (memcmp(tmp_ptr + 5, "JSON", 4) == 0) { ret = 1; - client->state = AVRO_CLIENT_REGISTERED; - client->format = AVRO_FORMAT_JSON; + state = AVRO_CLIENT_REGISTERED; + format = AVRO_FORMAT_JSON; } else { @@ -382,7 +372,7 @@ void add_timestamp(sqlite3 *handle, json_t* obj, gtid_pos_t* gtid) * @param router Router instance * @param dcb Client DCB */ -void send_gtid_info(Avro *router, gtid_pos_t *gtid_pos, DCB *dcb) +void AvroSession::send_gtid_info(gtid_pos_t* gtid_pos) { json_t *obj = json_object(); @@ -426,13 +416,10 @@ bool file_in_dir(const char *dir, const char *file) /** * Process command from client * - * @param router The router instance - * @param client The specific client data - * @param data GWBUF with command + * @param data Buffer containing the command * */ -static void -avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue) +void AvroSession::process_command(GWBUF *queue) { const char req_data[] = "REQUEST-DATA"; const char req_last_gtid[] = "QUERY-LAST-TRANSACTION"; @@ -451,37 +438,39 @@ avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue) if (data_len > 1) { - const char *gtid_ptr = get_avrofile_name(file_ptr, data_len, client->avro_binfile); + auto file_and_gtid = get_avrofile_and_gtid(file_ptr); - if (gtid_ptr) + if (!file_and_gtid.second.empty()) { - client->requested_gtid = true; - extract_gtid_request(&client->gtid, gtid_ptr, data_len - (gtid_ptr - file_ptr)); - memcpy(&client->gtid_start, &client->gtid, sizeof(client->gtid_start)); + requested_gtid = true; + extract_gtid_request(>id, file_and_gtid.second.c_str(), file_and_gtid.second.size()); + memcpy(>id_start, >id, sizeof(gtid_start)); } - if (file_in_dir(router->avrodir.c_str(), client->avro_binfile)) + avro_binfile = file_and_gtid.first; + + if (file_in_dir(router->avrodir.c_str(), avro_binfile.c_str())) { /* set callback routine for data sending */ - dcb_add_callback(client->dcb, DCB_REASON_DRAINED, avro_client_callback, client); + dcb_add_callback(dcb, DCB_REASON_DRAINED, avro_client_callback, this); /* Add fake event that will call the avro_client_callback() routine */ - poll_fake_write_event(client->dcb); + poll_fake_write_event(dcb); } else { - dcb_printf(client->dcb, "ERR NO-FILE File '%s' not found.", client->avro_binfile); + dcb_printf(dcb, "ERR NO-FILE File '%s' not found.", avro_binfile.c_str()); } } else { - dcb_printf(client->dcb, "ERR REQUEST-DATA with no data"); + dcb_printf(dcb, "ERR REQUEST-DATA with no data"); } } /* Return last GTID info */ else if (strstr((char *)data, req_last_gtid)) { - send_gtid_info(router, &router->gtid, client->dcb); + send_gtid_info(&router->gtid); } /** Return requested GTID */ else if (strstr((char *)data, req_gtid)) @@ -489,14 +478,14 @@ avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue) gtid_pos_t gtid; extract_gtid_request(>id, (char*)data + sizeof(req_gtid), GWBUF_LENGTH(queue) - sizeof(req_gtid)); - send_gtid_info(router, >id, client->dcb); + send_gtid_info(>id); } else { GWBUF *reply = gwbuf_alloc(5); memcpy(GWBUF_DATA(reply), "ECHO:", 5); reply = gwbuf_append(reply, gwbuf_clone(queue)); - client->dcb->func.write(client->dcb, reply); + dcb->func.write(dcb, reply); } } @@ -508,41 +497,41 @@ avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue) * @param dest Destination where the file name is stored. Must be at least * @p data_len + 1 bytes. */ -const char* get_avrofile_name(const char *file_ptr, int data_len, char *dest) +std::pair get_avrofile_and_gtid(std::string file) { - while (isspace(*file_ptr)) + mxs::ltrim(file); + auto pos = file.find_first_of(' '); + std::string filename; + std::string gtid; + + if (pos != std::string::npos) { - file_ptr++; - data_len--; + // Client requests a specific GTID + filename = file.substr(0, pos); + gtid = file.substr(pos + 1); } - - char avro_file[data_len + 1]; - memcpy(avro_file, file_ptr, data_len); - avro_file[data_len] = '\0'; - - char *cmd_sep = strchr(avro_file, ' '); - const char *rval = NULL; - - if (cmd_sep) - { - *cmd_sep++ = '\0'; - rval = file_ptr + (cmd_sep - avro_file); - ss_dassert(rval < file_ptr + data_len); - } - - /** Exact file version specified */ - if ((cmd_sep = strchr(avro_file, '.')) && (cmd_sep = strchr(cmd_sep + 1, '.')) && - strlen(cmd_sep + 1) > 0) - { - snprintf(dest, AVRO_MAX_FILENAME_LEN, "%s.avro", avro_file); - } - /** No version specified, send all files */ else { - snprintf(dest, AVRO_MAX_FILENAME_LEN, "%s.000001.avro", avro_file); + filename = file; } - return rval; + auto first_dot = filename.find_first_of('.'); + auto last_dot = filename.find_last_of('.'); + + if (first_dot != std::string::npos && + last_dot != std::string::npos && + first_dot != last_dot) + { + // Exact file version specified e.g. test.t1.000002 + filename += ".avro"; + } + else + { + // No version specified, send first file + filename += ".000001.avro"; + } + + return std::make_pair(filename, gtid); } static int send_row(DCB *dcb, json_t* row) @@ -568,19 +557,19 @@ static int send_row(DCB *dcb, json_t* row) return rc; } -static void set_current_gtid(AvroSession *client, json_t *row) +void AvroSession::set_current_gtid(json_t *row) { json_t *obj = json_object_get(row, avro_sequence); ss_dassert(json_is_integer(obj)); - client->gtid.seq = json_integer_value(obj); + gtid.seq = json_integer_value(obj); obj = json_object_get(row, avro_server_id); ss_dassert(json_is_integer(obj)); - client->gtid.server_id = json_integer_value(obj); + gtid.server_id = json_integer_value(obj); obj = json_object_get(row, avro_domain); ss_dassert(json_is_integer(obj)); - client->gtid.domain = json_integer_value(obj); + gtid.domain = json_integer_value(obj); } /** @@ -590,25 +579,23 @@ static void set_current_gtid(AvroSession *client, json_t *row) * @param dcb DCB to stream to * @return True if more data is readable, false if all data was sent */ -static bool stream_json(AvroSession *client) +bool AvroSession::stream_json() { int bytes = 0; - MAXAVRO_FILE *file = client->file_handle; - DCB *dcb = client->dcb; do { json_t *row; int rc = 1; - while (rc > 0 && (row = maxavro_record_read_json(file))) + while (rc > 0 && (row = maxavro_record_read_json(file_handle))) { rc = send_row(dcb, row); - set_current_gtid(client, row); + set_current_gtid(row); json_decref(row); } - bytes += file->buffer_size; + bytes += file_handle->buffer_size; } - while (maxavro_next_block(file) && bytes < AVRO_DATA_BURST_SIZE); + while (maxavro_next_block(file_handle) && bytes < AVRO_DATA_BURST_SIZE); return bytes >= AVRO_DATA_BURST_SIZE; } @@ -620,18 +607,16 @@ static bool stream_json(AvroSession *client) * @param dcb DCB to stream to * @return True if streaming was successful, false if an error occurred */ -static bool stream_binary(AvroSession *client) +bool AvroSession::stream_binary() { GWBUF *buffer; uint64_t bytes = 0; int rc = 1; - MAXAVRO_FILE *file = client->file_handle; - DCB *dcb = client->dcb; while (rc > 0 && bytes < AVRO_DATA_BURST_SIZE) { - bytes += file->buffer_size; - if ((buffer = maxavro_record_read_binary(file))) + bytes += file_handle->buffer_size; + if ((buffer = maxavro_record_read_binary(file_handle))) { rc = dcb->func.write(dcb, buffer); } @@ -660,24 +645,23 @@ static int sqlite_cb(void* data, int rows, char** values, char** names) static const char select_template[] = "SELECT max(position) FROM gtid WHERE domain=%lu " "AND server_id=%lu AND sequence <= %lu AND avrofile=\"%s\";"; -static bool seek_to_index_pos(AvroSession *client, MAXAVRO_FILE* file) +bool AvroSession::seek_to_index_pos() { - char *name = strrchr(client->file_handle->filename, '/'); + char *name = strrchr(file_handle->filename, '/'); ss_dassert(name); name++; char sql[sizeof(select_template) + NAME_MAX + 80]; - snprintf(sql, sizeof(sql), select_template, client->gtid.domain, - client->gtid.server_id, client->gtid.seq, name); + snprintf(sql, sizeof(sql), select_template, gtid.domain, gtid.server_id, gtid.seq, name); long offset = -1; char *errmsg = NULL; bool rval = false; - if (sqlite3_exec(client->sqlite_handle, sql, sqlite_cb, &offset, &errmsg) == SQLITE_OK) + if (sqlite3_exec(sqlite_handle, sql, sqlite_cb, &offset, &errmsg) == SQLITE_OK) { rval = true; - if (offset > 0 && !maxavro_record_set_pos(file, offset)) + if (offset > 0 && !maxavro_record_set_pos(file_handle, offset)) { rval = false; } @@ -685,48 +669,42 @@ static bool seek_to_index_pos(AvroSession *client, MAXAVRO_FILE* file) else { MXS_ERROR("Failed to query index position for GTID %lu-%lu-%lu: %s", - client->gtid.domain, client->gtid.server_id, client->gtid.seq, errmsg); + gtid.domain, gtid.server_id, gtid.seq, errmsg); } sqlite3_free(errmsg); return rval; } -/** - * - * @param client - * @param file - */ -static bool seek_to_gtid(AvroSession *client, MAXAVRO_FILE* file) +bool AvroSession::seek_to_gtid() { bool seeking = true; do { json_t *row; - while ((row = maxavro_record_read_json(file))) + while ((row = maxavro_record_read_json(file_handle))) { json_t *obj = json_object_get(row, avro_sequence); ss_dassert(json_is_integer(obj)); uint64_t value = json_integer_value(obj); /** If a larger GTID is found, use that */ - if (value >= client->gtid.seq) + if (value >= gtid.seq) { obj = json_object_get(row, avro_server_id); ss_dassert(json_is_integer(obj)); value = json_integer_value(obj); - if (value == client->gtid.server_id) + if (value == gtid.server_id) { obj = json_object_get(row, avro_domain); ss_dassert(json_is_integer(obj)); value = json_integer_value(obj); - if (value == client->gtid.domain) + if (value == gtid.domain) { - MXS_INFO("Found GTID %lu-%lu-%lu for %s@%s", - client->gtid.domain, client->gtid.server_id, - client->gtid.seq, client->dcb->user, client->dcb->remote); + MXS_INFO("Found GTID %lu-%lu-%lu for %s@%s", gtid.domain, + gtid.server_id, gtid.seq, dcb->user, dcb->remote); seeking = false; } } @@ -736,13 +714,13 @@ static bool seek_to_gtid(AvroSession *client, MAXAVRO_FILE* file) * read the row into memory */ if (!seeking) { - send_row(client->dcb, row); + send_row(dcb, row); } json_decref(row); } } - while (seeking && maxavro_next_block(file)); + while (seeking && maxavro_next_block(file_handle)); return !seeking; } @@ -750,122 +728,94 @@ static bool seek_to_gtid(AvroSession *client, MAXAVRO_FILE* file) /** * Print JSON output from selected AVRO file * - * @param router The router instance - * @param client The specific client data - * @param avro_file The requested AVRO file * @return True if more data needs to be read */ -static bool avro_client_stream_data(AvroSession *client) +bool AvroSession::stream_data() { bool read_more = false; - Avro *router = client->router; - if (strnlen(client->avro_binfile, 1)) + if (!avro_binfile.empty()) { - char filename[PATH_MAX + 1]; - snprintf(filename, PATH_MAX, "%s/%s", router->avrodir.c_str(), client->avro_binfile); - bool ok = true; + std::string filename = router->avrodir + '/' + avro_binfile; - if (client->file_handle == NULL && - (client->file_handle = maxavro_file_open(filename)) == NULL) + if (!file_handle && !(file_handle = maxavro_file_open(filename.c_str()))) { ok = false; } if (ok) { - switch (client->format) + switch (format) { case AVRO_FORMAT_JSON: /** Currently only JSON format supports seeking to a GTID */ - if (client->requested_gtid && - seek_to_index_pos(client, client->file_handle) && - seek_to_gtid(client, client->file_handle)) + if (requested_gtid && seek_to_index_pos() && seek_to_gtid()) { - client->requested_gtid = false; + requested_gtid = false; } - read_more = stream_json(client); + read_more = stream_json(); break; case AVRO_FORMAT_AVRO: - read_more = stream_binary(client); + read_more = stream_binary(); break; default: - MXS_ERROR("Unexpected format: %d", client->format); + MXS_ERROR("Unexpected format: %d", format); break; } - if (maxavro_get_error(client->file_handle) != MAXAVRO_ERR_NONE) + if (maxavro_get_error(file_handle) != MAXAVRO_ERR_NONE) { MXS_ERROR("Reading Avro file failed with error '%s'.", - maxavro_get_error_string(client->file_handle)); + maxavro_get_error_string(file_handle)); } - client->last_sent_pos = client->file_handle->records_read; + last_sent_pos = file_handle->records_read; } } else { fprintf(stderr, "No file specified\n"); - dcb_printf(client->dcb, "ERR avro file not specified"); + dcb_printf(dcb, "ERR avro file not specified"); } return read_more; } -GWBUF* read_avro_json_schema(const char *avrofile, const char* dir) +GWBUF* read_avro_json_schema(std::string avrofile, std::string dir) { GWBUF* rval = NULL; - const char *suffix = strrchr(avrofile, '.'); - if (suffix) + // Copy the name and swap the suffix from .avro to .avsc + std::string schemafile = dir + "/" + avrofile.substr(0, avrofile.length() - 2) + "sc"; + std::ifstream file(schemafile); + + if (file.good()) { - char buffer[PATH_MAX + 1]; - snprintf(buffer, sizeof(buffer), "%s/%.*s.avsc", dir, (int)(suffix - avrofile), - avrofile); - FILE *file = fopen(buffer, "rb"); - - if (file) - { - int nread; - while ((nread = fread(buffer, 1, sizeof(buffer) - 1, file)) > 0) - { - while (isspace(buffer[nread - 1])) - { - nread--; - } - - buffer[nread++] = '\n'; - - GWBUF * newbuf = gwbuf_alloc_and_load(nread, buffer); - - if (newbuf) - { - rval = gwbuf_append(rval, newbuf); - } - } - - fclose(file); - } - else - { - MXS_ERROR("Failed to open file '%s': %d, %s", buffer, errno, - mxs_strerror(errno)); - } + std::stringstream ss; + ss << file.rdbuf(); + std::string text = ss.str(); + mxs::Buffer buffer(std::vector(text.begin(), text.end())); + rval = buffer.release(); } + else + { + MXS_ERROR("Failed to open file '%s': %d, %s", schemafile.c_str(), errno, + mxs_strerror(errno)); + } + return rval; } -GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir) +GWBUF* read_avro_binary_schema(std::string avrofile, std::string dir) { GWBUF* rval = NULL; - char buffer[PATH_MAX + 1]; - snprintf(buffer, sizeof(buffer), "%s/%s", dir, avrofile); - MAXAVRO_FILE *file = maxavro_file_open(buffer); + std::string filename = dir + '/' + avrofile; + MAXAVRO_FILE *file = maxavro_file_open(filename.c_str()); if (file) { @@ -874,7 +824,7 @@ GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir) } else { - MXS_ERROR("Failed to open file '%s'.", buffer); + MXS_ERROR("Failed to open file '%s'.", filename.c_str()); } return rval; @@ -885,34 +835,23 @@ GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir) * @param client Avro client session * @param fullname Absolute path to the file to rotate to */ -static void rotate_avro_file(AvroSession *client, char *fullname) +void AvroSession::rotate_avro_file(std::string fullname) { - char *filename = strrchr(fullname, '/') + 1; - size_t len = strlen(filename); - if (len > AVRO_MAX_FILENAME_LEN) - { - // TODO: This function is in need of a return value. It would - // TODO: be better to abort if the name is too long and also - // TODO: if the opening of the file fails. - MXS_ERROR("Filename %s of length %lu is longer than maximum allowed " - "length %d. Trailing data will be cut.", - filename, len, AVRO_MAX_FILENAME_LEN); - len = AVRO_MAX_FILENAME_LEN; - } - strncpy(client->avro_binfile, filename, len); - client->avro_binfile[len] = 0; - client->last_sent_pos = 0; + auto pos = fullname.find_last_of('/'); + ss_dassert(pos != std::string::npos); + avro_binfile = fullname.substr(pos + 1); + last_sent_pos = 0; - maxavro_file_close(client->file_handle); + maxavro_file_close(file_handle); - if ((client->file_handle = maxavro_file_open(fullname)) == NULL) + if ((file_handle = maxavro_file_open(fullname.c_str())) == NULL) { - MXS_ERROR("Failed to open file: %s", filename); + MXS_ERROR("Failed to open file: %s", fullname.c_str()); } else { - MXS_INFO("Rotated '%s'@'%s' to file: %s", client->dcb->user, - client->dcb->remote, fullname); + MXS_INFO("Rotated '%s'@'%s' to file: %s", dcb->user, + dcb->remote, fullname.c_str()); } } @@ -923,25 +862,25 @@ static void rotate_avro_file(AvroSession *client, char *fullname) * @param dest Destination where the full path to the file is printed * @param len Size of @p dest */ -static void print_next_filename(const char *file, const char *dir, char *dest, size_t len) +static std::string get_next_filename(std::string file, std::string dir) { - char buffer[strlen(file) + 1]; - strcpy(buffer, file); - char *ptr = strrchr(buffer, '.'); + // Find the last and second to last dot + auto last = file.find_last_of('.'); + auto almost_last = file.find_last_of('.', last); + ss_dassert(last != std::string::npos && almost_last != std::string::npos); - if (ptr) - { - ptr--; - while (ptr > buffer && *(ptr) != '.') - { - ptr--; - } + // Extract the number between the dots + std::string number_part = file.substr(almost_last + 1, last); + int filenum = strtol(number_part.c_str(), NULL, 10); - int filenum = strtol(ptr + 1, NULL, 10); - *ptr = '\0'; - snprintf(dest, len, "%s/%s.%06d.avro", - dir, buffer, filenum + 1); - } + std::string file_part = file.substr(0, almost_last); + + // Print it out the new filename with the file number incremented by one + char outbuf[PATH_MAX + 1]; + snprintf(outbuf, sizeof(outbuf), "%s/%s.%06d.avro", + dir.c_str(), file_part.c_str(), filenum + 1); + + return std::string(outbuf); } /** @@ -956,89 +895,59 @@ int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata) { if (reason == DCB_REASON_DRAINED) { - AvroSession *client = (AvroSession*)userdata; - - spinlock_acquire(&client->catch_lock); - if (client->cstate & AVRO_CS_BUSY) - { - spinlock_release(&client->catch_lock); - return 0; - } - - client->cstate |= AVRO_CS_BUSY; - spinlock_release(&client->catch_lock); - - if (client->last_sent_pos == 0) - { - /** Send the schema of the current file */ - GWBUF *schema = NULL; - - switch (client->format) - { - case AVRO_FORMAT_JSON: - schema = read_avro_json_schema(client->avro_binfile, client->router->avrodir.c_str()); - break; - - case AVRO_FORMAT_AVRO: - schema = read_avro_binary_schema(client->avro_binfile, client->router->avrodir.c_str()); - break; - - default: - MXS_ERROR("Unknown client format: %d", client->format); - } - - if (schema) - { - client->dcb->func.write(client->dcb, schema); - } - } - - /** Stream the data to the client */ - bool read_more = avro_client_stream_data(client); - - char filename[PATH_MAX + 1]; - print_next_filename(client->avro_binfile, client->router->avrodir.c_str(), - filename, sizeof(filename)); - - bool next_file; - /** If the next file is available, send it to the client */ - if ((next_file = (access(filename, R_OK) == 0))) - { - rotate_avro_file(client, filename); - } - - spinlock_acquire(&client->catch_lock); - client->cstate &= ~AVRO_CS_BUSY; - client->cstate |= AVRO_WAIT_DATA; - - if (next_file || read_more) - { -#ifdef SS_DEBUG - if (read_more) - { - MXS_DEBUG("Burst limit hit, need to read more data."); - } -#endif - avro_notify_client(client); - } - spinlock_release(&client->catch_lock); + AvroSession *client = static_cast(userdata); + client->client_callback(); } return 0; } -/** - * @brief Notify a client that new data is available - * - * The client catch_lock must be held when calling this function. - * - * @param client Client to notify - */ -void avro_notify_client(AvroSession *client) +void AvroSession::client_callback() { - /* Add fake event that will call the avro_client_callback() routine */ - poll_fake_write_event(client->dcb); - client->cstate &= ~AVRO_WAIT_DATA; + if (last_sent_pos == 0) + { + // TODO: Don't use DCB callbacks to stream the data + last_sent_pos = 1; + + /** Send the schema of the current file */ + GWBUF *schema = NULL; + + switch (format) + { + case AVRO_FORMAT_JSON: + schema = read_avro_json_schema(avro_binfile, router->avrodir); + break; + + case AVRO_FORMAT_AVRO: + schema = read_avro_binary_schema(avro_binfile, router->avrodir); + break; + + default: + MXS_ERROR("Unknown client format: %d", format); + break; + } + + if (schema) + { + dcb->func.write(dcb, schema); + } + } + + /** Stream the data to the client */ + bool read_more = stream_data(); + std::string filename = get_next_filename(avro_binfile, router->avrodir); + + bool next_file; + /** If the next file is available, send it to the client */ + if ((next_file = (access(filename.c_str(), R_OK) == 0))) + { + rotate_avro_file(filename); + } + + if (next_file || read_more) + { + poll_fake_write_event(dcb); + } } // static @@ -1072,15 +981,12 @@ AvroSession::AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle): dcb(session->client_dcb), state(AVRO_CLIENT_UNREGISTERED), format(AVRO_FORMAT_UNDEFINED), - uuid(NULL), catch_lock(SPINLOCK_INIT), router(instance), file_handle(NULL), last_sent_pos(0), connect_time(time(NULL)), - avro_binfile{0}, requested_gtid(false), - cstate(0), sqlite_handle(handle) { } @@ -1090,7 +996,6 @@ AvroSession::~AvroSession() ss_debug(int prev_val = )atomic_add(&router->stats.n_clients, -1); ss_dassert(prev_val > 0); - free(uuid); maxavro_file_close(file_handle); sqlite3_close_v2(sqlite_handle); } diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index 903d10e68..89bb1393b 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -432,8 +432,7 @@ bool notify_cb(DCB* dcb, void* data) if (dcb->service == service && dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER) { - AvroSession* ses = reinterpret_cast(dcb->session->router_session); - avro_notify_client(ses); + poll_fake_write_event(dcb); } return true; diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index 6e2ac6868..0c2b00bb5 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -307,21 +307,45 @@ public: DCB* dcb; /*< The client DCB */ int state; /*< The state of this client */ enum avro_data_format format; /*< Stream JSON or Avro data */ - char* uuid; /*< Client UUID */ + std::string uuid; /*< Client UUID */ SPINLOCK catch_lock; /*< Event catchup lock */ Avro* router; /*< Pointer to the owning router */ MAXAVRO_FILE* file_handle; /*< Current open file handle */ uint64_t last_sent_pos; /*< The last record we sent */ time_t connect_time; /*< Connect time of slave */ - char avro_binfile[AVRO_MAX_FILENAME_LEN + 1]; + std::string avro_binfile; bool requested_gtid; /*< If the client requested */ gtid_pos_t gtid; /*< Current/requested GTID */ gtid_pos_t gtid_start; /*< First sent GTID */ - unsigned int cstate; /*< Catch up state */ sqlite3* sqlite_handle; + /** + * Process a client request + * + * @param Buffer The incoming request packet + * + * @return 1 on success, 0 on error + */ + int routeQuery(GWBUF* buffer); + + /** + * Handler for the EPOLLOUT event + */ + void client_callback(); + private: AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle); + + int do_registration(GWBUF *data); + void process_command(GWBUF *queue); + void send_gtid_info(gtid_pos_t *gtid_pos); + void set_current_gtid(json_t *row); + bool stream_json(); + bool stream_binary(); + bool seek_to_index_pos(); + bool seek_to_gtid(); + bool stream_data(); + void rotate_avro_file(std::string fullname); }; extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id,