219 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			219 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl.
 | |
|  *
 | |
|  * Change Date: 2019-07-01
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * @file queuemanager.c  -  Logic for FIFO queue handling
 | |
|  *
 | |
|  * MaxScale contains a number of FIFO queues. This code attempts to provide
 | |
|  * standard functions for handling them.
 | |
|  *
 | |
|  * @verbatim
 | |
|  * Revision History
 | |
|  *
 | |
|  * Date         Who                     Description
 | |
|  * 27/04/16     Martin Brampton         Initial implementation
 | |
|  *
 | |
|  * @endverbatim
 | |
|  */
 | |
| #include <stdlib.h>
 | |
| #include <stdbool.h>
 | |
| #include <stdio.h>
 | |
| #include <maxscale/queuemanager.h>
 | |
| #include <maxscale/alloc.h>
 | |
| #include <maxscale/spinlock.h>
 | |
| #include <maxscale/log_manager.h>
 | |
| #include <maxscale/hk_heartbeat.h>
 | |
| #include <maxscale/debug.h>
 | |
| 
 | |
| #if defined(SS_DEBUG)
 | |
| int debug_check_fail = 0;
 | |
| #endif /* SS_DEBUG */
 | |
| 
 | |
| /**
 | |
|  * @brief Allocate a new queue
 | |
|  *
 | |
|  * Provides for FIFO queues, this is the first operation to be requested
 | |
|  * for the use of a queue.
 | |
|  *
 | |
|  * @param limit         The maximum size of the queue
 | |
|  * @param timeout       The maximum time for which an entry is valid
 | |
|  * @return QUEUE_CONFIG A queue configuration and anchor structure
 | |
|  */
 | |
| QUEUE_CONFIG
 | |
| *mxs_queue_alloc(int limit, int timeout)
 | |
