Files
oceanbase/docs/docs-cn/7.developer-guide-1/5.migrate-data-to-oceanbase/3.examples-of-the-datax-reader-and-writer.md
2022-02-10 14:51:49 +08:00

14 KiB

不同数据源的 DataX 读写插件示例

DataX 官网支持绝大部分主流数据源的读写插件,并且有详细的使用文档。

CSV 文件的读写插件

csv 文件就是文本文件,用 txtreader 和 txtwriter 读写。配置文件详细语法请参见 DataX官网说明

txtreader配置示例:

       "reader":{
                    "name":"txtfilereader",
                    "parameter":{
                        "path":["文件全路径"],
                        "encoding":"UTF-8",
                        "column":[
                            {    "index":0,    "type":"long"    }
                            ,{    "index":1,    "type":"long"    }
                            ,{    "index":2,    "type":"string"    }
                            ,{    "index":3,    "type":"double"    }
                            ,{    "index":4,    "type":"string"    }
                        ],
                        "fieldDelimiter":"||",
                        "fileFormat":"text"
                    }
                }

txtwriter 配置示例:

            "writer":{
                    "name":"txtfilewriter",
                    "parameter":{
                        "path":"文件全路径",
                        "fileName":"文件名",
                        "writeMode":"truncate",
                        "dateFormat":"yyyy-MM-dd",
                        "charset":"UTF-8",
                        "nullFormat":"",
                        "fileDelimiter":"||"
                    }
                }

MySQL 数据库的读写插件

针对 MySQL 数据库,用 mysqlreader 和 mysqlwriter 插件读写。

mysqlreader 配置示例:

                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "splitPk": "db_id",
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                }

mysqlwriter 配置示例:

                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "test"
                                ]
                            }
                        ]
                    }
                }

Oracle 数据库的读写插件

针对 Oracle 数据库,用 oraclereader 和 oraclewriter 插件来读写。

oraclereader 配置示例:

                "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        // 数据库连接用户名
                        "username": "root",
                        // 数据库连接密码
                        "password": "root",
                        "column": [
                            "id","name"
                        ],
                        //切分主键
                        "splitPk": "db_id",
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]"
                                ]
                            }
                        ]
                    }
                }

oraclewriter 配置示例:

                "writer": {
                    "name": "oraclewriter",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "preSql": [
                            "delete from test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]",
                                "table": [
                                    "test"
                                ]
                            }
                        ]
                    }
                }

DB2 数据库的读写插件

db2reader 配置示例:

"reader":{
                    "name":"db2reader",
                    "parameter":{
                        "username":"SRC_DB_UESRNAME",
                        "password":"SRC_DB_PASSWORD",
                        "column":[
                            "SRC_COLUMN_LIST"
                        ],
                        "connection":[
                            {
                                "table":[
                                    "SRC_TABLE_NAME"
                                ],
                                "jdbcUrl":[
"jdbc:db2://SRC_DB_IP:SRC_DB_PORT/SRC_DB_NAME"
                                ]
                            }
                        ]
                    }
                }

db2writer 配置示例:

OceanBase 数据库的读写插件

OceanBase 数据库使用插件 oceanbasev10reader 和 oceanbasev10writer 来读写。该插件由 OceanBase 产品团队单独提供。

  • oceanbasev10reader 配置示例
                "reader":{
                    "name":"oceanbasev10reader",
                    "parameter":{
                        "where":"",
                        "timeout":10000,
                        "readBatchSize":100000,
                        "readByPartition":"true",
                        "column": [
                        "列名1","列名2"
                        ],
                        "connection":[
                            {
                                "jdbcUrl":["||_dsc_ob10_dsc_||集群名:租户名||_dsc_ob10_dsc_||jdbc:oceanbase://连接IP:连接端口/模式名或数据库名"],
                                "table":["表名"]
                            }
                        ],
                        "username":"租户内用户名",
                        "password":"密码"
                    }
                }

示例:OceanBase 表 ware 导出到 csv 文件

