Initial implementation of a Lua filter
This commit is contained in:
parent
2a2fdb2cc6
commit
61ec8458d3
@ -41,6 +41,10 @@ target_link_libraries(namedserverfilter maxscale-common)
|
||||
set_target_properties(namedserverfilter PROPERTIES VERSION "1.1.0")
|
||||
install(TARGETS namedserverfilter DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
||||
add_library(luafilter SHARED luafilter.cc)
|
||||
target_link_libraries(luafilter log_manager utils lua)
|
||||
install(TARGETS luafilter DESTINATION modules)
|
||||
|
||||
|
||||
if(BUILD_SLAVELAG)
|
||||
add_library(slavelag SHARED slavelag.c)
|
||||
|
@ -29,24 +29,25 @@
|
||||
*/
|
||||
|
||||
#include <my_config.h>
|
||||
#include <skygw_types.h>
|
||||
#include <spinlock.h>
|
||||
#include <plugin.h>
|
||||
#include <skygw_debug.h>
|
||||
#include <log_manager.h>
|
||||
extern "C"
|
||||
{
|
||||
#include <ctype.h>
|
||||
#include <stdio.h>
|
||||
#include <fcntl.h>
|
||||
#include <filter.h>
|
||||
#include <string.h>
|
||||
#include <atomic.h>
|
||||
#include <modutil.h>
|
||||
#include <log_manager.h>
|
||||
#include <spinlock.h>
|
||||
#include <filter.h>
|
||||
#include <session.h>
|
||||
#include <plugin.h>
|
||||
#include <skygw_types.h>
|
||||
|
||||
extern "C"{
|
||||
#include <modutil.h>
|
||||
#include <lua.h>
|
||||
#include <lualib.h>
|
||||
#include <lauxlib.h>
|
||||
}
|
||||
|
||||
|
||||
|
||||
MODULE_INFO info = {
|
||||
MODULE_API_FILTER,
|
||||
@ -56,6 +57,17 @@ MODULE_INFO info = {
|
||||
};
|
||||
|
||||
static char *version_str = "V1.0.0";
|
||||
/**
|
||||
* Implementation of the mandatory version entry point
|
||||
*
|
||||
* @return version string of the module
|
||||
*/
|
||||
char *
|
||||
version()
|
||||
{
|
||||
return version_str;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* The filter entry points
|
||||
@ -70,6 +82,7 @@ static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
|
||||
static int clientReply(FILTER *instance, void *fsession, GWBUF *queue);
|
||||
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
|
||||
|
||||
|
||||
static FILTER_OBJECT MyObject = {
|
||||
createInstance,
|
||||
newSession,
|
||||
@ -82,6 +95,25 @@ static FILTER_OBJECT MyObject = {
|
||||
diagnostic,
|
||||
};
|
||||
|
||||
/**
|
||||
* The module entry point routine. It is this routine that
|
||||
* must populate the structure that is referred to as the
|
||||
* "module object", this is a structure with the set of
|
||||
* external entry points for this module.
|
||||
*
|
||||
* @return The module object
|
||||
*/
|
||||
FILTER_OBJECT *
|
||||
GetModuleObject()
|
||||
{
|
||||
return &MyObject;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* The Lua filter instance.
|
||||
*/
|
||||
@ -101,16 +133,7 @@ typedef struct {
|
||||
UPSTREAM up;
|
||||
} LUA_SESSION;
|
||||
|
||||
/**
|
||||
* Implementation of the mandatory version entry point
|
||||
*
|
||||
* @return version string of the module
|
||||
*/
|
||||
char *
|
||||
version()
|
||||
{
|
||||
return version_str;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The module initialisation routine, called when the module
|
||||
@ -121,20 +144,6 @@ ModuleInit()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* The module entry point routine. It is this routine that
|
||||
* must populate the structure that is referred to as the
|
||||
* "module object", this is a structure with the set of
|
||||
* external entry points for this module.
|
||||
*
|
||||
* @return The module object
|
||||
*/
|
||||
FILTER_OBJECT *
|
||||
GetModuleObject()
|
||||
{
|
||||
return &MyObject;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create an instance of the filter for a particular service
|
||||
@ -149,7 +158,7 @@ createInstance(char **options, FILTER_PARAMETER **params)
|
||||
{
|
||||
LUA_INSTANCE *my_instance;
|
||||
int i;
|
||||
if ((my_instance = calloc(1, sizeof(LUA_INSTANCE))) == NULL){
|
||||
if ((my_instance = (LUA_INSTANCE*)calloc(1, sizeof(LUA_INSTANCE))) == NULL){
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -177,7 +186,10 @@ createInstance(char **options, FILTER_PARAMETER **params)
|
||||
}
|
||||
|
||||
lua_getglobal(my_instance->global_state,"createInstance");
|
||||
lua_pcall(my_instance->global_state,0,0,0);
|
||||
if(lua_pcall(my_instance->global_state,0,0,0)){
|
||||
skygw_log_write(LOGFILE_ERROR, "luafilter: Failed to get global variable 'createInstance': %s.",
|
||||
lua_tostring(my_instance->global_state,-1));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -202,19 +214,23 @@ newSession(FILTER *instance, SESSION *session)
|
||||
LUA_SESSION *my_session;
|
||||
LUA_INSTANCE *my_instance = (LUA_INSTANCE*)instance;
|
||||
|
||||
if ((my_session = calloc(1, sizeof(LUA_SESSION))) == NULL){
|
||||
if ((my_session = (LUA_SESSION*)calloc(1, sizeof(LUA_SESSION))) == NULL){
|
||||
return NULL;
|
||||
}
|
||||
my_session->session = session;
|
||||
my_session->state = luaL_newstate();
|
||||
luaL_openlibs(my_session->state);
|
||||
if(luaL_dofile(my_session->state,my_instance->session_script)){
|
||||
skygw_log_write(LOGFILE_ERROR, "luafilter: Failed to execute session script at '%s'.",my_instance->session_script);
|
||||
skygw_log_write(LOGFILE_ERROR, "luafilter: Failed to execute session script at '%s': %s.",
|
||||
my_instance->session_script,
|
||||
lua_tostring(my_session->state,-1));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
lua_getglobal(my_session->state,"newSession");
|
||||
lua_pcall(my_session->state,0,0,0);
|
||||
if( lua_pcall(my_session->state,0,0,0)){
|
||||
skygw_log_write(LOGFILE_ERROR, "luafilter: Failed to get global variable 'newSession': '%s'.",
|
||||
lua_tostring(my_session->state,-1));
|
||||
}
|
||||
|
||||
return my_session;
|
||||
}
|
||||
@ -232,10 +248,13 @@ static void
|
||||
closeSession(FILTER *instance, void *session)
|
||||
{
|
||||
LUA_SESSION *my_session = (LUA_SESSION *)session;
|
||||
LUA_INSTANCE *my_instance = (LUA_INSTANCE*)instance;
|
||||
//LUA_INSTANCE *my_instance = (LUA_INSTANCE*)instance;
|
||||
|
||||
lua_getglobal(my_session->state,"closeSession");
|
||||
lua_pcall(my_session->state,0,0,0);
|
||||
if(lua_pcall(my_session->state,0,0,0)){
|
||||
skygw_log_write(LOGFILE_ERROR, "luafilter: Failed to get global variable 'closeSession': '%s'.",
|
||||
lua_tostring(my_session->state,-1));
|
||||
}
|
||||
|
||||
lua_close(my_session->state);
|
||||
}
|
||||
@ -282,17 +301,23 @@ static int clientReply(FILTER *instance, void *session, GWBUF *queue)
|
||||
char *fullquery;
|
||||
int qsize;
|
||||
|
||||
qsize = (unsigned char*)queue->end - (unsigned char*)(queue->start + 5);
|
||||
fullquery = calloc(qsize + 1,sizeof(char));
|
||||
memcpy(fullquery,(queue->start + 5),qsize);
|
||||
qsize = (unsigned char*)queue->end - (((unsigned char*)queue->start) + 5);
|
||||
fullquery = (char*)calloc(qsize + 1,sizeof(char));
|
||||
memcpy(fullquery,(((unsigned char*)queue->start) + 5),qsize);
|
||||
|
||||
lua_getglobal(my_session->state,"clientReply");
|
||||
lua_pushlstring(my_session->state,fullquery,qsize + 1);
|
||||
lua_pcall(my_session->state,1,0,0);
|
||||
if(lua_pcall(my_session->state,1,0,0)){
|
||||
skygw_log_write(LOGFILE_ERROR, "luafilter: Session scope call to 'clientReply' failed: '%s'.",
|
||||
lua_tostring(my_session->state,-1));
|
||||
}
|
||||
|
||||
lua_getglobal(my_instance->global_state,"clientReply");
|
||||
lua_pushlstring(my_instance->global_state,fullquery,strlen(fullquery));
|
||||
lua_pcall(my_instance->global_state,1,0,0);
|
||||
if(lua_pcall(my_instance->global_state,1,0,0)){
|
||||
skygw_log_write(LOGFILE_ERROR, "luafilter: Global scope call to 'clientReply' failed: '%s'.",
|
||||
lua_tostring(my_session->state,-1));
|
||||
}
|
||||
|
||||
return my_session->up.clientReply(my_session->up.instance,
|
||||
my_session->up.session, queue);
|
||||
@ -320,18 +345,24 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||
if(modutil_is_SQL(queue)){
|
||||
|
||||
modutil_extract_SQL(queue, &ptr, &qlen);
|
||||
fullquery = malloc((qlen + 1) * sizeof(char));
|
||||
fullquery = (char*)malloc((qlen + 1) * sizeof(char));
|
||||
memcpy(fullquery,ptr,qlen);
|
||||
memset(fullquery + qlen,0,1);
|
||||
|
||||
lua_getglobal(my_session->state,"routeQuery");
|
||||
lua_pushlstring(my_session->state,fullquery,strlen(fullquery));
|
||||
lua_pcall(my_session->state,1,0,0);
|
||||
if(lua_pcall(my_session->state,1,0,0)){
|
||||
skygw_log_write(LOGFILE_ERROR, "luafilter: Session scope call to 'routeQuery' failed: '%s'.",
|
||||
lua_tostring(my_session->state,-1));
|
||||
}
|
||||
|
||||
|
||||
lua_getglobal(my_instance->global_state,"routeQuery");
|
||||
lua_pushlstring(my_instance->global_state,fullquery,strlen(fullquery));
|
||||
lua_pcall(my_instance->global_state,1,0,0);
|
||||
|
||||
if(lua_pcall(my_instance->global_state,1,0,0)){
|
||||
skygw_log_write(LOGFILE_ERROR, "luafilter: Global scope call to 'routeQuery' failed: '%s'.",
|
||||
lua_tostring(my_session->state,-1));
|
||||
}
|
||||
free(fullquery);
|
||||
}
|
||||
|
||||
@ -342,9 +373,6 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||
/**
|
||||
* Diagnostics routine
|
||||
*
|
||||
* Prints the connection details and the names of the exchange,
|
||||
* queue and the routing key.
|
||||
*
|
||||
* @param instance The filter instance
|
||||
* @param fsession Filter session, may be NULL
|
||||
* @param dcb The DCB for diagnostic output
|
||||
@ -359,4 +387,4 @@ diagnostic(FILTER *instance, void *fsession, DCB *dcb)
|
||||
dcb_printf(dcb, "\t\tLua Filter\nglobal script: %s\nsession script: %s\n",my_instance->global_script,my_instance->session_script);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user