If a connection has not been fully established (i.e. authentication has been completed) then it should not be considered as a connection pool candidate.
		
			
				
	
	
		
			453 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			453 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
/*
 | 
						|
 * Copyright (c) 2016 MariaDB Corporation Ab
 | 
						|
 *
 | 
						|
 * Use of this software is governed by the Business Source License included
 | 
						|
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | 
						|
 *
 | 
						|
 * Change Date: 2019-07-01
 | 
						|
 *
 | 
						|
 * On the date above, in accordance with the Business Source License, use
 | 
						|
 * of this software will be governed by version 2 or later of the General
 | 
						|
 * Public License.
 | 
						|
 */
 | 
						|
 | 
						|
/**
 | 
						|
 * @file cdc.c - Change Data Capture Listener protocol module
 | 
						|
 *
 | 
						|
 * The change data capture protocol module is intended as a mechanism to allow connections
 | 
						|
 * into maxscale for the purpose of accessing information within
 | 
						|
 * the maxscale with a Change Data Capture API interface (supporting Avro right now)
 | 
						|
 * databases.
 | 
						|
 *
 | 
						|
 * In the first instance it is intended to connect, authenticate and retieve data in the Avro format
 | 
						|
 * as requested by compatible clients.
 | 
						|
 *
 | 
						|
 * @verbatim
 | 
						|
 * Revision History
 | 
						|
 * Date     Who         Description
 | 
						|
 * 11/01/2016   Massimiliano Pinto  Initial implementation
 | 
						|
 *
 | 
						|
 * @endverbatim
 | 
						|
 */
 | 
						|
 | 
						|
#define MXS_MODULE_NAME "CDC"
 | 
						|
 | 
						|
#include <cdc.h>
 | 
						|
#include <maxscale/alloc.h>
 | 
						|
#include <maxscale/modinfo.h>
 | 
						|
#include <maxscale/log_manager.h>
 | 
						|
#include <maxscale/protocol.h>
 | 
						|
#include <maxscale/modinfo.h>
 | 
						|
#include <maxscale/poll.h>
 | 
						|
 | 
						|
#define ISspace(x) isspace((int)(x))
 | 
						|
#define CDC_SERVER_STRING "MaxScale(c) v.1.0.0"
 | 
						|
 | 
						|
static int cdc_read_event(DCB* dcb);
 | 
						|
static int cdc_write_event(DCB *dcb);
 | 
						|
static int cdc_write(DCB *dcb, GWBUF *queue);
 | 
						|
static int cdc_error(DCB *dcb);
 | 
						|
static int cdc_hangup(DCB *dcb);
 | 
						|
static int cdc_accept(DCB *dcb);
 | 
						|
static int cdc_close(DCB *dcb);
 | 
						|
static int cdc_listen(DCB *dcb, char *config);
 | 
						|
static CDC_protocol *cdc_protocol_init(DCB* dcb);
 | 
						|
static void cdc_protocol_done(DCB* dcb);
 | 
						|
static int do_auth(DCB *dcb, GWBUF *buffer, void *data);
 | 
						|
static void write_auth_ack(DCB *dcb);
 | 
						|
static void write_auth_err(DCB *dcb);
 | 
						|
 | 
						|
