美文网首页
使用Canal 从 MySQL 的Binlog 抽取数据 数据

使用Canal 从 MySQL 的Binlog 抽取数据 数据

作者: Mark_ZSQ | 来源:发表于2020-09-14 22:57 被阅读0次

    数据抽取是 ETL 流程的第一步。我们会将数据从 RDBMS 或日志服务器等外部系统抽取至数据仓库,进行清洗、转换、聚合等操作。在现代网站技术栈中,MySQL 是最常见的数据库管理系统,我们会从多个不同的 MySQL 实例中抽取数据,存入一个中心节点,或直接进入 Hive。市面上已有多种成熟的、基于 SQL 查询的抽取软件,如著名的开源项目 Apache Sqoop,然而这些工具并不支持实时的数据抽取。MySQL Binlog 则是一种实时的数据流,用于主从节点之间的数据复制,我们可以利用它来进行数据抽取。借助阿里巴巴开源的 Canal 项目,我们能够非常便捷地将 MySQL 中的数据抽取到任意目标存储中。

    Mysql Binlog是二进制格式的日志文件,但是不能把binlog文件等同于OS系统某目录下的具体文件,这是狭隘的。Binlog是用来记录Mysql内部对数据库的改动(只记录对数据的修改操作),主要用于数据库的主从复制以及增量恢复

    image.png

    1. Canal采集程序搭建

    使用java语言将canal中的binlog日志解析,并写入到Kafka中

    2.mysql开启binLog日志

    • 2.1使用vim打开 /etc/my.cnf
    • 2.2添加以下配置
    #配置binlog日志的存放路径为/var/lib/mysql目录,文件以mysql-bin开头 
    log-bin=/var/lib/mysql/mysql-bin 
    # 配置mysql中每一行记录的变化都会详细记录下来 
    binlog-format=ROW 
    # 配置当前机器器的服务ID(如果是mysql集群,不能重复) 
    server_id=1
    
    • 2.3重启mysql
    service mysqld restart
    
    • 2.4.进入Mysql客户端输入mysql> show variables like 'log_%'; 查看日志是否开启
    show variables like '%log_bin%';
    
    image.png
    • 2.5.MySQL 里配置Canal Server权限: 一定要加SHOW VIEW这个东西,不然后面报错
    CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    
    • 2.6 下载canal的压缩包并解压:
      官网https://github.com/alibaba/canal/releases 下载 canal.deployer-1.1.1.tar.gz
      mkdir /usr/local/canal
      下载好的压缩包在上面的目录中解压 : tar –zxvf canal.deployer-1.1.1.tar.gz

    • 2.7.配置修改
      在解压好的目录下:
      vim conf/example/instance.properties
      修改这三个地方即可

    # mysql serverId,随便设置 ,slaveId 不能与 my.cnf 中的 server-id 项重复
    canal.instance.mysql.slaveId = 1234
    
    #position info,需要改成自己的数据库信息
    canal.instance.master.address = 127.0.0.1:3306
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.connectionCharset = UTF-8
    
    #一定要注释掉下面这个参数,这样就会扫描全库
    #canal.instance.defaultDatabaseName = test
    
    # 订阅实例中所有的数据库和表canal.instance.filter.regex = .*\\..*
    
    • 2.8.启动:
    sh bin/startup.sh
    

    查看日志:

    vim logs/canal/canal.log
    

    有下面的这三句话即表示成功

    [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
    [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
    [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
    
    • 2.9.建一个java程序验证是否搭建好了:
      首先第一次我们要:
    vim conf/canal.properties
    

    找到canal.instance.parser.parallelThreadSize = 16
    把这一行的注释符号去掉(java才能成功运行)
    然后重启canal:

    sh bin/restart.sh
    

    3.java项目搭建

    3.1、导入pom依赖
     <dependencies>
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.0.24</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.9.0.1</version>
            </dependency>
    
            <!--对象和json 互相转换的-->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.44</version>
            </dependency>
        </dependencies>
    
    3.2、编写GlobalConfigUtil工具类,读取application.properties配置文件

    步骤

    创建 GlobalConfigUtil 工具类,读取 application.properties 中的 canal 和 kafka 配置
    添加main方法,测试是否能正确读取配置

    • GlobalConfigUtil.java
    import java.util.ResourceBundle;
    
    /**
     * 配置文件的公共类
     */
    public class GlobalConfigUtil {
    
        //读取application.properties文件
        private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");
    
        public static String canalHost= resourceBundle.getString("canal.host");
        public static String canalPort = resourceBundle.getString("canal.port");
        public static String canalInstance = resourceBundle.getString("canal.instance");
        public static String mysqlUsername = resourceBundle.getString("mysql.username");
        public static String mysqlPassword=  resourceBundle.getString("mysql.password");
        public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers");
        public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect");
        public static String kafkaInput = resourceBundle.getString("kafka.input.topic");
    
        public static void main(String[] args) {
            System.out.println(canalHost);
        }
    }
    
    
    • 创建KafkaSender 工具类,实现将数据发送到kafka
      KafkaSender.java
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import kafka.serializer.StringEncoder;
    
    import java.util.Properties;
    
    /**
     * Kafka生产消息工具类
     */
    public class KafkaSender {
        private String topic;
    
        public KafkaSender(String topic){
            super();
            this.topic = topic;
        }
    
        /**
         * 发送消息到Kafka指定topic
         *
         * @param topic topic名字
         * @param key 键值
         * @param data 数据
         */
        public static void sendMessage(String topic , String key , String data){
            Producer<String, String> producer = createProducer();
            producer.send(new KeyedMessage<String , String>(topic , key , data));
        }
    
        /**
         * 创建生产者实例
         * @return
         */
        private static Producer<String , String> createProducer(){
            Properties properties = new Properties();
    
            properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap);
            properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper);
            properties.put("serializer.class" , StringEncoder.class.getName());
    
            return new Producer<String, String>(new ProducerConfig(properties));
        }
    }
    
    • 编写Canal客户端类,解析binlog日志并将数据发送至kafka
      CanalClient.java
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.itheima.canal_kafka.util.GlobalConfigUtil;
    import com.itheima.canal_kafka.util.KafkaSender;
    
    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    
    /**
     * Canal解析binlog日志工具类
     */
    public class CanalClient {
    
        static class ColumnValuePair {
            private String columnName;
            private String columnValue;
            private Boolean isValid;
    
            public ColumnValuePair(String columnName, String columnValue, Boolean isValid) {
                this.columnName = columnName;
                this.columnValue = columnValue;
                this.isValid = isValid;
            }
    
            public String getColumnName() { return columnName; }
            public void setColumnName(String columnName) { this.columnName = columnName; }
            public String getColumnValue() { return columnValue; }
            public void setColumnValue(String columnValue) { this.columnValue = columnValue; }
            public Boolean getIsValid() { return isValid; }
            public void setIsValid(Boolean isValid) { this.isValid = isValid; }
        }
    
        /**
         * 获取Canal连接
         *
         * @param host     主机名
         * @param port     端口号
         * @param instance Canal实例名
         * @param username 用户名
         * @param password 密码
         * @return Canal连接器
         */
        public static CanalConnector getConn(String host, int port, String instance, String username, String password) {
            CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
            return canalConnector;
        }
    
        /**
         * 解析Binlog日志
         *
         * @param entries    Binlog消息实体
         * @param emptyCount 操作的序号
         */
        public static void analysis(List<CanalEntry.Entry> entries, int emptyCount) {
            for (CanalEntry.Entry entry : entries) {
                // 只解析mysql事务的操作,其他的不解析
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                        entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
    
                // 那么解析binlog
                CanalEntry.RowChange rowChange = null;
    
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
                // 获取操作类型字段(增加  删除  修改)
                CanalEntry.EventType eventType = rowChange.getEventType();
                // 获取binlog文件名称
                String logfileName = entry.getHeader().getLogfileName();
                // 读取当前操作在binlog文件的位置
                long logfileOffset = entry.getHeader().getLogfileOffset();
                // 获取当前操作所属的数据库
                String dbName = entry.getHeader().getSchemaName();
                // 获取当前操作所属的表
                String tableName = entry.getHeader().getTableName();//当前操作的是哪一张表
                long timestamp = entry.getHeader().getExecuteTime();//执行时间
    
                // 解析操作的行数据
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    // 删除操作
                    if (eventType == CanalEntry.EventType.DELETE) {
                        // 获取删除之前的所有列数据
                        dataDetails(rowData.getBeforeColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp);
                    }
                    // 新增操作
                    else if (eventType == CanalEntry.EventType.INSERT) {
                        // 获取新增之后的所有列数据
                        dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp);
                    }
                    // 更新操作
                    else {
                        // 获取更新之后的所有列数据
                        dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp);
                    }
                }
            }
        }
    
        /**
         * 解析具体一条Binlog消息的数据
         *
         * @param columns       当前行所有的列数据
         * @param logFileName   binlog文件名
         * @param logFileOffset 当前操作在binlog中的位置
         * @param dbName        当前操作所属数据库名称
         * @param tableName     当前操作所属表名称
         * @param eventType     当前操作类型(新增、修改、删除)
         * @param emptyCount    操作的序号
         */
        private static void dataDetails(List<CanalEntry.Column> columns,
                                        String logFileName,
                                        Long logFileOffset,
                                        String dbName,
                                        String tableName,
                                        CanalEntry.EventType eventType,
                                        int emptyCount,
                                        long timestamp) {
    
            // 找到当前那些列发生了改变  以及改变的值
            List<ColumnValuePair> columnValueList = new ArrayList<ColumnValuePair>();
    
            for (CanalEntry.Column column : columns) {
                ColumnValuePair columnValuePair = new ColumnValuePair(column.getName(), column.getValue(), column.getUpdated());
                columnValueList.add(columnValuePair);
            }
    
            String key = UUID.randomUUID().toString();
    
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("logFileName", logFileName);
            jsonObject.put("logFileOffset", logFileOffset);
            jsonObject.put("dbName", dbName);
            jsonObject.put("tableName", tableName);
            jsonObject.put("eventType", eventType);
            jsonObject.put("columnValueList", columnValueList);
            jsonObject.put("emptyCount", emptyCount);
            jsonObject.put("timestamp", timestamp);
    
    
            // 拼接所有binlog解析的字段
            String data = JSON.toJSONString(jsonObject);
    
            System.out.println(data);
    
            // 解析后的数据发送到kafka
            KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, key, data);
        }
    
        /**
         * 客户端入口方法
         * @param args
         */
        public static void main(String[] args) {
            // 加载配置文件
            String host = GlobalConfigUtil.canalHost;
            int port = Integer.parseInt(GlobalConfigUtil.canalPort);
            String instance = GlobalConfigUtil.canalInstance;
            String username = GlobalConfigUtil.mysqlUsername;
            String password = GlobalConfigUtil.mysqlPassword;
    
            // 获取Canal连接
            CanalConnector conn = getConn(host, port, instance, username, password);
    
            // 从binlog中读取数据
            int batchSize = 100;
            int emptyCount = 1;
    
            try {
                conn.connect();
                conn.subscribe(".*\\..*");
                conn.rollback();
    
                int totalCount = 120; //循环次数
    
                while (totalCount > emptyCount) {
                    // 获取数据
                    Message message = conn.getWithoutAck(batchSize);
    
                    long id = message.getId();
                    int size = message.getEntries().size();
                    if (id == -1 || size == 0) {
                        //没有读取到任何数据
                    } else {
                        //有数据,那么解析binlog日志
                        analysis(message.getEntries(), emptyCount);
                        emptyCount++;
                    }
                    // 确认消息
                    conn.ack(message.getId());
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                conn.disconnect();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:使用Canal 从 MySQL 的Binlog 抽取数据 数据

          本文链接:https://www.haomeiwen.com/subject/btptyktx.html