Move AvroSession initialization inside the class

The AvroSession now has the AvroSession::create method for creation new
sessions. The destructor handles the freeing of all allocated resources.
This commit is contained in:
Markus Mäkelä
2018-05-22 17:36:11 +03:00
parent fb53bbf4da
commit d5760f4301
4 changed files with 84 additions and 66 deletions

View File

@ -55,9 +55,6 @@ using namespace maxscale;
#define AVRO_TASK_DELAY_MAX 15
static const char* index_task_name = "avro_indexing";
static const char* avro_index_name = "avro.index";
/** For detection of CREATE/ALTER TABLE statements */
static const char* create_table_regex =
"(?i)create[a-z0-9[:space:]_]+table";
@ -622,45 +619,8 @@ createInstance(SERVICE *service, char **options)
static MXS_ROUTER_SESSION *
newSession(MXS_ROUTER *instance, MXS_SESSION *session)
{
Avro *inst = (Avro *) instance;
AvroSession *client = new (std::nothrow) AvroSession;
if (client == NULL)
{
return NULL;
}
atomic_add(&inst->stats.n_clients, 1);
client->dcb = session->client_dcb;
client->state = AVRO_CLIENT_UNREGISTERED;
client->format = AVRO_FORMAT_UNDEFINED;
client->uuid = NULL;
spinlock_init(&client->catch_lock);
client->router = inst;
client->file_handle = NULL; /*< Current open file handle */
client->last_sent_pos = 0; /*< The last record we sent */
client->connect_time = time(0);
client->avro_binfile[0] = '\0';
client->requested_gtid = false; /*< If the client requested */
memset(&client->gtid, 0, sizeof(client->gtid));
memset(&client->gtid_start, 0, sizeof(client->gtid_start));
client->cstate = 0;
client->sqlite_handle = NULL;
char dbpath[PATH_MAX + 1];
snprintf(dbpath, sizeof(dbpath), "/%s/%s", inst->avrodir, avro_index_name);
/** A new handle for each client allows thread-safe use of the sqlite database */
if (sqlite3_open_v2(dbpath, &client->sqlite_handle,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL) != SQLITE_OK)
{
MXS_ERROR("Failed to open SQLite database '%s': %s", dbpath,
sqlite3_errmsg(inst->sqlite_handle));
sqlite3_close_v2(client->sqlite_handle);
}
return reinterpret_cast<MXS_ROUTER_SESSION*>(client);
Avro* inst = reinterpret_cast<Avro*>(instance);
return AvroSession::create(inst, session);
}
/**
@ -677,16 +637,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
*/
static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_ses)
{
Avro *router = (Avro *) router_instance;
AvroSession *client = (AvroSession *) router_client_ses;
ss_debug(int prev_val = )atomic_add(&router->stats.n_clients, -1);
ss_dassert(prev_val > 0);
free(client->uuid);
maxavro_file_close(client->file_handle);
sqlite3_close_v2(client->sqlite_handle);
delete client;
}
@ -699,14 +650,6 @@ static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_
*/
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
{
Avro *router = (Avro *) instance;
AvroSession *client = (AvroSession *) router_session;
spinlock_acquire(&client->catch_lock);
client->state = AVRO_CLIENT_UNREGISTERED;
spinlock_release(&client->catch_lock);
}
/**
@ -898,7 +841,7 @@ bool converter_func(Worker::Call::action_t action, Avro* router)
router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX);
MXS_INFO("Stopped processing file %s at position %lu. Waiting until"
" more data is written before continuing. Next check in %d seconds.",
" more data is written before continuing. Next check in %d seconds.",
router->binlog_name, router->current_pos, router->task_delay);
}

View File

@ -486,7 +486,7 @@ avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue)
/** Return requested GTID */
else if (strstr((char *)data, req_gtid))
{
gtid_pos_t gtid = {0, 0, 0, 0, 0};
gtid_pos_t gtid;
extract_gtid_request(&gtid, (char*)data + sizeof(req_gtid),
GWBUF_LENGTH(queue) - sizeof(req_gtid));
send_gtid_info(router, &gtid, client->dcb);
@ -1040,3 +1040,57 @@ void avro_notify_client(AvroSession *client)
poll_fake_write_event(client->dcb);
client->cstate &= ~AVRO_WAIT_DATA;
}
// static
AvroSession* AvroSession::create(Avro* inst, MXS_SESSION* session)
{
AvroSession* client = NULL;
sqlite3* handle;
char dbpath[PATH_MAX + 1];
snprintf(dbpath, sizeof(dbpath), "/%s/%s", inst->avrodir, avro_index_name);
if (sqlite3_open_v2(dbpath, &handle,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL) != SQLITE_OK)
{
MXS_ERROR("Failed to open SQLite database '%s': %s", dbpath, sqlite3_errmsg(handle));
sqlite3_close_v2(handle);
}
else if ((client = new (std::nothrow) AvroSession(inst, session, handle)) == NULL)
{
MXS_OOM();
sqlite3_close_v2(handle);
}
else
{
atomic_add(&inst->stats.n_clients, 1);
}
return client;
}
AvroSession::AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle):
dcb(session->client_dcb),
state(AVRO_CLIENT_UNREGISTERED),
format(AVRO_FORMAT_UNDEFINED),
uuid(NULL),
catch_lock(SPINLOCK_INIT),
router(instance),
file_handle(NULL),
last_sent_pos(0),
connect_time(time(NULL)),
avro_binfile{0},
requested_gtid(false),
cstate(0),
sqlite_handle(handle)
{
}
AvroSession::~AvroSession()
{
ss_debug(int prev_val = )atomic_add(&router->stats.n_clients, -1);
ss_dassert(prev_val > 0);
free(uuid);
maxavro_file_close(file_handle);
sqlite3_close_v2(sqlite_handle);
}

