基础环境需求:

Flink 支持用 Java,Scala 和 Python 进行开发,下面我将会用 Java 语言来实现这一需求。

用 Java 语言进行Flink 开发,需要 JDK8 以上版本和 Maven 3.2 以上版本,下面用命令来看一下基础环境是否安装:

mvn -version
java -version
python --version

基础环境都有了之后就可以初始化一个java开发的Flink基础工程了:

Windows脚本:

mvn archetype:generate -DinteractiveMode=false -DgroupId=org.khkw -DartifactId=No37-flink-env -Dpackage=sql -Dversion=0.0.1

Linux脚本:

mvn archetype:generate \
-DinteractiveMode=false \
-DgroupId=org.khkw \
-DartifactId=No37-flink-env \
-Dpackage=sql \
-Dversion=0.0.1

创建完成之后,导入IDEA,并运行自动生成的App.java。如果一切顺利,我们就可以进入FlinkSQL开发了。

Flink Hello World:

所谓HelloWorld,就是不关心业务逻辑,重在开发环境的调试。首先我们要在项目里面添加Flink SQL开发需要的依赖,作为第一次学习,不用关心每个细节,当作黑盒子。pom.xml文件核心依赖(部分)如下详细参阅

pom 依赖:

添加 properties 和 dependency:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.11.1</flink.version>
    <mysql.version>5.1.40</mysql.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.12</scala.version>
</properties>

<dependencies>
    <!-- 利用Java开发 -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>

    <!-- 使用Blink Planner -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>

    <!-- 支持一些自定义的消息格式,比如kafka里面消息格式是json的,或者需要自定义函数支持 -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    </dependency>

    <!-- JDBC Connector的支持,本案例会是使用MySQL -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>

    <!-- Kafka Connector的支持-->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka-0.11_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>

    <!-- Kafka里面的消息采用Json格式 -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    </dependency>

    <!-- MySQL的驱动 -->
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
    </dependency>

    <!--提交作业所必须的依赖,比如:LocalExecutorFactory -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>

    <!-- 日志方便调试 -->
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>runtime</scope>
    </dependency>
    <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>runtime</scope>
    </dependency>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
    </dependency>
</dependencies>

示例代码:

