Encoding issues when Spark Streaming reads data from Kafka (Java)


Author: leep_li | Published 2018-06-06 13:32

When Spark Streaming reads a stream from Kafka in Java, the messages are decoded as UTF-8 by default, so if the source data was not written in UTF-8 the text comes out garbled. Kafka's official parameter documentation does not mention an encoding setting, but one does exist in the Kafka source code:

    class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
      val encoding = if (props == null) "UTF8" else props.getString("serializer.encoding", "UTF8")
      ...
    }
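
To see why the default matters: the decoder essentially builds the message with new String(bytes, encoding), so bytes written in another charset and read back as UTF-8 come out as mojibake. A minimal Java illustration (the sample string and class name are just for demonstration, not from the original code):

    import java.nio.charset.Charset;

    public class EncodingDemo {
        public static void main(String[] args) {
            // Bytes produced by a writer that used GB18030 rather than UTF-8.
            byte[] bytes = "中文测试".getBytes(Charset.forName("GB18030"));

            // Decoded with the default "UTF8" setting: garbled output.
            System.out.println(new String(bytes, Charset.forName("UTF-8")));
            // Decoded with the charset configured via serializer.encoding: readable text.
            System.out.println(new String(bytes, Charset.forName("GB18030")));
        }
    }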

Based on the source above, setting serializer.encoding should do the trick, and testing confirms that it does work. Here is the Spark Streaming code that reads data from Kafka:

    ...

    HashSet<String> _TopicsSet = new HashSet<String>(Arrays.asList("a1_topic", "a2_topic"));

    Map<String, String> _KafkaParams = new HashMap<String, String>();
    _KafkaParams.put("metadata.broker.list", "dn1:9092,nn0:9092,dn0:9092");
    _KafkaParams.put("group.id", "groupid");
    _KafkaParams.put("fetch.message.max.bytes", "5120000");
    // Charset that StringDecoder uses when turning message bytes into Strings.
    _KafkaParams.put("serializer.encoding", "gb18030");

    JavaDStream<String> _MessagesLines = KafkaUtils.createDirectStream(
            _JavaStreamingContext,
            String.class, String.class,
            StringDecoder.class, StringDecoder.class,
            _KafkaParams, _TopicsSet)
        .map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                // Keep only the message value; the key is not needed here.
                return tuple2._2();
            }
        });

    ...
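
For completeness, a sketch of the imports and streaming-context setup the snippet above assumes, based on the spark-streaming-kafka 0.8 direct-stream API; the class name KafkaEncodingJob, the app name, and the 5-second batch interval are placeholders rather than values from the original article:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;

    import kafka.serializer.StringDecoder;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    import scala.Tuple2;

    public class KafkaEncodingJob {
        public static void main(String[] args) throws Exception {
            // Placeholder app name; the master is expected to come from spark-submit.
            SparkConf conf = new SparkConf().setAppName("KafkaEncodingJob");
            JavaStreamingContext _JavaStreamingContext =
                    new JavaStreamingContext(conf, Durations.seconds(5));

            // ... the createDirectStream code shown above goes here ...

            _JavaStreamingContext.start();
            _JavaStreamingContext.awaitTermination();
        }
    }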
