diff --git a/include/maxscale/buffer.h b/include/maxscale/buffer.h index dad941222..c0aadc99d 100644 --- a/include/maxscale/buffer.h +++ b/include/maxscale/buffer.h @@ -124,6 +124,9 @@ typedef struct gwbuf BUF_PROPERTY* properties; /*< Buffer properties */ struct server* server; /*< The target server where the buffer is executed */ uint32_t gwbuf_type; /*< buffer's data type information */ +#ifdef SS_DEBUG + int owner; /*< Owner of the thread, only for debugging */ +#endif } GWBUF; /*< diff --git a/server/core/buffer.cc b/server/core/buffer.cc index c7860ee06..fda2b90c4 100644 --- a/server/core/buffer.cc +++ b/server/core/buffer.cc @@ -23,6 +23,9 @@ #include #include #include +#include + +using mxs::RoutingWorker; static void gwbuf_free_one(GWBUF* buf); static buffer_object_t* gwbuf_remove_buffer_object(GWBUF* buf, @@ -63,6 +66,9 @@ GWBUF* gwbuf_alloc(unsigned int size) sbuf->info = GWBUF_INFO_NONE; sbuf->bufobj = NULL; +#ifdef SS_DEBUG + rval->owner = RoutingWorker::get_current_id(); +#endif rval->start = &sbuf->data; rval->end = (void*)((char*)rval->start + size); rval->sbuf = sbuf; @@ -111,6 +117,7 @@ void gwbuf_free(GWBUF* buf) while (buf) { + mxb_assert(buf->owner == RoutingWorker::get_current_id()); nextbuf = buf->next; gwbuf_free_one(buf); buf = nextbuf; @@ -177,7 +184,11 @@ static GWBUF* gwbuf_clone_one(GWBUF* buf) return NULL; } + mxb_assert(buf->owner == RoutingWorker::get_current_id()); mxb::atomic::add(&buf->sbuf->refcount, 1); +#ifdef SS_DEBUG + rval->owner = RoutingWorker::get_current_id(); +#endif rval->server = buf->server; rval->sbuf = buf->sbuf; rval->start = buf->start; @@ -196,6 +207,7 @@ GWBUF* gwbuf_clone(GWBUF* buf) return NULL; } + mxb_assert(buf->owner == RoutingWorker::get_current_id()); GWBUF* rval = gwbuf_clone_one(buf); if (rval) @@ -222,6 +234,7 @@ GWBUF* gwbuf_clone(GWBUF* buf) GWBUF* gwbuf_deep_clone(const GWBUF* buf) { + mxb_assert(buf->owner == RoutingWorker::get_current_id()); GWBUF* rval = NULL; if (buf) @@ -249,6 +262,7 @@ static GWBUF* gwbuf_clone_portion(GWBUF* buf, { GWBUF* clonebuf; + mxb_assert(buf->owner == RoutingWorker::get_current_id()); mxb_assert(start_offset + length <= GWBUF_LENGTH(buf)); if ((clonebuf = (GWBUF*)MXS_MALLOC(sizeof(GWBUF))) == NULL) @@ -256,6 +270,9 @@ static GWBUF* gwbuf_clone_portion(GWBUF* buf, return NULL; } mxb::atomic::add(&buf->sbuf->refcount, 1); +#ifdef SS_DEBUG + clonebuf->owner = RoutingWorker::get_current_id(); +#endif clonebuf->server = buf->server; clonebuf->sbuf = buf->sbuf; clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone info bits too */ @@ -279,6 +296,7 @@ GWBUF* gwbuf_split(GWBUF** buf, size_t length) GWBUF* buffer = *buf; GWBUF* orig_tail = buffer->tail; head = buffer; + mxb_assert(buffer->owner == RoutingWorker::get_current_id()); /** Handle complete buffers */ while (buffer && length && length >= GWBUF_LENGTH(buffer)) @@ -350,6 +368,7 @@ static inline bool gwbuf_get_byte(const GWBUF** buf, size_t* offset, uint8_t* b) // Ignore NULL buffer and walk past empty or too short buffers. while (*buf && (GWBUF_LENGTH(*buf) <= *offset)) { + mxb_assert((*buf)->owner == RoutingWorker::get_current_id()); *offset -= GWBUF_LENGTH(*buf); *buf = (*buf)->next; } @@ -358,6 +377,7 @@ static inline bool gwbuf_get_byte(const GWBUF** buf, size_t* offset, uint8_t* b) if (*buf) { + mxb_assert((*buf)->owner == RoutingWorker::get_current_id()); *b = *(GWBUF_DATA(*buf) + *offset); *offset += 1; @@ -387,6 +407,8 @@ int gwbuf_compare(const GWBUF* lhs, const GWBUF* rhs) } else { + mxb_assert(lhs->owner == RoutingWorker::get_current_id() + && rhs->owner == RoutingWorker::get_current_id()); mxb_assert(lhs && rhs); size_t llen = gwbuf_length(lhs); @@ -448,6 +470,8 @@ GWBUF* gwbuf_append(GWBUF* head, GWBUF* tail) { return head; } + mxb_assert(head->owner == RoutingWorker::get_current_id() + && tail->owner == RoutingWorker::get_current_id()); head->tail->next = tail; head->tail = tail->tail; @@ -458,6 +482,7 @@ GWBUF* gwbuf_consume(GWBUF* head, unsigned int length) { while (head && length > 0) { + mxb_assert(head->owner == RoutingWorker::get_current_id()); unsigned int buflen = GWBUF_LENGTH(head); GWBUF_CONSUME(head, length); @@ -483,11 +508,9 @@ unsigned int gwbuf_length(const GWBUF* head) { int rval = 0; - if (head) - { - } while (head) { + mxb_assert(head->owner == RoutingWorker::get_current_id()); rval += GWBUF_LENGTH(head); head = head->next; } @@ -499,6 +522,7 @@ int gwbuf_count(const GWBUF* head) int result = 0; while (head) { + mxb_assert(head->owner == RoutingWorker::get_current_id()); result++; head = head->next; } @@ -507,6 +531,7 @@ int gwbuf_count(const GWBUF* head) GWBUF* gwbuf_rtrim(GWBUF* head, unsigned int n_bytes) { + mxb_assert(head->owner == RoutingWorker::get_current_id()); GWBUF* rval = head; GWBUF_RTRIM(head, n_bytes); @@ -523,6 +548,7 @@ void gwbuf_set_type(GWBUF* buf, uint32_t type) /** Set type consistenly to all buffers on the list */ while (buf != NULL) { + mxb_assert(buf->owner == RoutingWorker::get_current_id()); buf->gwbuf_type |= type; buf = buf->next; } @@ -533,6 +559,7 @@ void gwbuf_add_buffer_object(GWBUF* buf, void* data, void (* donefun_fp)(void*)) { + mxb_assert(buf->owner == RoutingWorker::get_current_id()); buffer_object_t* newb = (buffer_object_t*)MXS_MALLOC(sizeof(buffer_object_t)); MXS_ABORT_IF_NULL(newb); @@ -554,6 +581,7 @@ void gwbuf_add_buffer_object(GWBUF* buf, void* gwbuf_get_buffer_object_data(GWBUF* buf, bufobj_id_t id) { + mxb_assert(buf->owner == RoutingWorker::get_current_id()); buffer_object_t* bo = buf->sbuf->bufobj; while (bo != NULL && bo->bo_id != id) @@ -569,6 +597,7 @@ void* gwbuf_get_buffer_object_data(GWBUF* buf, bufobj_id_t id) */ static buffer_object_t* gwbuf_remove_buffer_object(GWBUF* buf, buffer_object_t* bufobj) { + mxb_assert(buf->owner == RoutingWorker::get_current_id()); buffer_object_t* next; next = bufobj->bo_next; @@ -580,6 +609,7 @@ static buffer_object_t* gwbuf_remove_buffer_object(GWBUF* buf, buffer_object_t* bool gwbuf_add_property(GWBUF* buf, const char* name, const char* value) { + mxb_assert(buf->owner == RoutingWorker::get_current_id()); char* my_name = MXS_STRDUP(name); char* my_value = MXS_STRDUP(value); BUF_PROPERTY* prop = (BUF_PROPERTY*)MXS_MALLOC(sizeof(BUF_PROPERTY)); @@ -602,6 +632,7 @@ bool gwbuf_add_property(GWBUF* buf, const char* name, const char* value) char* gwbuf_get_property(GWBUF* buf, const char* name) { + mxb_assert(buf->owner == RoutingWorker::get_current_id()); BUF_PROPERTY* prop = buf->properties; while (prop && strcmp(prop->name, name) != 0) @@ -619,6 +650,9 @@ GWBUF* gwbuf_make_contiguous(GWBUF* orig) mxb_assert_message(!true, "gwbuf_make_contiguous: NULL buffer"); return NULL; } + + mxb_assert(orig->owner == RoutingWorker::get_current_id()); + if (orig->next == NULL) { return orig; @@ -649,6 +683,7 @@ size_t gwbuf_copy_data(const GWBUF* buffer, size_t offset, size_t bytes, uint8_t /** Skip unrelated buffers */ while (buffer && offset && offset >= (buflen = GWBUF_LENGTH(buffer))) { + mxb_assert(buffer->owner == RoutingWorker::get_current_id()); offset -= buflen; buffer = buffer->next; } @@ -657,6 +692,7 @@ size_t gwbuf_copy_data(const GWBUF* buffer, size_t offset, size_t bytes, uint8_t if (buffer) { + mxb_assert(buffer->owner == RoutingWorker::get_current_id()); uint8_t* ptr = (uint8_t*) GWBUF_DATA(buffer) + offset; uint32_t bytes_left = GWBUF_LENGTH(buffer) - offset; @@ -696,12 +732,14 @@ uint8_t* gwbuf_byte_pointer(GWBUF* buffer, size_t offset) // Ignore NULL buffer and walk past empty or too short buffers. while (buffer && (GWBUF_LENGTH(buffer) <= offset)) { + mxb_assert(buffer->owner == RoutingWorker::get_current_id()); offset -= GWBUF_LENGTH(buffer); buffer = buffer->next; } if (buffer != NULL) { + mxb_assert(buffer->owner == RoutingWorker::get_current_id()); rval = (GWBUF_DATA(buffer) + offset); } @@ -710,6 +748,7 @@ uint8_t* gwbuf_byte_pointer(GWBUF* buffer, size_t offset) static std::string dump_one_buffer(GWBUF* buffer) { + mxb_assert(buffer->owner == RoutingWorker::get_current_id()); std::string rval; int len = GWBUF_LENGTH(buffer); uint8_t* data = GWBUF_DATA(buffer); @@ -738,6 +777,7 @@ static std::string dump_one_buffer(GWBUF* buffer) void gwbuf_hexdump(GWBUF* buffer, int log_level) { + mxb_assert(buffer->owner == RoutingWorker::get_current_id()); std::stringstream ss; ss << "Buffer " << buffer << ":\n";