Mysql 流增量写入 Hdfs(一) –从 mysql 到 kafka

一. 概述

在大数据的静态数据处理中,目前普遍采用的是用 Spark + Hdfs (Hive / Hbase) 的技术架构来对数据进行处理。

但有时候有其他的需求,需要从其他不同数据源不间断得采集数据,然后存储到 Hdfs 中进行处理。而追加(append)这种操作在 Hdfs 里面明显是比较麻烦的一件事。所幸有了 Storm 这么个流数据处理这样的东西问世,可以帮我们解决这些问题。

不过光有 Storm 还不够,我们还需要其他中间件来协助我们,让所有其他数据源都归于一个通道。这样就能实现不同数据源以及 Hhdfs 之间的解耦。而这个中间件 Kafka 无疑是一个很好的选择。

这样我们就可以让 Mysql 的增量数据不停得抛出到 Kafka ,而后再让 storm 不停得从 Kafka 对应的 Topic 读取数据并写入到 Hdfs 中。

二. 基本知识

2.1 Mysql binlog 介绍

binlog 即 Mysql 的二进制日志。它可以说是 Mysql 最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。

上面所说的提到了 DDL 和 DML ,可能有些同学不了解,这里顺便说一下:

  • DDL:数据定义语言DDL用来创建数据库中的各种对象—–表、视图、索引、同义词、聚簇等如:CREATE TABLE/VIEW/INDEX/SYN/CLUSTER…
  • DML:数据操纵语言DML主要有三种形式:插入(INSERT), 更新(UPDATE),以及删除(DELETE)。

在 Mysql 中,binlog 默认是不开启的,因为有大约 1% (官方说法)的性能损耗,如果要手动开启,流程如下:

  1. vi编辑打开mysql配置文件:
vi /usr/local/mysql/etc/my.cnf

在[mysqld] 区块设置/添加如下,

log-bin=mysql-bin 

注意一定要在 [mysqld] 下。

  1. 重启 Mysql
pkill mysqld
/usr/local/mysql/bin/mysqld_safe --user=mysql &

2.2 kafka

这里只对 Kafka 做一个基本的介绍,更多的内容可以度娘一波。

上面的图片是 kafka 官方的一个图片,我们目前只需要关注 Producers 和 Consumers 就行了。

Kafka 是一个分布式发布-订阅消息系统。分布式方面由 Zookeeper 进行协同处理。消息订阅其实说白了吧,就是一个队列,分为消费者和生产者,就像上图中的内容,有数据源充当 Producer 生产数据到 kafka 中,而有数据充当 Consumers ,消费 kafka 中的数据。

上图中的 offset 指的是数据的写入以及消费的位置的信息,这是由 Zookeeper 管理的。也就是说,当 Consumers 重启或是怎样,需要重新从 kafka 读取消息时,总不能让它从头开始消费数据吧,这时候就需要有个记录能告诉你从哪里开始重新读取。这就是 offset 。

kafka 中还有一个至关重要的概念,那就是 topic 。不过这个其实还是很好理解的,比如你要订阅一些消息,你肯定是不会订阅所有消息的吧,你只需要订阅你感兴趣的主题,比如摄影,编程,搞笑这些主题。而这里主题的概念其实和 topic 是一样的。总之,可以将 topic 归结为通道,kafka 中有很多个通道,不同的 Producer 向其中一个通道生产数据,也就是抛数据进去这个通道,Comsumers 不停得消费通道中的数据。

而我们要做的就是将 Mysql binlog 产生的数据抛到 kafka 中充当作生产者,然后由 storm 充当消费者,不停得消费数据并写入到 Hdfs 中。

至于怎么将 binlog 的数据抛到 kafka ,别急,下面我们就来介绍。

2.3 maxwell

maxwell 这个工具可以很方便得监听 Mysql 的 binlog ,然后每当 binlog 发生变化时,就会以 json 格式抛出对应的变化数据到 Kafka 中。比如当向 mysql 一张表中插入一条语句的时候,maxwell 就会立刻监听到 binlog 中有对应的记录增加,然后将一些信息包括插入的数据都转化成 json 格式,然后抛到 kafka 指定的 topic 中。

下载地址在这里可以找到。

除了 Kafka 外,其实 maxwell 还支持写入到其他各种中间件,比如 redis。
同时 maxwell 是比较轻量级的工具,只需要在 mysql 中新建一个数据库供它记录一些信息,然后就可以直接运行。

三. 使用 maxwell 监听 binlog

接下来我们将的是如果使用 maxwell ,让它监听 mysql 的 binlog 并抛到 kafka 中。maxwell 主要有两种运行方式。一种是使用配置文件,另一种则是在命令行中添加参数的方式运行。这里追求方便,只使用命令行的方式进行演示。

这里介绍一下简单的将数据抛到 kafka 的命令行脚本吧。

bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
   --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --port=3306

各项参数说明如下

  • user:mysql 用户名
  • password:mysql 密码
  • host:Mysql 地址
  • producer:指定写入的中间件类型,比如还有 redies
  • kafka.bootstrap.servers:kafka 的地址
  • kafka_topic:指明写入到 kafka 哪个 topic
  • port:mysql 端口

启动之后,maxwell 便开始工作了,当然如果你想要让这条命令可以在后台运行的话,可以使用 Linux 的 nohup 命令,这里就不多赘述,有需要百度即可。

这样配置的话通常会将整个数据库的增删改都给抛到 kafka ,但这样的需求显然不常见,更常见的应该是具体监听对某个库的操作,或是某个表的操作。

在升级到 1.9.2(最新版本)后,maxwell 为我们提供这样一个参数,让我们可以轻松实现上述需求:–filter

这个参数通常包含两个配置项,exclude 和 include。意思就是让你指定排除哪些和包含哪些。比如我只想监听 Adatabase 库下的 Atable 表的变化。我可以这样。

--filter='exclude: *.*, include: Adatabase.Atable'

这样我们就可以轻松实现监听 mysql binlog 的变化,并可以定制自己的需求。

OK,这一章我们介绍了 mysql binlog ,kafka 以及 maxwell 的一些内容,下一篇我们将会看到 storm 如何写入 hdfs 以及定制一些策略。see you~~