
Sinks in Flink 1.13

Author: 万事万物 | Published 2021-08-11 18:04

Preface:

Version: 1.13

Common Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- Read HDFS files -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

KafkaSink

  • Add the dependency

        <!-- Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
  • Program
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * Read data from a socket.
         * Parameters: host, port, delimiter, max retries
         */
        SocketTextStreamFunction socketText = new SocketTextStreamFunction("hadoop102", 9999, "\n", 3);
        DataStreamSource<String> source = env.addSource(socketText);

        source.print("kafka>>>");

        // Write the data to Kafka

        // Kafka broker list
        String brokerList = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
        String topicId = "flink_kafka";
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(brokerList, topicId, new SimpleStringSchema());
        // Add the sink (Flink calls invoke() on the sink for each record; never call it directly)
        source.addSink(kafkaSink);

        // Start execution
        env.execute();
    }
  • Input
[admin@hadoop102 ~]$ nc -lk 9999
java
wwww
  • Output (consumer)
[admin@hadoop102 bin]$ sh kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic flink_kafka

java
wwww
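
A note on configuration: besides the (brokerList, topic, schema) constructor used above, FlinkKafkaProducer can also be given producer Properties. A minimal sketch, assuming the (topic, schema, properties) overload and requiring java.util.Properties; the values are simply the same cluster addresses as above:

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Assumed overload: FlinkKafkaProducer(topic, serializationSchema, producerConfig)
        FlinkKafkaProducer<String> kafkaSinkWithProps =
                new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        source.addSink(kafkaSinkWithProps);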

RedisSink

  • Add the dependency
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
  • Program
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * Read data from a socket.
         * Parameters: host, port, delimiter, max retries
         */
        SocketTextStreamFunction socketText = new SocketTextStreamFunction("hadoop102", 9999, "\n", 3);
        DataStreamSource<String> source = env.addSource(socketText);

        // Redis connection configuration
        FlinkJedisPoolConfig builder = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop102")
                .setPort(6379)
                .build();

        // Redis command mapping
        RedisMapper<String> redisMapper = new RedisMapper<String>() {

            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.HSET,"key_sss");
            }

            @Override
            public String getKeyFromData(String data) {
                return data+"_ByKey";
            }

            @Override
            public String getValueFromData(String data) {
                return data;
            }
        };


        org.apache.flink.streaming.connectors.redis.RedisSink<String> stringRedisSink =
                new org.apache.flink.streaming.connectors.redis.RedisSink<>(builder,redisMapper);

        source.addSink(stringRedisSink);


        env.execute();

    }
  • Input
java
  • Output
> hget key_sss java_ByKey
java
  • Supported Redis commands
public enum RedisCommand {

    /**
     * Insert the specified value at the head of the list stored at key.
     * If key does not exist, it is created as empty list before performing the push operations.
     */
    LPUSH(RedisDataType.LIST),

    /**
     * Insert the specified value at the tail of the list stored at key.
     * If key does not exist, it is created as empty list before performing the push operation.
     */
    RPUSH(RedisDataType.LIST),

    /**
     * Add the specified member to the set stored at key.
     * Specified member that is already a member of this set is ignored.
     */
    SADD(RedisDataType.SET),

    /**
     * Set key to hold the string value. If key already holds a value,
     * it is overwritten, regardless of its type.
     */
    SET(RedisDataType.STRING),

    /**
     * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
     */
    PFADD(RedisDataType.HYPER_LOG_LOG),

    /**
     * Posts a message to the given channel.
     */
    PUBLISH(RedisDataType.PUBSUB),

    /**
     * Adds the specified members with the specified score to the sorted set stored at key.
     */
    ZADD(RedisDataType.SORTED_SET),

    /**
     * Sets field in the hash stored at key to value. If key does not exist,
     * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
     */
    HSET(RedisDataType.HASH);
}
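
ElasticsearchSink

  • Add the dependency

The Elasticsearch 6 connector dependency should look roughly like this (coordinates assumed; pick the connector matching your ES major version):

        <!-- Elasticsearch 6 connector (assumed coordinates) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
  • Program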
import lombok.SneakyThrows;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * Read data from a socket.
         * Parameters: host, port, delimiter, max retries
         */
        SocketTextStreamFunction socketText = new SocketTextStreamFunction("hadoop102", 9999, "\n", 3);
        DataStreamSource<String> source = env.addSource(socketText);


        // Parse each line into a UserBean
        SingleOutputStreamOperator<UserBean> map = source.map(e -> new UserBean(e.split(",")))
                .returns(Types.POJO(UserBean.class));


        // ES connection hosts
        List<HttpHost> httpHosts=new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102",9200,"http"));
        httpHosts.add(new HttpHost("hadoop103",9200,"http"));
        httpHosts.add(new HttpHost("hadoop104",9200,"http"));


        // ES sink that writes the data
        org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.Builder builder = new org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.Builder(
                httpHosts,
                new ElasticsearchSinkFunction<UserBean>() {

                    public IndexRequest createIndexRequest(UserBean element) throws IllegalAccessException {
                        return Requests.indexRequest()
                                .index("user_index")
                                .type("_doc")
                                .source(objectToMap(element));
                    }

                    @SneakyThrows
                    @Override
                    public void process(UserBean element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );
        // Configure the batching (bulk flush) policy
        builder.setBulkFlushInterval(1000);

        org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink sink = builder.build();

        map.addSink(sink);

        env.execute();


    }

    /**
     * Converts an object's fields and their values into a Map.
     *
     * @param obj the object to convert
     * @return a field-name-to-value map
     * @throws IllegalAccessException if a field cannot be read
     */
    public static Map<String, Object> objectToMap(Object obj) throws IllegalAccessException {
        Map<String, Object> map = new HashMap<>();
        Class<?> clazz = obj.getClass();
        for (Field field : clazz.getDeclaredFields()) {
            field.setAccessible(true);
            String fieldName = field.getName();
            Object value = field.get(obj);
            map.put(fieldName, value);
        }
        return map;
    }
  • UserBean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
 * @author admin
 * @date 2021/8/10
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class UserBean {

    private int id;
    private String name;
    private int age;
    private String sex;


    public UserBean(String[] values) {
        this.id = Integer.parseInt(values[0]);
        this.name = values[1];
        this.age = Integer.parseInt(values[2]);
        this.sex = values[3];
    }


}
  • Requests
    indexRequest: insert (if the document id already exists it is overwritten, otherwise a new document is created)
    deleteRequest: delete (see the sketch below)
    bulkRequest: bulk operations
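
Deletes can be built in the same fluent style as createIndexRequest above and handed to the RequestIndexer inside process(); a minimal sketch (the document id is a made-up placeholder, and org.elasticsearch.action.delete.DeleteRequest must be imported):

                        // Hypothetical: remove a document by id from the same index
                        DeleteRequest delete = Requests.deleteRequest("user_index")
                                .type("_doc")
                                .id("some-doc-id"); // placeholder id
                        indexer.add(delete);        // RequestIndexer also accepts DeleteRequest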

  • BulkFlush policy

For performance, the ES sink does not write every record immediately; records are buffered and flushed to Elasticsearch in batches. The builder exposes the following settings (a configuration sketch follows this list).

setBulkFlushMaxActions: the maximum number of buffered actions per bulk request. Pass -1 to disable it.
setBulkFlushMaxSizeMb: the maximum size, in MB, of buffered actions per bulk request. Pass -1 to disable it.
setBulkFlushInterval: the bulk flush interval, in milliseconds. Pass -1 to disable it.
setBulkFlushBackoff: whether back-off behaviour is enabled when a bulk flush fails.
setBulkFlushBackoffType: the back-off type to use when retrying a bulk request.
setBulkFlushBackoffRetries: the maximum number of back-off retries when flushing a bulk request.
setBulkFlushBackoffDelay: the delay between back-off attempts, in milliseconds.
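
A minimal configuration sketch using the builder from the program above (the concrete values are arbitrary examples; the back-off type comes from the base connector's ElasticsearchSinkBase.FlushBackoffType enum):

        // Buffer at most 100 actions or 5 MB, and flush at least once per second
        builder.setBulkFlushMaxActions(100);
        builder.setBulkFlushMaxSizeMb(5);
        builder.setBulkFlushInterval(1000);
        // Retry failed bulk requests with exponential back-off: 3 retries, 500 ms initial delay
        builder.setBulkFlushBackoff(true);
        builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
        builder.setBulkFlushBackoffRetries(3);
        builder.setBulkFlushBackoffDelay(500);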

  • Input
[admin@hadoop102 ~]$ nc -lk 9999
1,zhangsan,18,M
2,lisi,19,F
  • ES query
GET /user_index/_search
{
  "query": {
    "match_all": {}
  }
}

  • Output

{
  "took" : 16,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "user_index",
        "_type" : "_doc",
        "_id" : "AafpL3sBamyuoJRi0TBG",
        "_score" : 1.0,
        "_source" : {
          "sex" : "F",
          "name" : "lisi",
          "id" : 2,
          "age" : 19
        }
      },
      {
        "_index" : "user_index",
        "_type" : "_doc",
        "_id" : "AKfpL3sBamyuoJRisjCU",
        "_score" : 1.0,
        "_source" : {
          "sex" : "M",
          "name" : "zhangsan",
          "id" : 1,
          "age" : 18
        }
      }
    ]
  }
}
  • Exception handling

Exception message:

The number of object passed must be even but was [1]

Cause: see the write-up on resolving the Elasticsearch exception "The number of object passed must be even but was [1]".

return Requests.indexRequest()
                                .index("user_index")
                                .type("_doc")
                                .source(objectToMap(element));

The argument passed to .source() must be key-value shaped. At first I passed element directly into source(), which produced the error above.

Solution: convert the bean into a Map, so the data has the required key-value structure.

    /**
     * Converts an object's fields and their values into a Map.
     *
     * @param obj the object to convert
     * @return a field-name-to-value map
     * @throws IllegalAccessException if a field cannot be read
     */
    public static Map<String, Object> objectToMap(Object obj) throws IllegalAccessException {
        Map<String, Object> map = new HashMap<>();
        Class<?> clazz = obj.getClass();
        for (Field field : clazz.getDeclaredFields()) {
            field.setAccessible(true);
            String fieldName = field.getName();
            Object value = field.get(obj);
            map.put(fieldName, value);
        }
        return map;
    }

Custom Sink

Every sink implementation must implement the org.apache.flink.streaming.api.functions.sink.SinkFunction interface and its invoke method.

public interface SinkFunction<IN> extends Function, Serializable {
  • Prepare a table
CREATE table t_user(
    id int not null primary key,
    name varchar(20) not null,
    age int not null ,
    sex varchar(20) not null
);
  • Add the dependency
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
  • Approach 1: a custom sink that writes to MySQL
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author admin
 * @date 2021/8/10
 */
public class CustomSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * Read data from a socket.
         * Parameters: host, port, delimiter, max retries
         */
        SocketTextStreamFunction socketText = new SocketTextStreamFunction("hadoop102", 9999, "\n", 3);
        DataStreamSource<String> source = env.addSource(socketText);


        // Convert each line into a UserBean object
        SingleOutputStreamOperator<UserBean> map = source.map(e -> new UserBean(e.split(",")));

        map.addSink(new MySink());

        map.print();

        env.execute();



    }

    private static class  MySink implements SinkFunction<UserBean>{

        // JDBC connection URL and insert statement
        private static final String URL = "jdbc:mysql://hadoop102:3306/demo?user=root&password=123321";
        private static final String INSERT_URL="insert into t_user values(?,?,?,?)";


        @Override
        public void invoke(UserBean value, Context context) throws Exception {
            Class.forName("com.mysql.jdbc.Driver"); // Load the JDBC driver
            Connection conn = DriverManager.getConnection(URL); // Open a database connection
            PreparedStatement statement = conn.prepareStatement(INSERT_URL);

            statement.setInt(1,value.getId());
            statement.setString(2,value.getName());
            statement.setInt(3,value.getAge());
            statement.setString(4,value.getSex());

            statement.execute();

            statement.close();
            conn.close();
        }

    }
}
  • Input
[admin@hadoop102 ~]$ nc -lk 9999
1,zhangsan,18,M
2,lisi,19,F
  • Query the database
mysql> select * from t_user;
+----+----------+-----+-----+
| id | name     | age | sex |
+----+----------+-----+-----+
|  1 | zhangsan |  18 | M   |
|  2 | lisi     |  19 | F   |
+----+----------+-----+-----+
2 rows in set (0.00 sec)
  • Approach 2: a custom sink that writes to MySQL

Approach 1 has an obvious problem: every call to invoke opens a new database connection, which is very wasteful, so it needs optimizing.

Use org.apache.flink.streaming.api.functions.sink.RichSinkFunction and create the database connection in its lifecycle methods.

   private static class  MySink2 extends RichSinkFunction<UserBean> {



        // JDBC connection URL and insert statement
        private static final String URL = "jdbc:mysql://hadoop102:3306/demo?user=root&password=123321";
        private static final String INSERT_URL="insert into t_user values(?,?,?,?)";

        private  Connection conn;
        private  PreparedStatement statement;

        /**
         * Called once when the task starts
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            Class.forName("com.mysql.jdbc.Driver"); // Load the JDBC driver
            conn = DriverManager.getConnection(URL); // Open a database connection
            statement = conn.prepareStatement(INSERT_URL);
        }

        @Override
        public void invoke(UserBean value, Context context) throws Exception {


            statement.setInt(1,value.getId());
            statement.setString(2,value.getName());
            statement.setInt(3,value.getAge());
            statement.setString(4,value.getSex());

            statement.execute();

        }

        /**
         * Called once when the task shuts down
         * @throws Exception
         */
        @Override
        public void close() throws Exception {
            if(statement!=null){
                statement.close();
            }

            if (conn != null) {
                conn.close();
            }
        }
    }

How often open and close are called depends on the operator's parallelism: each parallel subtask calls them once, so with parallelism N they run N times (see the sketch below).

Finally, use MySink2:

    map.addSink(new MySink2());
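
A minimal sketch of how parallelism drives the number of open/close calls (the explicit setParallelism call is only for illustration):

    // With parallelism 2, MySink2.open() and close() each run twice (once per subtask)
    map.addSink(new MySink2()).setParallelism(2);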
  • Test
[admin@hadoop102 ~]$ nc -lk 9999

4,lifeifei,26,F
3,wangfenyu,47,M
  • MySQL query: together with the 2 rows inserted earlier, there are now 4 rows
mysql> select * from t_user;
+----+-----------+-----+-----+
| id | name      | age | sex |
+----+-----------+-----+-----+
|  1 | zhangsan  |  18 | M   |
|  2 | lisi      |  19 | F   |
|  3 | wangfenyu |  47 | M   |
|  4 | lifeifei  |  26 | F   |
+----+-----------+-----+-----+
4 rows in set (0.00 sec)

The official JdbcSink

MySink2 reduces the number of connections, but problems remain: a Flink job runs continuously, while MySQL will not keep a connection open forever (the server reclaims connections that stay idle too long). If the connection drops and is never re-established, data can no longer be written to MySQL. Fortunately none of this has to be handled by hand, because Flink ships an official JDBC connector.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
  • Program
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;

/**
 * @author admin
 * @date 2021/8/10
 */
public class MysqlSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * Read data from a socket.
         * Parameters: host, port, delimiter, max retries
         */
        SocketTextStreamFunction socketText = new SocketTextStreamFunction("hadoop102", 9999, "\n", 3);
        DataStreamSource<String> source = env.addSource(socketText);


        // Convert each line into a UserBean object
        SingleOutputStreamOperator<UserBean> map = source.map(e -> new UserBean(e.split(",")));

        // Bind the UserBean fields to the insert statement
        JdbcStatementBuilder<UserBean> builder = (JdbcStatementBuilder<UserBean>) (statement, userBean) -> {
            statement.setInt(1, userBean.getId());
            statement.setString(2, userBean.getName());
            statement.setInt(3, userBean.getAge());
            statement.setString(4, userBean.getSex());
        };

        // Execution options
        JdbcExecutionOptions executionOptions = new JdbcExecutionOptions.Builder()
                .withBatchSize(1) // set to 1 only for testing
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build();

        // Connection options
        JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions
                .JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://hadoop102:3306/demo?useSSL=false") // JDBC URL
                .withDriverName("com.mysql.jdbc.Driver") // driver class
                .withUsername("root") // user
                .withPassword("123321") // password
                .withConnectionCheckTimeoutSeconds(30) // connection check timeout, in seconds
                .build();

        // The insert SQL statement
        String insertSql = "insert into t_user(id,name,age,sex) values(?,?,?,?)";

        SinkFunction<UserBean> sink = JdbcSink.sink(insertSql,builder,executionOptions,connectionOptions);
        map.print();
        map.addSink(sink);

        env.execute();

    }
}