Flink为了方便用户测试,特意开发了内置的随机数据源(datagen source)和控制台打印数据汇(print sink),我们Helloworld示例就是声明数据源和数据输出DDL,然后直接将表结构形同的数据源(source)表写入到数据汇(sink)表。代码如下:

  • 数据源(source): 为了方便测试,flink提供了自动生成数据的随机Source

    String sourceDDL = "CREATE TABLE random_source (\n" +
        " f_sequence INT,\n" +
        " f_random INT,\n" +
        " f_random_str STRING\n" +
        ") WITH (\n" +
        " 'connector' = 'datagen',\n" +
        " 'rows-per-second'='5',\n" +
        " 'fields.f_sequence.kind'='sequence',\n" +
        " 'fields.f_sequence.start'='1',\n" +
        " 'fields.f_sequence.end'='1000',\n" +
        " 'fields.f_random.min'='1',\n" +
        " 'fields.f_random.max'='1000',\n" +
        " 'fields.f_random_str.length'='10'\n" +
        ")";
    
  • 数据汇(sink): 为了方便测试,flink提供了控制台打印的print。声明了和上面随机数据源一样的表schema的输出表

    String sinkDDL = "CREATE TABLE print_sink (\n" +
        " f_sequence INT,\n" +
        " f_random INT,\n" +
        " f_random_str STRING \n" +
        ") WITH (\n" +
        " 'connector' = 'print'\n" +
        ")";
    
  • 连接器参数:

    
    | 参数            | 是否必选 |        默认参数         |    数据类型     |                           描述                           |
    | :-------------- | :------: | :---------------------: | :-------------: | :------------------------------------------------------: |
    | connector       |   必须   |         (none)          |     String      |          指定要使用的连接器,这里是 'datagen'。          |
    | rows-per-second |   可选   |          10000          |      Long       |          每秒生成的行数,用以控制数据发出速率。          |
    | fields.#.kind   |   可选   |         random          |     String      |  指定 '#' 字段的生成器。可以是 'sequence' 或 'random'。  |
    | fields.#.min    |   可选   | (Minimum value of type) | (Type of field) |           随机生成器的最小值,适用于数字类型。           |
    | fields.#.max    |   可选   | (Maximum value of type) | (Type of field) |           随机生成器的最大值,适用于数字类型。           |
    | fields.#.length |   可选   |           100           |     Integer     | 随机生成器生成字符的长度,适用于 char、varchar、string。 |
    | fields.#.start  |   可选   |         (none)          | (Type of field) |                   序列生成器的起始值。                   |
    | fields.#.end    |   可选   |         (none)          | (Type of field) |                   序列生成器的结束值。                   |
    
  • 创建执行环境

    EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);
    
    • 两种Planner (useBlinkPlanner 和 useOldPlanner),useBlinkPlanner 实现了流批一体,它会将批处理当做流处理的一种特例,用同样的方式进行处理
    • 两种 Mode (inBatchMode 和 inStreamingMode),inBatchMode 表示进行批处理,inStreamingMode表示流处理。
  • 注册source和sink

    tEnv.executeSql(sourceDDL);
    tEnv.executeSql(sinkDDL);
    

    注册名为 random_source 和 print_sink 的表

  • 数据提取

    //from 扫描注册的订单表,进行数据提取
    Table sourceTab = tEnv.from("random_source");  //读
    //这里我们暂时先使用 标注了 deprecated 的API, 因为新的异步提交测试有待改进...
    sourceTab.insertInto("print_sink");            //写
    //执行作业
    tEnv.execute("Flink Hello World");
    

完整代码:

package sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class App {
    public static void main(String[] args) throws Exception {
        // 为了方便测试,flink提供了自动生成数据的source.
        String sourceDDL = "CREATE TABLE random_source (\n" +
                " f_sequence INT,\n" +
                " f_random INT,\n" +
                " f_random_str STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.f_sequence.kind'='sequence',\n" +
                " 'fields.f_sequence.start'='1',\n" +
                " 'fields.f_sequence.end'='1000',\n" +
                " 'fields.f_random.min'='1',\n" +
                " 'fields.f_random.max'='1000',\n" +
                " 'fields.f_random_str.length'='10'\n" +
                ")";

        // 为了方便测试,flink提供了控制台打印的print.
        String sinkDDL = "CREATE TABLE print_sink (\n" +
                " f_sequence INT,\n" +
                " f_random INT,\n" +
                " f_random_str STRING \n" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")";

        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        //注册source和sink
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        //数据提取
        Table sourceTab = tEnv.from("random_source");
        //这里我们暂时先使用 标注了 deprecated 的API, 因为新的异步提交测试有待改进...
        sourceTab.executeInsert("print_sink");
        //执行作业
        tEnv.execute("Flink Hello World");
    }
}

运行之后会自动插入1000条数据到 print_sink,并打印到控制台。

下面添加外部数据源,先安装kafka:

当前用 docker 搭建开发环境是最简单的方式,docker 的下载和安装参见 https://www.docker.com/get-started 。

安装完成后,可以用命令行查看一下docker是否安装成功:

docker --version

安装Kafka:

//搜索可用镜像
docker search kafka

//选择第一个wurstmeister/kafka,安装镜像
docker pull wurstmeister/kafka:2.12-2.5.0

安装Zookeeper:

//搜索可用镜像
docker search zookeeper

//选择 zookeeper 3.6.1 版本,进行安装
docker pull zookeeper:3.6.1

安装完成之后,我们可以查看一下已经安装的镜像,如下命令:

docker images