View File

@ -97,7 +97,7 @@ void avro_index_file(Avro *router, const char* filename)
return;
}
gtid_pos_t prev_gtid = {0, 0, 0, 0, 0};
gtid_pos_t prev_gtid;
if (sqlite3_exec(router->sqlite_handle, "BEGIN", NULL, NULL, &errmsg) != SQLITE_OK)
{

View File

@ -58,6 +58,8 @@ MXS_BEGIN_DECLS
/** Name of the file where the binlog to Avro conversion progress is stored */
#define AVRO_PROGRESS_FILE "avro-conversion.ini"
static const char* avro_index_name = "avro.index";
/** Buffer limits */
#define AVRO_SQL_BUFFER_SIZE 2048
@ -214,10 +216,19 @@ enum mxs_avro_codec_type
MXS_AVRO_CODEC_NULL,
MXS_AVRO_CODEC_DEFLATE,
MXS_AVRO_CODEC_SNAPPY, /**< Not yet implemented */
} ;
};
typedef struct gtid_pos
struct gtid_pos_t
{
gtid_pos_t():
timestamp(0),
domain(0),
server_id(0),
seq(0),
event_num(0)
{
}
uint32_t timestamp; /*< GTID event timestamp */
uint64_t domain; /*< Replication domain */
uint64_t server_id; /*< Server ID */
@ -226,7 +237,7 @@ typedef struct gtid_pos
* is an internal representation of the position of
* an event inside a GTID event and it is used to
* rebuild GTID events in the correct order. */
} gtid_pos_t;
};
typedef std::tr1::shared_ptr<TABLE_CREATE> STableCreate;
typedef std::tr1::shared_ptr<AVRO_TABLE> SAvroTable;
@ -276,8 +287,15 @@ struct Avro
uint32_t task_handle; /**< Delayed task handle */
};
struct AvroSession
class AvroSession: public MXS_ROUTER_SESSION
{
AvroSession(const AvroSession&) = delete;
AvroSession& operator=(const AvroSession&) = delete;
public:
static AvroSession* create(Avro* router, MXS_SESSION* session);
~AvroSession();
DCB* dcb; /*< The client DCB */
int state; /*< The state of this client */
enum avro_data_format format; /*< Stream JSON or Avro data */
@ -293,6 +311,9 @@ struct AvroSession
gtid_pos_t gtid_start; /*< First sent GTID */
unsigned int cstate; /*< Catch up state */
sqlite3* sqlite_handle;
private:
AvroSession(Avro* instance, MXS_SESSION* session, sqlite3* handle);
};
extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id,