| {
 | |
|     QUEUE_CONFIG *new_queue = (QUEUE_CONFIG *)MXS_CALLOC(1, sizeof(QUEUE_CONFIG));
 | |
|     if (new_queue)
 | |
|     {
 | |
|         new_queue->queue_array = MXS_CALLOC(limit+1, sizeof(QUEUE_ENTRY));
 | |
|         if (new_queue->queue_array)
 | |
|         {
 | |
|             new_queue->queue_limit = limit;
 | |
|             new_queue->timeout = timeout;
 | |
|             spinlock_init(&new_queue->queue_lock);
 | |
| #if defined(SS_DEBUG)
 | |
|             new_queue->sequence_number = 0;
 | |
| #endif /* SS_DEBUG */
 | |
|             return new_queue;
 | |
|         }
 | |
|         MXS_FREE(new_queue);
 | |
|     }
 | |
|     return NULL;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Free a queue configuration
 | |
|  *
 | |
|  * Provides for FIFO queues, this is the last operation to be requested, when
 | |
|  * there is no further use for the queue.
 | |
|  *
 | |
|  * @param QUEUE_CONFIG A queue configuration and anchor structure
 | |
|  */
 | |
| void mxs_queue_free(QUEUE_CONFIG *queue_config)
 | |
| {
 | |
|     if (queue_config)
 | |
|     {
 | |
|         MXS_FREE(queue_config->queue_array);
 | |
|         MXS_FREE(queue_config);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Add an item to a queue
 | |
|  *
 | |
|  * Add a new item to a FIFO queue. If the queue config is null, this function
 | |
|  * will behave as if the queue is full.
 | |
|  *
 | |
|  * @param queue_config  The configuration and anchor structure for the queue
 | |
|  * @param new_entry     The new entry, to be added
 | |
|  * @return bool         Whether the enqueue succeeded
 | |
|  */
 | |
| bool mxs_enqueue(QUEUE_CONFIG *queue_config, void *new_entry)
 | |
| {
 | |
|     bool result = false;
 | |
| 
 | |
|     if (queue_config)
 | |
|     {
 | |
|         spinlock_acquire(&queue_config->queue_lock);
 | |
|         if (mxs_queue_count(queue_config) < queue_config->queue_limit)
 | |
|         {
 | |
|             queue_config->queue_array[queue_config->end].queued_object = new_entry;
 | |
|             queue_config->queue_array[queue_config->end].heartbeat = hkheartbeat;
 | |
| #if defined(SS_DEBUG)
 | |
|             queue_config->queue_array[queue_config->end].sequence_check = queue_config->sequence_number;
 | |
|             queue_config->sequence_number++;
 | |
| #endif /* SS_DEBUG */
 | |
|             queue_config->end++;
 | |
|             if (queue_config->end > queue_config->queue_limit)
 | |
|             {
 | |
|                 queue_config->end = 0;
 | |
|             }
 | |
|             queue_config->has_entries = true;
 | |
|             result = true;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             result = false;
 | |
|         }
 | |
|         spinlock_release(&queue_config->queue_lock);
 | |
|     }
 | |
|     return result;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Remove an item from a queue
 | |
|  *
 | |
|  * Remove an item from a FIFO queue. If the queue config is NULL, the function
 | |
|  * will behave as if for an empty queue.
 | |
|  *
 | |
|  * @param queue_config  The configuration and anchor structure for the queue
 | |
|  * @param result        A queue entry structure that will receive the result
 | |
|  * @return bool indicating whether an item was successfully dequeued
 | |
|  */
 | |
| bool mxs_dequeue(QUEUE_CONFIG *queue_config, QUEUE_ENTRY *result)
 | |
| {
 | |
|     QUEUE_ENTRY *found = NULL;
 | |
| 
 | |
|     if (queue_config && queue_config->has_entries)
 | |
|     {
 | |
|         spinlock_acquire(&queue_config->queue_lock);
 | |
|         if (mxs_queue_count(queue_config) > 0)
 | |
|         {
 | |
|             found = &(queue_config->queue_array[queue_config->start]);
 | |
| #if defined(SS_DEBUG)
 | |
|             ss_dassert((queue_config->sequence_number) == (found->sequence_check + mxs_queue_count(queue_config)));
 | |
|             if ((queue_config->sequence_number) != (found->sequence_check + mxs_queue_count(queue_config)))
 | |
|             {
 | |
|                 debug_check_fail++;
 | |
|             }
 | |
| #endif /* SS_DEBUG */
 | |
|             result->heartbeat = found->heartbeat;
 | |
|             result->queued_object = found->queued_object;
 | |
|             if (++queue_config->start > queue_config->queue_limit)
 | |
|             {
 | |
|                 queue_config->start = 0;
 | |
|             }
 | |
|             queue_config->has_entries = (mxs_queue_count(queue_config) > 0);
 | |
|         }
 | |
|         spinlock_release(&queue_config->queue_lock);
 | |
|     }
 | |
|     return (found != NULL);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Remove an item from a queue if it has passed the timeout limit
 | |
|  *
 | |
|  * Remove an item from a FIFO queue if expired. If the queue config is NULL,
 | |
|  * the function will behave as for an empty queue.
 | |
|  *
 | |
|  * @param queue_config  The configuration and anchor structure for the queue
 | |
|  * @param result        A queue entry structure that will receive the result
 | |
|  * @return bool indicating whether an item was successfully dequeued
 | |
|  */
 | |
| bool mxs_dequeue_if_expired(QUEUE_CONFIG *queue_config, QUEUE_ENTRY *result)
 | |
| {
 | |
|     QUEUE_ENTRY *found = NULL;
 | |
| 
 | |
|     if (queue_config && queue_config->has_entries)
 | |
|     {
 | |
|         spinlock_acquire(&queue_config->queue_lock);
 | |
|         if (mxs_queue_count(queue_config) > 0)
 | |
|         {
 | |
|             found = &(queue_config->queue_array[queue_config->start]);
 | |
|             if (found->heartbeat > hkheartbeat - queue_config->timeout)
 | |
|             {
 | |
|                 found = NULL;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
| #if defined(SS_DEBUG)
 | |
|                 ss_dassert((queue_config->sequence_number) == (found->sequence_check + mxs_queue_count(queue_config)));
 | |
|                 if ((queue_config->sequence_number) != (found->sequence_check + mxs_queue_count(queue_config)))
 | |
|                 {
 | |
|                     debug_check_fail++;
 | |
|                 }
 | |
| #endif /* SS_DEBUG */
 | |
|                 result->heartbeat = found->heartbeat;
 | |
|                 result->queued_object = found->queued_object;
 | |
|                 if (++queue_config->start > queue_config->queue_limit)
 | |
|                 {
 | |
|                     queue_config->start = 0;
 | |
|                 }
 | |
|                 queue_config->has_entries = (mxs_queue_count(queue_config) > 0);
 | |
|             }
 | |
|         }
 | |
|         spinlock_release(&queue_config->queue_lock);
 | |
|     }
 | |
|     return (found != NULL);
 | |
| }
 | 