接下来我们进行容器的启动,先启动 zookeeper,映射2181端口号,并用docker ps查看启动状态,如下:

docker run -d --name zookeeper  -p 2181:2181 -t zookeeper:3.6.1

接下来,我们启动Kafka,如下:

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka:2.12-2.5.0

查看Docker启动的所有正在运行的容器,如下:

docker ps

如上信息证明,Kafka已经成功启动,接下来我们简单测试一下,我们启动Kafka所在容器的shell,并且收发消息:

  • 进入Kafka命令行

    docker exec -it kafka /bin/bash
    
  • 尝试创建一个Topic,名字叫 test

    /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test
    
  • test Topic 发数据:

    kafka-console-producer 生产者

    bash-4.4# /opt/kafka/bin/kafka-console-producer.sh --topic=test --broker-list localhost:9092
    >
    >
    

然后启动另一个Kafka的Shell来消费消息:

docker exec -it kafka /bin/bash

接收数据,kafka-console-consumer 消费者:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 -from-beginning --topic test

这时在发数据的 bash 中输入,hello,kafka

bash-4.4# /opt/kafka/bin/kafka-console-producer.sh --topic=test --broker-list localhost:9092
>hello
>kafka

接收数据的shell也会得到:

bash-4.4# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 -from-beginning --topic test
hello
kafka

至此,Kafka的环境部署测试完成,下面进行MySQL部分。

安装 MySQL:

//搜索可用镜像
docker search mysql

//安装MySQL5.7
docker pull mysql:5.7

安装完成后,再次查看一下我们安装的所有镜像信息,如下:

docker images

MySQL安装成功后进行启动,名字叫 flink_mysql:

docker run -p 3306:3306 --name flink_mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 

简单的解释一下命令参数含义如下:

  • -p 3306:3306:将容器的 3306 端口映射到主机的 3306 端口。
  • -e MYSQL_ROOT_PASSWORD=123456:初始化 root 用户的密码。
  • -d: 后台运行容器,并返回容器ID

如果显示监听3306端口不可用,那么需要先进任务管理器将本机的 mysql.exe 和 mysqld.exe 先 kill 掉,然后显示所有的docker容器(包括已经停止的),然后将停掉的 mysql 容器启动:

//显示所有的docker容器(包括已经停止的)
docker ps -a

//docker ps -a 命令会显示各个容器的 CONTAINER ID
//用 docker start [CONTAINER ID的前四位],来重新启动这个容器
docker start bcd6

Mysql已经启动,并映射了3306端口号,我们尝试连接一下名为 flink_mysql 的数据库:

C:\Users\F201021>docker exec -it flink_mysql bash
root@12a612bfe306:/# mysql -h localhost -u root -p
Enter password:

进入了Mysql的Shell命令行,我们可以尝试创建数据库和表,如下:

mysql> show databases;
mysql> create database flinkdb;
mysql> use flinkdb;

好了,如果上面这轮操作都成功了,那么证明MySQL环境已经可用了,所有基础环境安装完成。

下面先演示从kafka读数据并写入到控制台:

Flink 代码如下:

package sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class Kafka2Print {
    public static void main(String[] args) throws Exception {
        // Kafka 读
        String sourceDDL = "CREATE TABLE kafka_source (\n" +
                " msg STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka-0.11',\n" +
                " 'topic' = 'cdn-log',\n" +
                " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                " 'format' = 'json',\n" +
                " 'scan.startup.mode' = 'latest-offset'\n" +
                ")";

        // 写入控制台
        String sinkDDL = "CREATE TABLE print_sink (\n" +
                " msg STRING \n" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")";

        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        //注册source和sink
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        //数据提取
        Table sourceTab = tEnv.from("kafka_source");
        //这里我们暂时先使用 标注了 deprecated 的API, 因为新的异步提交测试有待改进...
        sourceTab.insertInto("print_sink");
        //执行作业
        tEnv.execute("Flink Hello World");
    }
}
  • format:可以有多种选择,如:JSON,CSV,AVRO,Canal CDC,Debezium CDC等,详细见:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/

  • scan.startup.mode: 也可以有多种选择,如:earliest-offset, latest-offset, group-offsets, timestamp and specific-offsets详见:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position

  • 配置Mysql的连接属性比较简单,详细的其他参数参见https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

