美文网首页
Flink Redis Sink 自定义

Flink Redis Sink 自定义

作者: yljphp | 来源:发表于2019-04-02 15:43 被阅读0次

    背景

    Flink 默认的 RedisMapper 不能设置key的过期时间。

    依赖

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    

    SinkFunction & RichSinkFunction

    image
    image
    image
    image

    Demo

    package com.yljphp.demo.sink
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.StringUtils
    import redis.clients.jedis.Jedis
    
    object RedisCustomSink extends App {
    
      val env = StreamExecutionEnvironment.getExecutionEnvironment
    
      env.setParallelism(1)
    
      val text: DataStream[String] = env.socketTextStream("localhost", 9000)
    
      val data = text.filter(!StringUtils.isNullOrWhitespaceOnly(_))
        .flatMap(_.split(","))
        .map(MySaveInfo("RedisCustomSink", _))
    
    
      data.addSink(new CustomSinkToRedis)
    
      env.execute("sink_demo")
    }
    
    class CustomSinkToRedis extends RichSinkFunction[MySaveInfo] {
    
      var redisCon: Jedis = _
    
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        this.redisCon = new Jedis("localhost", 6379)
      }
    
      override def close(): Unit = {
        super.close()
        if (this.redisCon != null) {
          this.redisCon.close()
        }
      }
    
    
      override def invoke(value: MySaveInfo, context: SinkFunction.Context[_]): Unit = {
        //super.invoke(value, context)
        println(value)
        println("sadd之前"+this.redisCon.ttl(value.key))
        this.redisCon.sadd(value.key, value.value)
        println("sadd之后"+this.redisCon.ttl(value.key))
        if (this.redisCon.ttl(value.key) == -1) {
          this.redisCon.expire(value.key, 60 * 60)
        }
        println("设置过期之后/每次更新后"+this.redisCon.ttl(value.key))
      }
    
    }
    
    case class MySaveInfo(key: String, value: String)
    
    

    socket窗口输入123,345,234,123

    image image image

    参考

    如何自定义 Data Sink

    相关文章

      网友评论

          本文标题:Flink Redis Sink 自定义

          本文链接:https://www.haomeiwen.com/subject/niygbqtx.html