MXS-2106: Add null value support to cdc_connector
The cdc_connector had no explicit null value detection which is required now that the null value handling is fixed.
This commit is contained in:
@ -1,7 +1,7 @@
|
|||||||
# Shared version of the library
|
# Shared version of the library
|
||||||
add_library(cdc_connector SHARED cdc_connector.cpp)
|
add_library(cdc_connector SHARED cdc_connector.cpp)
|
||||||
target_link_libraries(cdc_connector ${JANSSON_LIBRARIES} crypto)
|
target_link_libraries(cdc_connector ${JANSSON_LIBRARIES} crypto)
|
||||||
set_target_properties(cdc_connector PROPERTIES VERSION "1.0.0")
|
set_target_properties(cdc_connector PROPERTIES VERSION "1.0.1")
|
||||||
add_dependencies(cdc_connector jansson)
|
add_dependencies(cdc_connector jansson)
|
||||||
|
|
||||||
# Static version of the library
|
# Static version of the library
|
||||||
|
@ -360,6 +360,7 @@ void Connection::process_schema(json_t* json)
|
|||||||
|
|
||||||
SRow Connection::process_row(json_t* js)
|
SRow Connection::process_row(json_t* js)
|
||||||
{
|
{
|
||||||
|
std::set<size_t> nulls;
|
||||||
ValueVector values;
|
ValueVector values;
|
||||||
values.reserve(m_keys->size());
|
values.reserve(m_keys->size());
|
||||||
m_error.clear();
|
m_error.clear();
|
||||||
@ -371,6 +372,11 @@ SRow Connection::process_row(json_t* js)
|
|||||||
|
|
||||||
if (v)
|
if (v)
|
||||||
{
|
{
|
||||||
|
if (json_is_null(v))
|
||||||
|
{
|
||||||
|
nulls.insert(values.size());
|
||||||
|
}
|
||||||
|
|
||||||
values.push_back(json_to_string(v));
|
values.push_back(json_to_string(v));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -385,7 +391,7 @@ SRow Connection::process_row(json_t* js)
|
|||||||
|
|
||||||
if (m_error.empty())
|
if (m_error.empty())
|
||||||
{
|
{
|
||||||
rval = SRow(new Row(m_keys, m_types, values));
|
rval = SRow(new Row(m_keys, m_types, values, nulls));
|
||||||
}
|
}
|
||||||
|
|
||||||
return rval;
|
return rval;
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <set>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <jansson.h>
|
#include <jansson.h>
|
||||||
|
|
||||||
@ -199,6 +200,31 @@ public:
|
|||||||
return m_values.at(it - m_keys->begin());
|
return m_values.at(it - m_keys->begin());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a field has a NULL value
|
||||||
|
*
|
||||||
|
* @param i The field index
|
||||||
|
*
|
||||||
|
* @return True if the field has a NULL value
|
||||||
|
*/
|
||||||
|
bool is_null(size_t i) const
|
||||||
|
{
|
||||||
|
return m_nulls.count(i);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a field has a NULL value
|
||||||
|
*
|
||||||
|
* @param str The field name
|
||||||
|
*
|
||||||
|
* @return True if the field has a NULL value
|
||||||
|
*/
|
||||||
|
bool is_null(const std::string& str) const
|
||||||
|
{
|
||||||
|
ValueVector::const_iterator it = std::find(m_keys->begin(), m_keys->end(), str);
|
||||||
|
return m_nulls.count(it - m_keys->begin());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the GTID of this row
|
* Get the GTID of this row
|
||||||
*
|
*
|
||||||
@ -243,15 +269,18 @@ private:
|
|||||||
SValueVector m_keys;
|
SValueVector m_keys;
|
||||||
SValueVector m_types;
|
SValueVector m_types;
|
||||||
ValueVector m_values;
|
ValueVector m_values;
|
||||||
|
std::set<size_t> m_nulls;
|
||||||
|
|
||||||
// Only a Connection should construct an InternalRow
|
// Only a Connection should construct an InternalRow
|
||||||
friend class Connection;
|
friend class Connection;
|
||||||
|
|
||||||
Row(SValueVector& keys,
|
Row(SValueVector& keys,
|
||||||
SValueVector& types,
|
SValueVector& types,
|
||||||
ValueVector& values):
|
ValueVector& values,
|
||||||
|
std::set<size_t>& nulls):
|
||||||
m_keys(keys),
|
m_keys(keys),
|
||||||
m_types(types)
|
m_types(types),
|
||||||
|
m_nulls(nulls)
|
||||||
{
|
{
|
||||||
m_values.swap(values);
|
m_values.swap(values);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user