[admin@*** /home/admin/datax3]
$cat job/ob_tpcc_ware_2_csv.json
{
        "job":{
                "setting":{
                        "speed":{
                                "channel":10
                        },
                        "errorLimit":{
                                "record":0, "percentage": 0.02
                        }
                },
                "content":[
                        {
                                "reader":{
                                        "name":"oceanbasev10reader",
                                        "parameter":{
                                                "where":"",
                                                "timeout":10000,
                                                "readBatchSize":100000,
                                                "readByPartition":"true",
                                                "column": [
"W_ID","W_YTD","W_TAX","W_NAME","W_STREET_1","W_STREET_2","W_CITY","W_STATE","W_ZIP"
                        ],
                                                "connection":[
                                                        {
                                                                "jdbcUrl":["||_dsc_ob10_dsc_||obdemo:obbmsql||_dsc_ob10_dsc_||jdbc:oceanbase://127.1:2883/tpcc"],
                                                                "table":["ware"]
                                                        }
                                                ],
                                                "username":"tpcc",
                                                "password":"123456"
                                        }
                                },
                                "writer":{
                                        "name":"txtfilewriter",
                                        "parameter":{
                                                "path":"/home/admin/csvdata/",
                                                "fileName":"ware",
                                                "writeMode":"truncate",
                                                "dateFormat":"yyyy-MM-dd",
                                                "charset":"UTF-8",
                                                "nullFormat":"",
                                                "fileDelimiter":"||"
                                        }
                                }
                        }
                ]
        }
}

[admin@*** /home/admin/datax3]
$bin/datax.py job/ob_tpcc_ware_2_csv.json

image.png

  • oceanbasev10writer 配置示例

使用 DataX 向 OceanBase 里写入时,要避免写入速度过快导致 OceanBase 的增量内存耗尽。通常建议 DataX 配置文件里针对写入做一个写入限速设置。关键字是 memstoreThreshold :

                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "username": "租户内的用户名",
                        "password": "密码",
                      "writeMode": "insert",
                        "column": [
                          "列名1","列名2" 
                         ],
                        "preSql": [
                            ""
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||集群名:租户名||_dsc_ob10_dsc_||jdbc:oceanbase://连接IP:连接端口(默认2883)/模式名或数据库名",
                                "table": [
                                    "表名"
                                ]
                            }
                        ],
                        "batchSize": 1024,
                        "memstoreThreshold": "90"
                    }
                }

示例:从 csv 文件导入到 OceanBase 表中

[admin@*** /home/admin/datax3]
$cat job/csv_2_ob_tpcc_ware2.json
{
    "job":{
        "setting":{
            "speed":{
                "channel":32
            },
            "errorLimit":{
                "record":0, "percentage": 0.02
            }
        },
        "content":[
            {
                "reader":{
                    "name":"txtfilereader",
                    "parameter":{
                        "path":["/home/admin/csvdata/ware*"],
                        "encoding":"UTF-8",
                        "column":[
                            {    "index":0,    "type":"long"    }
                            ,{    "index":1,    "type":"long"    }
                            ,{    "index":2,    "type":"long"    }
                            ,{    "index":3,    "type":"string"    }
                            ,{    "index":4,    "type":"string"    }
                            ,{    "index":5,    "type":"string"    }
                            ,{    "index":6,    "type":"string"    }
                            ,{    "index":7,    "type":"string"    }
                            ,{    "index":8,    "type":"string"    }
                        ],
                        "fieldDelimiter":",",
                        "fileFormat":"text"
                    }
                },
                "writer":{
                    "name":"oceanbasev10writer",
                    "parameter":{
                        "writeMode":"insert",
                        "column":[
                                "W_ID","W_YTD","W_TAX","W_NAME","W_STREET_1","W_STREET_2","W_CITY","W_STATE","W_ZIP"
                        ],
                        "connection":[
                            {
"jdbcUrl":"||_dsc_ob10_dsc_||obdemo:obbmsql||_dsc_ob10_dsc_||jdbc:oceanbase://127.1:2883/tpcc",
                                "table":["WARE2"]
                            }
                        ],
                        "username":"tpcc",
                        "password":"123456",
                        "batchSize":256,
                        " memstoreThreshold":"90"
                    }
                }
            }
        ]
    }
}


[admin@*** /home/admin/datax3]
$bin/datax.py job/csv_2_ob_tpcc_ware2.json

image.png

image.png