Garbled text when Hadoop and Spark read GBK-encoded files

Author: MacDonald | Published 2017-12-22 15:54

    Reposted from https://www.cnblogs.com/teagnes/p/6112019.html

    I actually ran into this problem long ago, when I had just started working with Hadoop. Back then I only knew that text files on HDFS had to be UTF-8 encoded or the output would be garbled, without understanding why; since my data usually came from Hive, the problem rarely came up again. Having hit it again recently, it seems worth writing up how to read GBK-encoded files in Hadoop and Spark.

    First, let's see why the problem occurs. Below is the simplest possible Spark word-count program. sc.textFile(filePath) creates an RDD from a text file at the given path. Looking at the textFile method, you can see that it actually uses the TextInputFormat class to parse the text file; anyone familiar with Hadoop knows that TextInputFormat is MapReduce's default input format, and that it returns key/value pairs.

    import org.apache.spark.{SparkConf, SparkContext}

    object Wordcount {
      def main(args: Array[String]) {
        val filePath = ""  // path of the input text file (left empty here)
        val conf = new SparkConf().setAppName("WordCountApp")
        val sc = new SparkContext(conf)
        val line = sc.textFile(filePath)
        line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect.foreach(println)
        sc.stop
      }
    }
    

    def textFile(
       path: String,
       minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
          assertNotStopped()
          hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)
    }
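
    The map(pair => pair._2.toString) step above is where the garbling actually happens: Text holds whatever raw bytes the reader copied into it, and Text.toString always decodes those bytes as UTF-8. A tiny illustration of the effect, independent of Spark (the class name and sample string are only for demonstration):

    object MojibakeDemo {
      def main(args: Array[String]): Unit = {
        // bytes as they would sit in a GBK-encoded file (and inside the Text value)
        val gbkBytes = "中文测试".getBytes("GBK")
        // what Text.toString effectively does: decode as UTF-8 -> garbled output
        println(new String(gbkBytes, "UTF-8"))
        // decode with the real charset -> prints the original text
        println(new String(gbkBytes, "GBK"))
      }
    }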
    

    Next, look at the TextInputFormat source. TextInputFormat does two things.

    First, it splits the input file; MapReduce starts one map task for each split. The splitting itself is done by TextInputFormat's parent class FileInputFormat, which we won't dig into here; TextInputFormat itself only contains the methods for reading the data.

    Second, it turns the data of each split into key/value records, i.e. the RecordReader: the createRecordReader method keeps producing RecordReader objects and hands them to the map side. In the code below, delimiter.getBytes(Charsets.UTF_8) does set a charset, but unfortunately it is not the charset used to read the file; it only encodes the record delimiter. By default every line becomes one record, so this parameter is rarely needed: it only matters when several lines should form a single record, which can be configured through the parameter "textinputformat.record.delimiter" (a short sketch of doing so follows the listing below). So, can we specify the charset used to read the file somewhere in our own code?

    package org.apache.hadoop.mapreduce.lib.input;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.SplittableCompressionCodec;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    import com.google.common.base.Charsets;
    
    /** An {@link InputFormat} for plain text files.  Files are broken into lines.
     * Either linefeed or carriage-return are used to signal end of line.  Keys are
     * the position in the file, and values are the line of text.. */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
    
      @Override
      public RecordReader<LongWritable, Text> 
        createRecordReader(InputSplit split,
                           TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
            "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
          recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        return new LineRecordReader(recordDelimiterBytes);
      }
    
      @Override
      protected boolean isSplitable(JobContext context, Path file) {
        final CompressionCodec codec =
          new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (null == codec) {
          return true;
        }
        return codec instanceof SplittableCompressionCodec;
      }
    
    }
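
    As an aside, when you do want several lines to form one record, that delimiter can be set on the Hadoop Configuration that Spark passes to the input format. A minimal sketch, assuming a local file path and a blank-line separator purely for illustration:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    object CustomDelimiterDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("custom delimiter").setMaster("local"))
        // one record per blank-line-separated paragraph instead of one per line
        sc.hadoopConfiguration.set("textinputformat.record.delimiter", "\n\n")
        val records = sc.newAPIHadoopFile("/tmp/records.txt", classOf[TextInputFormat],
            classOf[LongWritable], classOf[Text])
          .map(_._2.toString)
        records.collect.foreach(println)
        sc.stop()
      }
    }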
    

    Continuing into the LineRecordReader class, look at its nextKeyValue method, which is what actually produces each key/value record. Two details here are interesting and worth noting.

    First, the skipUtfByteOrderMark() method: when the file is UTF-8 with a BOM, the reader automatically skips the BOM. This deserves a quick test (a small sketch follows the nextKeyValue listing below).

    Second, what happens when the line we read straddles a block boundary? HDFS splits files purely by size, so a line is bound to get cut across two blocks now and then; the reader has logic to handle this, which we won't go through in detail here.

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
          key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
          value = new Text();
        }
        int newSize = 0;
        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1). (The readLine call below is where the record is actually read.)
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
          if (pos == 0) {
            newSize = skipUtfByteOrderMark();
          } else {
            newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
            pos += newSize;
          }
    
          if ((newSize == 0) || (newSize < maxLineLength)) {
            break;
          }
    
          // line too long. try again
          LOG.info("Skipped line of size " + newSize + " at pos " + 
                   (pos - newSize));
        }
        if (newSize == 0) {
          key = null;
          value = null;
          return false;
        } else {
          return true;
        }
      }
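
    A quick way to check the BOM handling mentioned above is to write a small UTF-8 file with a BOM and read it back. A minimal local sketch (the /tmp path and the file contents are made up for the test):

    import java.nio.file.{Files, Paths}
    import org.apache.spark.{SparkConf, SparkContext}

    object BomSkipTest {
      def main(args: Array[String]): Unit = {
        val path = "/tmp/bom-test.txt"
        // UTF-8 BOM (0xEF 0xBB 0xBF) followed by two ordinary lines
        val bytes = Array(0xEF, 0xBB, 0xBF).map(_.toByte) ++ "hello\nworld\n".getBytes("UTF-8")
        Files.write(Paths.get(path), bytes)

        val sc = new SparkContext(new SparkConf().setAppName("BomSkipTest").setMaster("local"))
        val first = sc.textFile(path).first()
        // if skipUtfByteOrderMark() kicks in, the BOM is dropped and the first record is just "hello"
        println(s"first record = '$first', length = ${first.length}")
        sc.stop()
      }
    }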
    

    The value filled in by nextKeyValue above is exactly the value the map side receives. Looking at how it gets assigned, the data is read from the input stream by one of two methods: readDefaultLine, the default, which reads a single line, and readCustomLine, which reads across lines using the custom delimiter.

    public int readLine(Text str, int maxLineLength,
                          int maxBytesToConsume) throws IOException {
        if (this.recordDelimiterBytes != null) {
          return readCustomLine(str, maxLineLength, maxBytesToConsume);
        } else {
          return readDefaultLine(str, maxLineLength, maxBytesToConsume);
        }
      }
    

    The default path does not use a custom delimiter, and value receives the bytes straight from the input stream: the file's raw bytes are copied into value without any decoding. So can we take the bytes we get on the map side and decode them as GBK ourselves? Testing shows that this does indeed read the file correctly.

    private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
        /* We're reading data from in, but the head of the stream may be
         * already buffered in buffer, so we have several cases:
         * 1. No newline characters are in the buffer, so we need to copy
         *    everything and read another buffer from the stream.
         * 2. An unambiguously terminated line is in buffer, so we just
         *    copy to str.
         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
         *    in CR.  In this case we copy everything up to CR to str, but
         *    we also need to see what follows CR: if it's LF, then we
         *    need consume LF as well, so next call to readLine will read
         *    from after that.
         * We use a flag prevCharCR to signal if previous character was CR
         * and, if it happens to be at the end of the buffer, delay
         * consuming it until we have a chance to look at the char that
         * follows.
         */
        str.clear();
        int txtLength = 0; //tracks str.getLength(), as an optimization
        int newlineLength = 0; //length of terminating newline
        boolean prevCharCR = false; //true of prev char was CR
        long bytesConsumed = 0;
        do {
          int startPosn = bufferPosn; //starting from where we left off the last time
          if (bufferPosn >= bufferLength) {
            startPosn = bufferPosn = 0;
            if (prevCharCR) {
              ++bytesConsumed; //account for CR from previous read
            }
            bufferLength = fillBuffer(in, buffer, prevCharCR);
            if (bufferLength <= 0) {
              break; // EOF
            }
          }
          for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
            if (buffer[bufferPosn] == LF) {
              newlineLength = (prevCharCR) ? 2 : 1;
              ++bufferPosn; // at next invocation proceed from following byte
              break;
            }
            if (prevCharCR) { //CR + notLF, we are at notLF
              newlineLength = 1;
              break;
            }
            prevCharCR = (buffer[bufferPosn] == CR);
          }
          int readLength = bufferPosn - startPosn;
          if (prevCharCR && newlineLength == 0) {
            --readLength; //CR at the end of the buffer
          }
          bytesConsumed += readLength;
          int appendLength = readLength - newlineLength;
          if (appendLength > maxLineLength - txtLength) {
            appendLength = maxLineLength - txtLength;
          }
          if (appendLength > 0) {
            str.append(buffer, startPosn, appendLength);
            txtLength += appendLength;
          }
        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
    
        if (bytesConsumed > Integer.MAX_VALUE) {
          throw new IOException("Too many bytes before newline: " + bytesConsumed);
        }
        return (int)bytesConsumed;
      }
    

    Solution:

    Reading a GBK-encoded file in Spark

    Decode value's bytes as GBK to build the String; run it and the text displays correctly. The conversion must use getLength, as in new String(p._2.getBytes, 0, p._2.getLength, "GBK"), because Text reuses its backing byte array, so getBytes may contain stale bytes beyond the valid length.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat  // sc.hadoopFile uses the old mapred API
    import org.apache.spark.{SparkConf, SparkContext}

    object GBKtoUTF8 {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("GBK TO UTF8")
          .setMaster("local")

        val sc = new SparkContext(conf)

        sc.hadoopFile("F:\\data\\score.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
          // decode the raw bytes of each Text record as GBK
          .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))
          .flatMap(s => s.split(","))
          .map(x => (x, 1))
          .reduceByKey(_ + _)
          .collect
          .foreach(println)

        sc.stop()
      }
    }
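
    Note that sc.hadoopFile expects the old-API org.apache.hadoop.mapred.TextInputFormat, while the source walked through earlier is the new-API org.apache.hadoop.mapreduce.lib.input.TextInputFormat. The same decoding also works through the new API via newAPIHadoopFile; a minimal sketch (the object name is illustrative), reusing the sample path from above:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    object GBKtoUTF8NewAPI {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("GBK TO UTF8 new API").setMaster("local"))
        sc.newAPIHadoopFile("F:\\data\\score.txt", classOf[TextInputFormat],
            classOf[LongWritable], classOf[Text])
          // decode each record's raw bytes as GBK before doing anything else with it
          .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))
          .flatMap(_.split(","))
          .map((_, 1))
          .reduceByKey(_ + _)
          .collect
          .foreach(println)
        sc.stop()
      }
    }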
    

    Reading a GBK-encoded file in Hadoop

    public void map(LongWritable key, Text value, Context context) {
        try {
            // decode the raw bytes as GBK and turn them into a String
            String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
            logger.info("gbkstr " + line);

            // do NOT use toString() to get the string: Text.toString() always decodes as UTF-8
            // line = value.toString();
            // logger.info("str " + line);

            String[] item = line.split(",");
            for (String str : item) {
                outkey = new Text(str);  // outkey and outvalue are fields of the enclosing Mapper
                context.write(outkey, outvalue);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