因为Kafka的DDL连接属性的 topic 配置的 name 为 cdn-log,因此我们需要先启动kafka,并启动一个bash交互终端:

docker exec -it kafka /bin/bash

并创建一个 topic 也叫 cdn-log

/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic cdn-log

然后可以看一下当前已经创建的 topic

/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

当发现 cdn-log 这个 topic 已经创建之后,我们就可以向 cdn-log Topic 发数据了,发送的数据为 {"msg":"welcome flink users..."},这是一个 json 格式的数据,键 msg 对应 Flink 代码中 kafka 的字段 msg,发送数据前需要先启动 Flink 程序:

/opt/kafka/bin/kafka-console-producer.sh --topic=cdn-log --broker-list localhost:9092
>{"msg":"welcome flink users..."}
>{"msg":"welcome flink users..."}
>{"msg":"welcome flink users..."}

这时能在 Flink 程序的控制台接收到发送的数据。

最后演示从kafka读数据并写入到MySQL:

Flink 代码如下:

package sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class Kafka2Mysql {
    public static void main(String[] args) throws Exception {
        // Kafka {"msg": "welcome flink users..."}
        String sourceDDL = "CREATE TABLE kafka_source (\n" +
                " msg STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka-0.11',\n" +
                " 'topic' = 'cdn-log',\n" +
                " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                " 'format' = 'json',\n" +
                " 'scan.startup.mode' = 'latest-offset'\n" +
                ")";

        // Mysql
        String sinkDDL = "CREATE TABLE mysql_sink (\n" +
                " msg STRING \n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:mysql://localhost:3306/flinkdb?characterEncoding=utf-8&useSSL=false',\n" +
                "   'table-name' = 'cdn_log',\n" +
                "   'username' = 'root',\n" +
                "   'password' = '123456',\n" +
                "   'sink.buffer-flush.max-rows' = '1'\n" +
                ")";

        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        //注册source和sink
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        //数据提取
        Table sourceTab = tEnv.from("kafka_source");
        //这里我们暂时先使用 标注了 deprecated 的API, 因为新的异步提交测试有待改进...
        sourceTab.insertInto("mysql_sink");
        //执行作业
        tEnv.execute("Flink Hello World");
    }
}

创建一个Mysql的Table,比如叫:cdn_log:

mysql> create table cdn_log (msg VARCHAR(300));

然后在IDE启动SQL作业,然后向Kafka的cdn-log主题发送消息 {"msg": "welcome flink users..."},然后再Mysql的shell环境对表cdn_log表进行查询:

//启动kafka,并启动一个bash交互终端
docker exec -it kafka /bin/bash

/opt/kafka/bin/kafka-console-producer.sh --topic=cdn-log --broker-list localhost:9092
>{"msg":"welcome flink users..."}
>{"msg":"welcome flink users..."}
>{"msg":"welcome flink users..."}

然后查看 MySQL 中的 cdn_log 数据表:

select * from cdn_log;

接收到数据,则说明数据已经从 kafka 读到并写入 MySQL了

参考链接:

1.Apache Flink 漫谈系列 - 搭建Flink 1.11 版本 Table API/SQL开发环境(需求驱动)

2.Flink 1.11官方文档 —— DataGen SQL 连接器

3.Flink 1.11官方文档 —— CREATE 语句

4.Flink 1.11官方文档 —— Flink 支持的表格式

5.Apache Kafka SQL连接器

6.JDBC SQL连接器

7.Docker容器的创建、启动、和停止

8.两种计划器(Planner)的主要区别