美文网首页DataX
DataX mongodb导出数据到mysql

DataX mongodb导出数据到mysql

作者: Victor_bigdata | 来源:发表于2019-06-13 19:55 被阅读0次

    Python版本要为2
    cmd乱码解决:输入CHCP 65001
    数据库中的数据中文乱码解决:在json文件中jdbcUrl项加上:?characterEncoding=utf8

    DataX介绍

    安装DataX

    DataX下载地址
    下载完成解压至某个路径下即可

    查看配置模板

    python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

    例如mysql:

    C:\DataX\bin>python datax.py -r mysqlreader -w mysqlwriter
    
    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    
    
    Please refer to the mysqlreader document:
         https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
    
    Please refer to the mysqlwriter document:
         https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
    
    Please save the following configuration as a json file and  use
         python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
    to run the job.
    
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "column": [],
                            "connection": [
                                {
                                    "jdbcUrl": [],
                                    "table": []
                                }
                            ],
                            "password": "",
                            "username": "",
                            "where": ""
                        }
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "column": [],
                            "connection": [
                                {
                                    "jdbcUrl": "",
                                    "table": []
                                }
                            ],
                            "password": "",
                            "preSql": [],
                            "session": [],
                            "username": "",
                            "writeMode": ""
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": ""
                }
            }
        }
    }
    

    配置mongodb2mysql.json文件

    {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [{
                "reader": {
                    "name": "mongodbreader",
                    "parameter": {
                        "address": ["*.*.*.*:27017"],
                        "userName": "root",
                        "userPassword": "123456",
                        "dbName": "weixin",
                        "collectionName": "fileids_wxpy",
                        "column": [{
                            "index":0,
                            "name": "_id",
                            "type": "string"
                        }, {
                            "index":1,
                            "name": "crawler_time",
                            "type": "string"
                        }, {
                            "index":2,
                            "name": "file_url",
                            "type": "string"
                        }, {
                            "index":3,
                            "name": "flag",
                            "type": "string"
                        }, {
                            "index":4,
                            "name": "logo_url",
                            "type": "string"
                        }, {
                            "index":5,
                            "name": "source",
                            "type": "string"
                        }, {
                            "index":6,
                            "name": "update_date",
                            "type": "string"
                        }, {
                            "index":7,
                            "name": "update_time",
                            "type": "long"
                        }, {
                            "index":8,
                            "name": "wx_id",
                            "type": "string"
                        }, {
                            "index":9,
                            "name": "wx_name",
                            "type": "string"
                        }]
                    }
                },
                 "writer": {
                        "name": "mysqlwriter", 
                        "parameter": {
                            "column": [
                            "id",
                            "crawler_time",
                            "file_url",
                            "flag",
                            "logo_url",
                            "source",
                            "update_date",
                            "update_time",
                            "wx_id",
                            "wx_name"
                            ], 
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8", 
                                    "table": ["fileids_wxpy"]
                                }
                            ], 
                            "password": "123456", 
                            "username": "root"
                        }
                    }
            }]
        }
    
    }
    

    mysql新建数据库 、表

    create DATABASE weixin;
    use weixin;
    DROP TABLE IF EXISTS `fileids_wxpy`;
    CREATE TABLE `fileids_wxpy` (
      `id` bigint(20) unsigned NOT NULL,
      `crawler_time` int(10) unsigned NOT NULL,
      `file_url` varchar(255) NOT NULL,
      `flag` varchar(255) NOT NULL,
      `logo_url` varchar(255) NOT NULL,
      `source` varchar(255) NOT NULL,
      `update_date` int(10) unsigned NOT NULL,
      `update_time` int(10) unsigned NOT NULL,
      `wx_id` varchar(255) NOT NULL,
      `wx_name` varchar(255) NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

    启动

    C:\DataX\bin>python datax.py mongodb2mysql.json
    

    报错

    Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=UNKNOWN, servers=[{address=*.*.*.*:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=null, userName='root', source='weixin', password=<hidden>, mechanismProperties={}}}, caused by {com.mongodb.MongoCommandException: Command failed with error 18: 'Authentication failed.' on server*.*.*.*:27017. The full response is { "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" }}}]
            at com.mongodb.connection.BaseCluster.createTimeoutException(BaseCluster.java:369)
            at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:101)
            at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75)
            at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71)
            at com.mongodb.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:63)
            at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:201)
            at com.mongodb.operation.CountOperation.execute(CountOperation.java:206)
            at com.mongodb.operation.CountOperation.execute(CountOperation.java:53)
            at com.mongodb.Mongo.execute(Mongo.java:772)
            at com.mongodb.Mongo$2.execute(Mongo.java:759)
            at com.mongodb.MongoCollectionImpl.count(MongoCollectionImpl.java:185)
            at com.mongodb.MongoCollectionImpl.count(MongoCollectionImpl.java:165)
            at com.alibaba.datax.plugin.reader.mongodbreader.util.CollectionSplitUtil.doSplitInterval(CollectionSplitUtil.java:55)
            at com.alibaba.datax.plugin.reader.mongodbreader.util.CollectionSplitUtil.doSplit(CollectionSplitUtil.java:37)
            at com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReader$Job.split(MongoDBReader.java:37)
            at com.alibaba.datax.core.job.JobContainer.doReaderSplit(JobContainer.java:732)
            at com.alibaba.datax.core.job.JobContainer.split(JobContainer.java:393)
            at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:117)
            ... 3 more
    

    原因

    MongoDB中每个数据库之间是相互独立的,都有独立的权限,正确的做法是使用root账号在【将要操作的数据库】中创建一个【子账号】,在用这个子账号连接mongo

    解决办法

    >use admin
    
    switched to db admin
    
    >db.auth("root","******")
    
    1
    
    >show dbs
    
    admin
    
    local
    
    weixin
    
    >use weixin
    
    switched to db weixin
    
    >db.createUser(
            {
                user:"DataXTest",
                pwd:"123456",
                roles:[{role:"dbOwner",db:"weixin"}]
            }
    )
    
    Successfully added user: {
        "user" : "DataXTest",
        "roles" : [
            {
                "role" : "dbOwner",
                "db" : "weixin"
            }
        ]
    
    }
    

    使用DataXTest来替换jsono配置文件中mongodb的账号root后,再次运行

    C:\DataX\bin>python datax.py mongodb2mysql.json
    
    2019-06-13 14:39:40.218 [job-0] INFO  JobContainer - PerfTrace not enable!
    2019-06-13 14:39:40.219 [job-0] INFO  StandAloneJobContainerCommunicator - Total 50115 records, 17716504 bytes | Speed 36.04KB/s, 104 records/s | Error 12 records, 3513 bytes |  All Task WaitWriterTime 259.684s |  All Task WaitReaderTime 207.041s | Percentage 100.00%
    2019-06-13 14:39:40.221 [job-0] INFO  JobContainer -
    任务启动时刻                    : 2019-06-13 14:31:33
    任务结束时刻                    : 2019-06-13 14:39:40
    任务总计耗时                    :                487s
    任务平均流量                    :           36.04KB/s
    记录写入速度                    :            104rec/s
    读出记录总数                    :               50115
    读写失败总数                    :                  12
    

    注: 此处错误的12条记录是由于id 长度超过19位

    相关文章

      网友评论

        本文标题:DataX mongodb导出数据到mysql

        本文链接:https://www.haomeiwen.com/subject/fxksfctx.html