 7723e7c933
			
		
	
	7723e7c933
	
	
	
		
			
			Repurposed the Replicator from the CDC integration project as a replication event processing service. It is similar to the CDC version of the Replicator and is still in the same namespace, but it lacks all of the cross-thread communication that was part of the integration project.
		
			
				
	
	
		
			111 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			111 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2019 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2022-01-01
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| #include "sql.hh"
 | |
| 
 | |
| SQL::SQL(MYSQL* mysql, const cdc::Server& server)
 | |
|     : m_mysql(mysql)
 | |
|     , m_server(server)
 | |
| {
 | |
| }
 | |
| 
 | |
| SQL::~SQL()
 | |
| {
 | |
|     mysql_free_result(m_res);
 | |
|     mariadb_rpl_close(m_rpl);
 | |
|     mysql_close(m_mysql);
 | |
| }
 | |
| 
 | |
| std::pair<std::string, std::unique_ptr<SQL>> SQL::connect(const std::vector<cdc::Server>& servers,
 | |
|                                                           int connect_timeout, int read_timeout)
 | |
| {
 | |
|     std::unique_ptr<SQL> rval;
 | |
|     MYSQL* mysql = nullptr;
 | |
|     std::string error;
 | |
| 
 | |
|     if (servers.empty())
 | |
|     {
 | |
|         error = "No servers defined";
 | |
|     }
 | |
| 
 | |
|     for (const auto& server : servers)
 | |
|     {
 | |
|         if (!(mysql = mysql_init(nullptr)))
 | |
|         {
 | |
|             error = "Connection initialization failed";
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|         mysql_optionsv(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &connect_timeout);
 | |
|         mysql_optionsv(mysql, MYSQL_OPT_READ_TIMEOUT, &read_timeout);
 | |
| 
 | |
|         if (!mysql_real_connect(mysql, server.host.c_str(), server.user.c_str(), server.password.c_str(),
 | |
|                                 nullptr, server.port, nullptr, 0))
 | |
|         {
 | |
|             error = "Connection creation failed: " + std::string(mysql_error(mysql));
 | |
|             mysql_close(mysql);
 | |
|             mysql = nullptr;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             // Successful connection
 | |
|             rval.reset(new SQL(mysql, server));
 | |
|             error.clear();
 | |
|             break;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return {error, std::move(rval)};
 | |
| }
 | |
| 
 | |
| bool SQL::query(const std::string& sql)
 | |
| {
 | |
|     if (m_res)
 | |
|     {
 | |
|         mysql_free_result(m_res);
 | |
|         m_res = nullptr;
 | |
|     }
 | |
| 
 | |
|     return mysql_query(m_mysql, sql.c_str()) == 0;
 | |
| }
 | |
| 
 | |
| bool SQL::query(const std::vector<std::string>& sql)
 | |
| {
 | |
|     for (const auto& a : sql)
 | |
|     {
 | |
|         if (!query(a.c_str()))
 | |
|         {
 | |
|             return false;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| bool SQL::replicate(int server_id)
 | |
| {
 | |
|     if (!(m_rpl = mariadb_rpl_init(m_mysql)))
 | |
|     {
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     mariadb_rpl_optionsv(m_rpl, MARIADB_RPL_SERVER_ID, &server_id);
 | |
| 
 | |
|     if (mariadb_rpl_open(m_rpl))
 | |
|     {
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     return true;
 | |
| }
 |