static char* cdc_default_auth()
 | 
						|
{
 | 
						|
    return "CDCPlainAuth";
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * The module entry point routine. It is this routine that
 | 
						|
 * must populate the structure that is referred to as the
 | 
						|
 * "module object", this is a structure with the set of
 | 
						|
 * external entry points for this module.
 | 
						|
 *
 | 
						|
 * @return The module object
 | 
						|
 */
 | 
						|
MXS_MODULE* MXS_CREATE_MODULE()
 | 
						|
{
 | 
						|
    static MXS_PROTOCOL MyObject =
 | 
						|
    {
 | 
						|
        cdc_read_event, /* Read - EPOLLIN handler        */
 | 
						|
        cdc_write, /* Write - data from gateway     */
 | 
						|
        cdc_write_event, /* WriteReady - EPOLLOUT handler */
 | 
						|
        cdc_error, /* Error - EPOLLERR handler      */
 | 
						|
        cdc_hangup, /* HangUp - EPOLLHUP handler     */
 | 
						|
        cdc_accept, /* Accept                        */
 | 
						|
        NULL, /* Connect                       */
 | 
						|
        cdc_close, /* Close                         */
 | 
						|
        cdc_listen, /* Create a listener             */
 | 
						|
        NULL, /* Authentication                */
 | 
						|
        NULL, /* Session                       */
 | 
						|
        cdc_default_auth, /* default authentication */
 | 
						|
        NULL,
 | 
						|
        NULL,
 | 
						|
    };
 | 
						|
 | 
						|
    static MXS_MODULE info =
 | 
						|
    {
 | 
						|
        MXS_MODULE_API_PROTOCOL,
 | 
						|
        MXS_MODULE_IN_DEVELOPMENT,
 | 
						|
        MXS_PROTOCOL_VERSION,
 | 
						|
        "A Change Data Capture Listener implementation for use in binlog events retrieval",
 | 
						|
        "V1.0.0",
 | 
						|
        &MyObject,
 | 
						|
        NULL, /* Process init. */
 | 
						|
        NULL, /* Process finish. */
 | 
						|
        NULL, /* Thread init. */
 | 
						|
        NULL, /* Thread finish. */
 | 
						|
        {
 | 
						|
            {MXS_END_MODULE_PARAMS}
 | 
						|
        }
 | 
						|
    };
 | 
						|
 | 
						|
    return &info;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Read event for EPOLLIN on the CDC protocol module.
 | 
						|
 *
 | 
						|
 * @param dcb    The descriptor control block
 | 
						|
 * @return
 | 
						|
 */
 | 
						|
static int
 | 
						|
cdc_read_event(DCB* dcb)
 | 
						|
{
 | 
						|
    MXS_SESSION *session = dcb->session;
 | 
						|
    CDC_protocol *protocol = (CDC_protocol *) dcb->protocol;
 | 
						|
    int n, auth_val, rc = 0;
 | 
						|
    GWBUF *head = NULL;
 | 
						|
    CDC_session *client_data = (CDC_session *) dcb->data;
 | 
						|
 | 
						|
    if ((n = dcb_read(dcb, &head, 0))  > 0)
 | 
						|
    {
 | 
						|
        switch (protocol->state)
 | 
						|
        {
 | 
						|
        case CDC_STATE_WAIT_FOR_AUTH:
 | 
						|
            if (CDC_STATE_AUTH_OK == (
 | 
						|
                    /* Fill CDC_session from incoming packet */
 | 
						|
                    auth_val = dcb->authfunc.extract(dcb, head)))
 | 
						|
            {
 | 
						|
                /* Call protocol authentication */
 | 
						|
                auth_val = dcb->authfunc.authenticate(dcb);
 | 
						|
            }
 | 
						|
 | 
						|
            /* Discard input buffer */
 | 
						|
            gwbuf_free(head);
 | 
						|
 | 
						|
            if (auth_val == CDC_STATE_AUTH_OK)
 | 
						|
            {
 | 
						|
                /* start a real session */
 | 
						|
                session = session_alloc(dcb->service, dcb);
 | 
						|
 | 
						|
                if (session != NULL)
 | 
						|
                {
 | 
						|
                    protocol->state = CDC_STATE_HANDLE_REQUEST;
 | 
						|
 | 
						|
                    write_auth_ack(dcb);
 | 
						|
 | 
						|
                    MXS_INFO("%s: Client [%s] authenticated with user [%s]",
 | 
						|
                             dcb->service->name, dcb->remote != NULL ? dcb->remote : "",
 | 
						|
                             client_data->user);
 | 
						|
                }
 | 
						|
                else
 | 
						|
                {
 | 
						|
                    auth_val = CDC_STATE_AUTH_NO_SESSION;
 | 
						|
                }
 | 
						|
            }
 | 
						|
 | 
						|
            if (auth_val != CDC_STATE_AUTH_OK)
 | 
						|
            {
 | 
						|
                protocol->state = CDC_STATE_AUTH_ERR;
 | 
						|
 | 
						|
                write_auth_err(dcb);
 | 
						|
                MXS_ERROR("%s: authentication failure from [%s], user [%s]",
 | 
						|
                          dcb->service->name, dcb->remote != NULL ? dcb->remote : "",
 | 
						|
                          client_data->user);
 | 
						|
 | 
						|
                /* force the client connection close */
 | 
						|
                dcb_close(dcb);
 | 
						|
 | 
						|
            }
 | 
						|
            break;
 | 
						|
 | 
						|
        case CDC_STATE_HANDLE_REQUEST:
 | 
						|
            // handle CLOSE command, it shoudl be routed as well and client connection closed after last transmission
 | 
						|
            if (strncmp((char*)GWBUF_DATA(head), "CLOSE", GWBUF_LENGTH(head)) == 0)
 | 
						|
            {
 | 
						|
                MXS_INFO("%s: Client [%s] has requested CLOSE action",
 | 
						|
                         dcb->service->name, dcb->remote != NULL ? dcb->remote : "");
 | 
						|
 | 
						|
                // gwbuf_set_type(head, GWBUF_TYPE_CDC);
 | 
						|
                // the router will close the client connection
 | 
						|
                //rc = MXS_SESSION_ROUTE_QUERY(session, head);
 | 
						|
 | 
						|
                // buffer not handled by router right now, consume it
 | 
						|
                gwbuf_free(head);
 | 
						|
 | 
						|
                /* right now, just force the client connection close */
 | 
						|
                dcb_close(dcb);
 | 
						|
            }
 | 
						|
            else
 | 
						|
            {
 | 
						|
                MXS_INFO("%s: Client [%s] requested [%.*s] action",
 | 
						|
                         dcb->service->name, dcb->remote != NULL ? dcb->remote : "",
 | 
						|
                         (int)GWBUF_LENGTH(head), (char*)GWBUF_DATA(head));
 | 
						|
 | 
						|
                // gwbuf_set_type(head, GWBUF_TYPE_CDC);
 | 
						|
                rc = MXS_SESSION_ROUTE_QUERY(session, head);
 | 
						|
            }
 | 
						|
            break;
 | 
						|
 | 
						|
        default:
 | 
						|
            MXS_INFO("%s: Client [%s] in unknown state %d", dcb->service->name,
 | 
						|
                     dcb->remote != NULL ? dcb->remote : "", protocol->state);
 | 
						|
            gwbuf_free(head);
 | 
						|
 | 
						|
            break;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    return rc;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * EPOLLOUT handler for the CDC protocol module.
 | 
						|
 *
 | 
						|
 * @param dcb    The descriptor control block
 | 
						|
 * @return
 | 
						|
 */
 | 
						|
static int
 | 
						|
cdc_write_event(DCB *dcb)
 | 
						|
{
 | 
						|
    return dcb_drain_writeq(dcb);
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Write routine for the CDC protocol module.
 | 
						|
 *
 | 
						|
 * Writes the content of the buffer queue to the socket
 | 
						|
 * observing the non-blocking principles of the gateway.
 | 
						|
 *
 | 
						|
 * @param dcb   Descriptor Control Block for the socket
 | 
						|
 * @param queue Linked list of buffes to write
 | 
						|
 */
 | 
						|
static int
 | 
						|
cdc_write(DCB *dcb, GWBUF *queue)
 | 
						|
{
 | 
						|
    int rc;
 | 
						|
    rc = dcb_write(dcb, queue);
 | 
						|
    return rc;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Handler for the EPOLLERR event.
 | 
						|
 *
 | 
						|
 * @param dcb    The descriptor control block
 | 
						|
 */
 | 
						|
static int
 | 
						|
cdc_error(DCB *dcb)
 | 
						|
{
 | 
						|
    dcb_close(dcb);
 | 
						|
    return 0;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Handler for the EPOLLHUP event.
 | 
						|
 *
 | 
						|
 * @param dcb    The descriptor control block
 | 
						|
 */
 | 
						|
static int
 | 
						|
cdc_hangup(DCB *dcb)
 | 
						|
{
 | 
						|
    dcb_close(dcb);
 | 
						|
    return 0;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Handler for the EPOLLIN event when the DCB refers to the listening
 | 
						|
 * socket for the protocol.
 | 
						|
 *
 | 
						|
 * @param dcb    The descriptor control block
 | 
						|
 */
 | 
						|
static int
 | 
						|
cdc_accept(DCB *listener)
 | 
						|
{
 | 
						|
    int n_connect = 0;
 | 
						|
    DCB *client_dcb;
 | 
						|
 | 
						|
    while ((client_dcb = dcb_accept(listener)) != NULL)
 | 
						|
    {
 | 
						|
        CDC_session *client_data = NULL;
 | 
						|
        CDC_protocol *protocol = NULL;
 | 
						|
 | 
						|
        /* allocating CDC protocol */
 | 
						|
        protocol = cdc_protocol_init(client_dcb);
 | 
						|
        if (protocol == NULL)
 | 
						|
        {
 | 
						|
            client_dcb->protocol = NULL;
 | 
						|
            dcb_close(client_dcb);
 | 
						|
            continue;
 | 
						|
        }
 | 
						|
 | 
						|
        client_dcb->protocol = (CDC_protocol *) protocol;
 | 
						|
 | 
						|
        /* Dummy session */
 | 
						|
        client_dcb->session = session_set_dummy(client_dcb);
 | 
						|
 | 
						|
        if (NULL == client_dcb->session || poll_add_dcb(client_dcb))
 | 
						|
        {
 | 
						|
            dcb_close(client_dcb);
 | 
						|
            continue;
 | 
						|
        }
 | 
						|
 | 
						|
        /* create the session data for CDC */
 | 
						|
        /* this coud be done in anothe routine, let's keep it here for now */
 | 
						|
        client_data = (CDC_session *) MXS_CALLOC(1, sizeof(CDC_session));
 | 
						|
        if (client_data == NULL)
 | 
						|
        {
 | 
						|
            dcb_close(client_dcb);
 | 
						|
            continue;
 | 
						|
        }
 | 
						|
 | 
						|
        client_dcb->data = client_data;
 | 
						|
 | 
						|
        /* client protocol state change to CDC_STATE_WAIT_FOR_AUTH */
 | 
						|
        protocol->state = CDC_STATE_WAIT_FOR_AUTH;
 | 
						|
 | 
						|
        MXS_NOTICE("%s: new connection from [%s]", client_dcb->service->name,
 | 
						|
                   client_dcb->remote != NULL ? client_dcb->remote : "");
 | 
						|
 | 
						|
        n_connect++;
 | 
						|
    }
 | 
						|
 | 
						|
    return n_connect;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * The close handler for the descriptor. Called by the gateway to
 | 
						|
 * explicitly close a connection.
 | 
						|
 *
 | 
						|
 * @param dcb   The descriptor control block
 | 
						|
 */
 | 
						|
static int
 | 
						|
cdc_close(DCB *dcb)
 | 
						|
{
 | 
						|
    CDC_protocol *p = (CDC_protocol *) dcb->protocol;
 | 
						|
 | 
						|
    if (!p)
 | 
						|
    {
 | 
						|
        return 0;
 | 
						|
    }
 | 
						|
 | 
						|
    /* Add deallocate protocol items*/
 | 
						|
    cdc_protocol_done(dcb);
 | 
						|
 | 
						|
    return 1;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * CDC protocol listener entry point
 | 
						|
 *
 | 
						|
 * @param   listener    The Listener DCB
 | 
						|
 * @param   config      Configuration (ip:port)
 | 
						|
 */
 | 
						|
static int
 | 
						|
cdc_listen(DCB *listener, char *config)
 | 
						|
{
 | 
						|
    return (dcb_listen(listener, config, "CDC") < 0) ? 0 : 1;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Allocate a new CDC protocol structure
 | 
						|
 *
 | 
						|
 * @param  dcb    The DCB where protocol is added
 | 
						|
 * @return        New allocated protocol or NULL on errors
 | 
						|
 *
 | 
						|
 */
 | 
						|
static CDC_protocol *
 | 
						|
cdc_protocol_init(DCB* dcb)
 | 
						|
{
 | 
						|
    CDC_protocol* p;
 | 
						|
 | 
						|
    p = (CDC_protocol *) MXS_CALLOC(1, sizeof(CDC_protocol));
 | 
						|
 | 
						|
    if (p == NULL)
 | 
						|
    {
 | 
						|
        return NULL;
 | 
						|
    }
 | 
						|
 | 
						|
    p->state = CDC_ALLOC;
 | 
						|
 | 
						|
    spinlock_init(&p->lock);
 | 
						|
 | 
						|
    /* memory allocation here */
 | 
						|
    p->state = CDC_STATE_WAIT_FOR_AUTH;
 | 
						|
 | 
						|
#ifdef SS_DEBUG
 | 
						|
    p->protocol_chk_tail = p->protocol_chk_top = CHK_NUM_PROTOCOL;
 | 
						|
#endif
 | 
						|
 | 
						|
    CHK_PROTOCOL(p);
 | 
						|
 | 
						|
    return p;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Free resources in CDC protocol
 | 
						|
 *
 | 
						|
 * @param dcb    DCB with allocateid protocol
 | 
						|
 *
 | 
						|
 */
 | 
						|
static void
 | 
						|
cdc_protocol_done(DCB* dcb)
 | 
						|
{
 | 
						|
    CDC_protocol* p = (CDC_protocol *) dcb->protocol;
 | 
						|
 | 
						|
    if (!p)
 | 
						|
    {
 | 
						|
        return;
 | 
						|
    }
 | 
						|
 | 
						|
    p = (CDC_protocol *) dcb->protocol;
 | 
						|
 | 
						|
    spinlock_acquire(&p->lock);
 | 
						|
 | 
						|
    /* deallocate memory here */
 | 
						|
 | 
						|
    p->state = CDC_STATE_CLOSE;
 | 
						|
 | 
						|
    spinlock_release(&p->lock);
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Writes Authentication ACK, success
 | 
						|
 *
 | 
						|
 * @param dcb    Current client DCB
 | 
						|
 *
 | 
						|
 */
 | 
						|
static void
 | 
						|
write_auth_ack(DCB *dcb)
 | 
						|
{
 | 
						|
    dcb_printf(dcb, "OK\n");
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Writes Authentication ERROR
 | 
						|
 *
 | 
						|
 * @param dcb    Current client DCB
 | 
						|
 *
 | 
						|
 */
 | 
						|
static void
 | 
						|
write_auth_err(DCB *dcb)
 | 
						|
{
 | 
						|
    dcb_printf(dcb, "ERROR: Authentication failed\n");
 | 
						|
}
 | 
						|
 |