Remoeve queuemanager.[h|cc]
This commit is contained in:
@ -1,26 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
/*
|
|
||||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
|
||||||
*
|
|
||||||
* Use of this software is governed by the Business Source License included
|
|
||||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
||||||
*
|
|
||||||
* Change Date: 2020-01-01
|
|
||||||
*
|
|
||||||
* On the date above, in accordance with the Business Source License, use
|
|
||||||
* of this software will be governed by version 2 or later of the General
|
|
||||||
* Public License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @file queuemanager.h The Queue Manager public header
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include <maxscale/cdefs.h>
|
|
||||||
|
|
||||||
MXS_BEGIN_DECLS
|
|
||||||
|
|
||||||
struct queue_config;
|
|
||||||
typedef struct queue_config QUEUE_CONFIG;
|
|
||||||
|
|
||||||
MXS_END_DECLS
|
|
@ -33,7 +33,6 @@ add_library(maxscale-common SHARED
|
|||||||
paths.cc
|
paths.cc
|
||||||
poll.cc
|
poll.cc
|
||||||
query_classifier.cc
|
query_classifier.cc
|
||||||
queuemanager.cc
|
|
||||||
random_jkiss.cc
|
random_jkiss.cc
|
||||||
resultset.cc
|
resultset.cc
|
||||||
resource.cc
|
resource.cc
|
||||||
|
@ -1,54 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
/*
|
|
||||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
|
||||||
*
|
|
||||||
* Use of this software is governed by the Business Source License included
|
|
||||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
||||||
*
|
|
||||||
* Change Date: 2020-01-01
|
|
||||||
*
|
|
||||||
* On the date above, in accordance with the Business Source License, use
|
|
||||||
* of this software will be governed by version 2 or later of the General
|
|
||||||
* Public License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @file core/maxscale/queuemanager.h - The private queuemanager interface
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include <maxscale/queuemanager.h>
|
|
||||||
|
|
||||||
#include <maxscale/spinlock.h>
|
|
||||||
|
|
||||||
MXS_BEGIN_DECLS
|
|
||||||
|
|
||||||
typedef struct queue_entry
|
|
||||||
{
|
|
||||||
void *queued_object;
|
|
||||||
long heartbeat;
|
|
||||||
#if defined(SS_DEBUG)
|
|
||||||
long sequence_check;
|
|
||||||
#endif /* SS_DEBUG */
|
|
||||||
} QUEUE_ENTRY;
|
|
||||||
|
|
||||||
struct queue_config
|
|
||||||
{
|
|
||||||
int queue_limit;
|
|
||||||
int start;
|
|
||||||
int end;
|
|
||||||
int timeout;
|
|
||||||
bool has_entries;
|
|
||||||
SPINLOCK queue_lock;
|
|
||||||
QUEUE_ENTRY *queue_array;
|
|
||||||
#if defined(SS_DEBUG)
|
|
||||||
long sequence_number;
|
|
||||||
#endif /* SS_DEBUG */
|
|
||||||
};
|
|
||||||
|
|
||||||
QUEUE_CONFIG *mxs_queue_alloc(int limit, int timeout);
|
|
||||||
void mxs_queue_free(QUEUE_CONFIG *queue_config);
|
|
||||||
bool mxs_enqueue(QUEUE_CONFIG *queue_config, void *new_entry);
|
|
||||||
bool mxs_dequeue(QUEUE_CONFIG *queue_config, QUEUE_ENTRY *result);
|
|
||||||
bool mxs_dequeue_if_expired(QUEUE_CONFIG *queue_config, QUEUE_ENTRY *result);
|
|
||||||
|
|
||||||
MXS_END_DECLS
|
|
@ -1,229 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
|
||||||
*
|
|
||||||
* Use of this software is governed by the Business Source License included
|
|
||||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
||||||
*
|
|
||||||
* Change Date: 2020-01-01
|
|
||||||
*
|
|
||||||
* On the date above, in accordance with the Business Source License, use
|
|
||||||
* of this software will be governed by version 2 or later of the General
|
|
||||||
* Public License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @file queuemanager.c - Logic for FIFO queue handling
|
|
||||||
*
|
|
||||||
* MaxScale contains a number of FIFO queues. This code attempts to provide
|
|
||||||
* standard functions for handling them.
|
|
||||||
*
|
|
||||||
* @verbatim
|
|
||||||
* Revision History
|
|
||||||
*
|
|
||||||
* Date Who Description
|
|
||||||
* 27/04/16 Martin Brampton Initial implementation
|
|
||||||
*
|
|
||||||
* @endverbatim
|
|
||||||
*/
|
|
||||||
#include <maxscale/queuemanager.h>
|
|
||||||
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <stdio.h>
|
|
||||||
|
|
||||||
#include <maxscale/alloc.h>
|
|
||||||
#include <maxscale/debug.h>
|
|
||||||
#include <maxscale/hk_heartbeat.h>
|
|
||||||
#include <maxscale/log_manager.h>
|
|
||||||
#include <maxscale/spinlock.h>
|
|
||||||
|
|
||||||
#include "maxscale/queuemanager.h"
|
|
||||||
|
|
||||||
#if defined(SS_DEBUG)
|
|
||||||
int debug_check_fail = 0;
|
|
||||||
#endif /* SS_DEBUG */
|
|
||||||
|
|
||||||
static inline int mxs_queue_count(QUEUE_CONFIG*);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Allocate a new queue
|
|
||||||
*
|
|
||||||
* Provides for FIFO queues, this is the first operation to be requested
|
|
||||||
* for the use of a queue.
|
|
||||||
*
|
|
||||||
* @param limit The maximum size of the queue
|
|
||||||
* @param timeout The maximum time for which an entry is valid
|
|
||||||
* @return QUEUE_CONFIG A queue configuration and anchor structure
|
|
||||||
*/
|
|
||||||
QUEUE_CONFIG
|
|
||||||
*mxs_queue_alloc(int limit, int timeout)
|
|
||||||
{
|
|
||||||
QUEUE_CONFIG *new_queue = (QUEUE_CONFIG *)MXS_CALLOC(1, sizeof(QUEUE_CONFIG));
|
|
||||||
if (new_queue)
|
|
||||||
{
|
|
||||||
new_queue->queue_array = (QUEUE_ENTRY*)MXS_CALLOC(limit + 1, sizeof(QUEUE_ENTRY));
|
|
||||||
if (new_queue->queue_array)
|
|
||||||
{
|
|
||||||
new_queue->queue_limit = limit;
|
|
||||||
new_queue->timeout = timeout;
|
|
||||||
spinlock_init(&new_queue->queue_lock);
|
|
||||||
#if defined(SS_DEBUG)
|
|
||||||
new_queue->sequence_number = 0;
|
|
||||||
#endif /* SS_DEBUG */
|
|
||||||
return new_queue;
|
|
||||||
}
|
|
||||||
MXS_FREE(new_queue);
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Free a queue configuration
|
|
||||||
*
|
|
||||||
* Provides for FIFO queues, this is the last operation to be requested, when
|
|
||||||
* there is no further use for the queue.
|
|
||||||
*
|
|
||||||
* @param QUEUE_CONFIG A queue configuration and anchor structure
|
|
||||||
*/
|
|
||||||
void mxs_queue_free(QUEUE_CONFIG *queue_config)
|
|
||||||
{
|
|
||||||
if (queue_config)
|
|
||||||
{
|
|
||||||
MXS_FREE(queue_config->queue_array);
|
|
||||||
MXS_FREE(queue_config);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Add an item to a queue
|
|
||||||
*
|
|
||||||
* Add a new item to a FIFO queue. If the queue config is null, this function
|
|
||||||
* will behave as if the queue is full.
|
|
||||||
*
|
|
||||||
* @param queue_config The configuration and anchor structure for the queue
|
|
||||||
* @param new_entry The new entry, to be added
|
|
||||||
* @return bool Whether the enqueue succeeded
|
|
||||||
*/
|
|
||||||
bool mxs_enqueue(QUEUE_CONFIG *queue_config, void *new_entry)
|
|
||||||
{
|
|
||||||
bool result = false;
|
|
||||||
|
|
||||||
if (queue_config)
|
|
||||||
{
|
|
||||||
spinlock_acquire(&queue_config->queue_lock);
|
|
||||||
if (mxs_queue_count(queue_config) < queue_config->queue_limit)
|
|
||||||
{
|
|
||||||
queue_config->queue_array[queue_config->end].queued_object = new_entry;
|
|
||||||
queue_config->queue_array[queue_config->end].heartbeat = hkheartbeat;
|
|
||||||
#if defined(SS_DEBUG)
|
|
||||||
queue_config->queue_array[queue_config->end].sequence_check = queue_config->sequence_number;
|
|
||||||
queue_config->sequence_number++;
|
|
||||||
#endif /* SS_DEBUG */
|
|
||||||
queue_config->end++;
|
|
||||||
if (queue_config->end > queue_config->queue_limit)
|
|
||||||
{
|
|
||||||
queue_config->end = 0;
|
|
||||||
}
|
|
||||||
queue_config->has_entries = true;
|
|
||||||
result = true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
result = false;
|
|
||||||
}
|
|
||||||
spinlock_release(&queue_config->queue_lock);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Remove an item from a queue
|
|
||||||
*
|
|
||||||
* Remove an item from a FIFO queue. If the queue config is NULL, the function
|
|
||||||
* will behave as if for an empty queue.
|
|
||||||
*
|
|
||||||
* @param queue_config The configuration and anchor structure for the queue
|
|
||||||
* @param result A queue entry structure that will receive the result
|
|
||||||
* @return bool indicating whether an item was successfully dequeued
|
|
||||||
*/
|
|
||||||
bool mxs_dequeue(QUEUE_CONFIG *queue_config, QUEUE_ENTRY *result)
|
|
||||||
{
|
|
||||||
QUEUE_ENTRY *found = NULL;
|
|
||||||
|
|
||||||
if (queue_config && queue_config->has_entries)
|
|
||||||
{
|
|
||||||
spinlock_acquire(&queue_config->queue_lock);
|
|
||||||
if (mxs_queue_count(queue_config) > 0)
|
|
||||||
{
|
|
||||||
found = &(queue_config->queue_array[queue_config->start]);
|
|
||||||
#if defined(SS_DEBUG)
|
|
||||||
ss_dassert((queue_config->sequence_number) == (found->sequence_check + mxs_queue_count(queue_config)));
|
|
||||||
if ((queue_config->sequence_number) != (found->sequence_check + mxs_queue_count(queue_config)))
|
|
||||||
{
|
|
||||||
debug_check_fail++;
|
|
||||||
}
|
|
||||||
#endif /* SS_DEBUG */
|
|
||||||
result->heartbeat = found->heartbeat;
|
|
||||||
result->queued_object = found->queued_object;
|
|
||||||
if (++queue_config->start > queue_config->queue_limit)
|
|
||||||
{
|
|
||||||
queue_config->start = 0;
|
|
||||||
}
|
|
||||||
queue_config->has_entries = (mxs_queue_count(queue_config) > 0);
|
|
||||||
}
|
|
||||||
spinlock_release(&queue_config->queue_lock);
|
|
||||||
}
|
|
||||||
return (found != NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Remove an item from a queue if it has passed the timeout limit
|
|
||||||
*
|
|
||||||
* Remove an item from a FIFO queue if expired. If the queue config is NULL,
|
|
||||||
* the function will behave as for an empty queue.
|
|
||||||
*
|
|
||||||
* @param queue_config The configuration and anchor structure for the queue
|
|
||||||
* @param result A queue entry structure that will receive the result
|
|
||||||
* @return bool indicating whether an item was successfully dequeued
|
|
||||||
*/
|
|
||||||
bool mxs_dequeue_if_expired(QUEUE_CONFIG *queue_config, QUEUE_ENTRY *result)
|
|
||||||
{
|
|
||||||
QUEUE_ENTRY *found = NULL;
|
|
||||||
|
|
||||||
if (queue_config && queue_config->has_entries)
|
|
||||||
{
|
|
||||||
spinlock_acquire(&queue_config->queue_lock);
|
|
||||||
if (mxs_queue_count(queue_config) > 0)
|
|
||||||
{
|
|
||||||
found = &(queue_config->queue_array[queue_config->start]);
|
|
||||||
if (found->heartbeat > hkheartbeat - queue_config->timeout)
|
|
||||||
{
|
|
||||||
found = NULL;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
#if defined(SS_DEBUG)
|
|
||||||
ss_dassert((queue_config->sequence_number) == (found->sequence_check + mxs_queue_count(queue_config)));
|
|
||||||
if ((queue_config->sequence_number) != (found->sequence_check + mxs_queue_count(queue_config)))
|
|
||||||
{
|
|
||||||
debug_check_fail++;
|
|
||||||
}
|
|
||||||
#endif /* SS_DEBUG */
|
|
||||||
result->heartbeat = found->heartbeat;
|
|
||||||
result->queued_object = found->queued_object;
|
|
||||||
if (++queue_config->start > queue_config->queue_limit)
|
|
||||||
{
|
|
||||||
queue_config->start = 0;
|
|
||||||
}
|
|
||||||
queue_config->has_entries = (mxs_queue_count(queue_config) > 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
spinlock_release(&queue_config->queue_lock);
|
|
||||||
}
|
|
||||||
return (found != NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline int mxs_queue_count(QUEUE_CONFIG *queue_config)
|
|
||||||
{
|
|
||||||
int count = queue_config->end - queue_config->start;
|
|
||||||
return count < 0 ? (count + queue_config->queue_limit + 1) : count;
|
|
||||||
}
|
|
@ -10,7 +10,6 @@ add_executable(test_logorder testlogorder.cc)
|
|||||||
add_executable(test_logthrottling testlogthrottling.cc)
|
add_executable(test_logthrottling testlogthrottling.cc)
|
||||||
add_executable(test_modutil testmodutil.cc)
|
add_executable(test_modutil testmodutil.cc)
|
||||||
add_executable(test_poll testpoll.cc)
|
add_executable(test_poll testpoll.cc)
|
||||||
add_executable(test_queuemanager testqueuemanager.cc)
|
|
||||||
add_executable(test_semaphore testsemaphore.cc)
|
add_executable(test_semaphore testsemaphore.cc)
|
||||||
add_executable(test_server testserver.cc)
|
add_executable(test_server testserver.cc)
|
||||||
add_executable(test_service testservice.cc)
|
add_executable(test_service testservice.cc)
|
||||||
@ -37,7 +36,6 @@ target_link_libraries(test_logorder maxscale-common)
|
|||||||
target_link_libraries(test_logthrottling maxscale-common)
|
target_link_libraries(test_logthrottling maxscale-common)
|
||||||
target_link_libraries(test_modutil maxscale-common)
|
target_link_libraries(test_modutil maxscale-common)
|
||||||
target_link_libraries(test_poll maxscale-common)
|
target_link_libraries(test_poll maxscale-common)
|
||||||
target_link_libraries(test_queuemanager maxscale-common)
|
|
||||||
target_link_libraries(test_semaphore maxscale-common)
|
target_link_libraries(test_semaphore maxscale-common)
|
||||||
target_link_libraries(test_server maxscale-common)
|
target_link_libraries(test_server maxscale-common)
|
||||||
target_link_libraries(test_service maxscale-common)
|
target_link_libraries(test_service maxscale-common)
|
||||||
@ -66,7 +64,6 @@ add_test(TestMaxScalePCRE2 testmaxscalepcre2)
|
|||||||
add_test(TestModutil test_modutil)
|
add_test(TestModutil test_modutil)
|
||||||
add_test(NAME TestMaxPasswd COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/testmaxpasswd.sh)
|
add_test(NAME TestMaxPasswd COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/testmaxpasswd.sh)
|
||||||
add_test(TestPoll test_poll)
|
add_test(TestPoll test_poll)
|
||||||
add_test(TestQueueManager test_queuemanager)
|
|
||||||
add_test(TestSemaphore test_semaphore)
|
add_test(TestSemaphore test_semaphore)
|
||||||
add_test(TestServer test_server)
|
add_test(TestServer test_server)
|
||||||
add_test(TestService test_service)
|
add_test(TestService test_service)
|
||||||
|
@ -1,259 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
|
||||||
*
|
|
||||||
* Use of this software is governed by the Business Source License included
|
|
||||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
||||||
*
|
|
||||||
* Change Date: 2020-01-01
|
|
||||||
*
|
|
||||||
* On the date above, in accordance with the Business Source License, use
|
|
||||||
* of this software will be governed by version 2 or later of the General
|
|
||||||
* Public License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @verbatim
|
|
||||||
* Revision History
|
|
||||||
*
|
|
||||||
* Date Who Description
|
|
||||||
* 21/06/2016 Martin Brampton Initial implementation
|
|
||||||
*
|
|
||||||
* @endverbatim
|
|
||||||
*/
|
|
||||||
|
|
||||||
// To ensure that ss_info_assert asserts also when builing in non-debug mode.
|
|
||||||
#if !defined(SS_DEBUG)
|
|
||||||
#define SS_DEBUG
|
|
||||||
int debug_check_fail = 1;
|
|
||||||
#else
|
|
||||||
// This is defined in the queuemanager code but only in debug builds
|
|
||||||
extern int debug_check_fail;
|
|
||||||
#endif
|
|
||||||
#if defined(NDEBUG)
|
|
||||||
#undef NDEBUG
|
|
||||||
#endif
|
|
||||||
#include <stdio.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <string.h>
|
|
||||||
#include <errno.h>
|
|
||||||
#include <maxscale/random_jkiss.h>
|
|
||||||
#include <maxscale/hk_heartbeat.h>
|
|
||||||
#include <maxscale/alloc.h>
|
|
||||||
|
|
||||||
#include "../maxscale/queuemanager.h"
|
|
||||||
#include "test_utils.h"
|
|
||||||
|
|
||||||
/**
|
|
||||||
* test1 Allocate a queue and do lots of other things
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define TEST_QUEUE_SIZE 5
|
|
||||||
#define HEARTBEATS_TO_EXPIRE 3
|
|
||||||
#define NUMBER_OF_THREADS 4
|
|
||||||
#define THREAD_TEST_COUNT 1000000
|
|
||||||
|
|
||||||
static QUEUE_CONFIG *thread_queue;
|
|
||||||
|
|
||||||
static int
|
|
||||||
test1()
|
|
||||||
{
|
|
||||||
QUEUE_CONFIG *queue;
|
|
||||||
int filled = 0;
|
|
||||||
int emptied = 0;
|
|
||||||
int expired = 0;
|
|
||||||
int input_counter = 0;
|
|
||||||
int output_counter = 0;
|
|
||||||
|
|
||||||
random_jkiss_init();
|
|
||||||
hkheartbeat = 0;
|
|
||||||
|
|
||||||
queue = mxs_queue_alloc(TEST_QUEUE_SIZE, HEARTBEATS_TO_EXPIRE);
|
|
||||||
|
|
||||||
{
|
|
||||||
QUEUE_ENTRY entry;
|
|
||||||
if (mxs_dequeue(queue, &entry))
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "\nError mxs_dequeue on empty queue did not return false.\n");
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mxs_dequeue_if_expired(queue, &entry))
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "\nError mxs_dequeue_if_expired on empty queue did not return false.\n");
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
while (filled < 250 || emptied < 250 || expired < 250)
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "Input counter %d and output counter %d\n", input_counter, output_counter);
|
|
||||||
ss_dfprintf(stderr, "Difference between counters %d\n", input_counter - output_counter);
|
|
||||||
ss_dfprintf(stderr, "Filled %d, emptied %d, expired %d\n", filled, emptied, expired);
|
|
||||||
if (random_jkiss() % 2)
|
|
||||||
{
|
|
||||||
int *entrynumber = (int*)MXS_MALLOC(sizeof(int));
|
|
||||||
*entrynumber = input_counter;
|
|
||||||
if (mxs_enqueue(queue, entrynumber))
|
|
||||||
{
|
|
||||||
input_counter++;
|
|
||||||
if ((input_counter - output_counter) > TEST_QUEUE_SIZE)
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "\nQueue full, but mxs_enqueue accepted entry.\n");
|
|
||||||
return 3;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
QUEUE_ENTRY entry;
|
|
||||||
|
|
||||||
if ((input_counter - output_counter) != TEST_QUEUE_SIZE)
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "\nFailed enqueue, but input counter %d and output counter %d do not differ by %d.\n",
|
|
||||||
input_counter,
|
|
||||||
output_counter,
|
|
||||||
TEST_QUEUE_SIZE);
|
|
||||||
return 4;
|
|
||||||
}
|
|
||||||
filled++;
|
|
||||||
if (0 == (random_jkiss() % 5))
|
|
||||||
{
|
|
||||||
if ((mxs_dequeue_if_expired(queue, &entry)))
|
|
||||||
{
|
|
||||||
if ((entry.heartbeat) > (hkheartbeat - HEARTBEATS_TO_EXPIRE))
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "\nReturned an expired entry even though none or not expired.\n");
|
|
||||||
return 5;
|
|
||||||
}
|
|
||||||
if (*(int *)entry.queued_object != output_counter)
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "\nOutput counter was %d, but dequeue gave %d.\n",
|
|
||||||
output_counter,
|
|
||||||
*(int *)entry.queued_object);
|
|
||||||
return 10;
|
|
||||||
}
|
|
||||||
output_counter++;
|
|
||||||
MXS_FREE(entry.queued_object);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
hkheartbeat += (HEARTBEATS_TO_EXPIRE + 1);
|
|
||||||
if (mxs_dequeue_if_expired(queue, &entry))
|
|
||||||
{
|
|
||||||
if (*(int *)entry.queued_object != output_counter)
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "\nOutput counter was %d, but dequeue gave %d.\n",
|
|
||||||
output_counter,
|
|
||||||
*(int *)entry.queued_object);
|
|
||||||
return 6;
|
|
||||||
}
|
|
||||||
output_counter++;
|
|
||||||
MXS_FREE(entry.queued_object);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "\nReturned no expired entry even though all are expired.\n");
|
|
||||||
return 7;
|
|
||||||
}
|
|
||||||
expired++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
QUEUE_ENTRY entry;
|
|
||||||
if (mxs_dequeue(queue, &entry))
|
|
||||||
{
|
|
||||||
if (*(int *)entry.queued_object != output_counter)
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "\nOutput counter was %d, but dequeue gave %d.\n",
|
|
||||||
output_counter,
|
|
||||||
*(int *)entry.queued_object);
|
|
||||||
return 8;
|
|
||||||
}
|
|
||||||
output_counter++;
|
|
||||||
MXS_FREE(entry.queued_object);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (input_counter != output_counter)
|
|
||||||
{
|
|
||||||
ss_dfprintf(stderr, "\nNULL from dequeue, but input counter %d and output counter %d.\n",
|
|
||||||
input_counter,
|
|
||||||
output_counter);
|
|
||||||
return 9;
|
|
||||||
}
|
|
||||||
emptied++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ss_dfprintf(stderr, "Successfully ended test\n");
|
|
||||||
mxs_queue_free(queue);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *
|
|
||||||
thread_test(void *arg)
|
|
||||||
{
|
|
||||||
int i;
|
|
||||||
QUEUE_ENTRY entry;
|
|
||||||
int emptied = 0;
|
|
||||||
int filled = 0;
|
|
||||||
|
|
||||||
for (i = 0; i < THREAD_TEST_COUNT; i++)
|
|
||||||
{
|
|
||||||
if (random_jkiss() % 2)
|
|
||||||
{
|
|
||||||
if (!mxs_enqueue(thread_queue, (void *)"Just for test"))
|
|
||||||
{
|
|
||||||
filled++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (!mxs_dequeue(thread_queue, &entry))
|
|
||||||
{
|
|
||||||
emptied++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ss_dfprintf(stderr, "Queue was full %d times, empty %d times\n", filled, emptied);
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
test2()
|
|
||||||
{
|
|
||||||
pthread_t tid[NUMBER_OF_THREADS];
|
|
||||||
int err, i, limit;
|
|
||||||
|
|
||||||
thread_queue = mxs_queue_alloc(TEST_QUEUE_SIZE, HEARTBEATS_TO_EXPIRE);
|
|
||||||
limit = NUMBER_OF_THREADS;
|
|
||||||
for (i = 0; i < limit; i++)
|
|
||||||
{
|
|
||||||
err = pthread_create(&tid[i], NULL, thread_test, NULL);
|
|
||||||
ss_info_dassert((0 == err), "Must create threads successfully");
|
|
||||||
}
|
|
||||||
for (i = 0; i < limit; i++)
|
|
||||||
{
|
|
||||||
err = pthread_join(tid[i], NULL);
|
|
||||||
ss_info_dassert((0 == err), "Must join threads successfully");
|
|
||||||
ss_dfprintf(stderr, "\nThread %d ended with debug check fail at %d.\n", i, debug_check_fail);
|
|
||||||
}
|
|
||||||
mxs_queue_free(thread_queue);
|
|
||||||
return debug_check_fail ? 1 : 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(int argc, char **argv)
|
|
||||||
{
|
|
||||||
int result = 0;
|
|
||||||
|
|
||||||
result += (test1() ? 1 : 0);
|
|
||||||
result += (test2() ? 1 : 0);
|
|
||||||
|
|
||||||
exit(result);
|
|
||||||
}
|
|
Reference in New Issue
Block a user