Hadoop IO 源码阅读

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                context.write(word, one);

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            context.write(key, result);

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);


  • 什么是序列化?
  • 什么是反序列化?
  • 序列化用途
  • 作为一种持久化格式。
  • 作为一种通信的数据格式。
  • 作为一种数据拷贝、克隆机制。
  • Java序列化和反序列化
  • 让需要序列化的类实现Serializable接口
  • 序列化:ObjectOutputStream.writeObject(序列化对象)。反序列化:ObjectInputStream.readObject()返回序列化对象
  • 为什么Hadoop不直接使用java序列化?


    /**
     * Contract for objects that serialize themselves to a {@link DataOutput}
     * and restore themselves from a {@link DataInput}.
     */
    public interface Writable {
      /**
       * Writes the fields of this object to {@code out}.
       *
       * @param out the sink to serialize into
       * @throws IOException if writing fails
       */
      void write(DataOutput out) throws IOException;

      /**
       * Reads the fields of this object from {@code in}.
       *
       * @param in the source to deserialize from
       * @throws IOException if reading fails
       */
      void readFields(DataInput in) throws IOException;
    }
Java基本类型 Writable 序列化后长度(字节)
布尔型(boolean) BooleanWritable 1
字节型(byte) ByteWritable 1
整型(int) IntWritable 4
浮点型(float) FloatWritable 4
长整型(long) LongWritable 8
双精度浮点型(double) DoubleWritable 8




package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** A WritableComparable for ints. */
public class IntWritable implements WritableComparable<IntWritable> {
  private int value;

  public IntWritable() {}

  public IntWritable(int value) { set(value); }

  /** Set the value of this IntWritable. */
  public void set(int value) { this.value = value; }

  /** Return the value of this IntWritable. */
  public int get() { return value; }

  public void readFields(DataInput in) throws IOException {
    value = in.readInt();

  public void write(DataOutput out) throws IOException {

  /** Returns true iff <code>o</code> is a IntWritable with the same value. */
  public boolean equals(Object o) {
    if (!(o instanceof IntWritable))
      return false;
    IntWritable other = (IntWritable)o;
    return this.value == other.value;

  public int hashCode() {
    return value;

  /** Compares two IntWritables. */
  public int compareTo(IntWritable o) {
    int thisValue = this.value;
    int thatValue = o.value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));

  public String toString() {
    return Integer.toString(value);

  /** A Comparator optimized for IntWritable. */ 
  public static class Comparator extends WritableComparator {
    public Comparator() {
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int thisValue = readInt(b1, s1);
      int thatValue = readInt(b2, s2);
      return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));

  static {                                        // register this comparator
    WritableComparator.define(IntWritable.class, new Comparator());

代码中的static块调用WritableComparator的static方法define()来注册上面这个Comparator,也就是把它加入WritableComparator的comparators成员中;comparators是static的ConcurrentHashMap。这样就告诉了WritableComparator:当调用WritableComparator.get(IntWritable.class)时,返回已注册的这个Comparator(对IntWritable来说就是IntWritable.Comparator)。之后就可以用comparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)直接比较b1和b2中的序列化数据,而不需要先把它们反序列化成对象(获取方式如下面的代码所示)。该compare()方法中用到的readInt()继承自WritableComparator,它通过移位把IntWritable的value从byte数组中还原出来。

// Look up the comparator for IntWritable through WritableComparator.get().
RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);  


package org.apache.hadoop.io;

import java.io.*;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

 * A WritableComparable for booleans. 
public class BooleanWritable implements WritableComparable<BooleanWritable> {
  private boolean value;

  public BooleanWritable() {};

  public BooleanWritable(boolean value) {

   * Set the value of the BooleanWritable
  public void set(boolean value) {
    this.value = value;

   * Returns the value of the BooleanWritable
  public boolean get() {
    return value;

  public void readFields(DataInput in) throws IOException {
    value = in.readBoolean();

  public void write(DataOutput out) throws IOException {

  public boolean equals(Object o) {
    if (!(o instanceof BooleanWritable)) {
      return false;
    BooleanWritable other = (BooleanWritable) o;
    return this.value == other.value;

  public int hashCode() {
    return value ? 0 : 1;

  public int compareTo(BooleanWritable o) {
    boolean a = this.value;
    boolean b = o.value;
    return ((a == b) ? 0 : (a == false) ? -1 : 1);
  public String toString() {
    return Boolean.toString(get());

   * A Comparator optimized for BooleanWritable. 
  public static class Comparator extends WritableComparator {
    public Comparator() {

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      return compareBytes(b1, s1, l1, b2, s2, l2);

  static {
    WritableComparator.define(BooleanWritable.class, new Comparator());


package org.apache.hadoop.io;

import java.io.*;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** A WritableComparable for a single byte. */
public class ByteWritable implements WritableComparable<ByteWritable> {
  private byte value;

  public ByteWritable() {}

  public ByteWritable(byte value) { set(value); }

  /** Set the value of this ByteWritable. */
  public void set(byte value) { this.value = value; }

  /** Return the value of this ByteWritable. */
  public byte get() { return value; }

  public void readFields(DataInput in) throws IOException {
    value = in.readByte();

  public void write(DataOutput out) throws IOException {

  /** Returns true iff <code>o</code> is a ByteWritable with the same value. */
  public boolean equals(Object o) {
    if (!(o instanceof ByteWritable)) {
      return false;
    ByteWritable other = (ByteWritable)o;
    return this.value == other.value;

  public int hashCode() {
    return (int)value;

  /** Compares two ByteWritables. */
  public int compareTo(ByteWritable o) {
    int thisValue = this.value;
    int thatValue = o.value;
    return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));

  public String toString() {
    return Byte.toString(value);

  /** A Comparator optimized for ByteWritable. */ 
  public static class Comparator extends WritableComparator {
    public Comparator() {

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      byte thisValue = b1[s1];
      byte thatValue = b2[s2];
      return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));

  static {                                        // register this comparator
    WritableComparator.define(ByteWritable.class, new Comparator());


package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** A WritableComparable for longs. */
public class LongWritable implements WritableComparable<LongWritable> {
  private long value;

  public LongWritable() {}

  public LongWritable(long value) { set(value); }

  /** Set the value of this LongWritable. */
  public void set(long value) { this.value = value; }

  /** Return the value of this LongWritable. */
  public long get() { return value; }

  public void readFields(DataInput in) throws IOException {
    value = in.readLong();

  public void write(DataOutput out) throws IOException {

  /** Returns true iff <code>o</code> is a LongWritable with the same value. */
  public boolean equals(Object o) {
    if (!(o instanceof LongWritable))
      return false;
    LongWritable other = (LongWritable)o;
    return this.value == other.value;

  public int hashCode() {
    return (int)value;

  /** Compares two LongWritables. */
  public int compareTo(LongWritable o) {
    long thisValue = this.value;
    long thatValue = o.value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));

  public String toString() {
    return Long.toString(value);

  /** A Comparator optimized for LongWritable. */ 
  public static class Comparator extends WritableComparator {
    public Comparator() {

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      long thisValue = readLong(b1, s1);
      long thatValue = readLong(b2, s2);
      return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));

  /** A decreasing Comparator optimized for LongWritable. */ 
  public static class DecreasingComparator extends Comparator {
    public int compare(WritableComparable a, WritableComparable b) {
      return super.compare(b, a);
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return super.compare(b2, s2, l2, b1, s1, l1);

  static {                                       // register default comparator
    WritableComparator.define(LongWritable.class, new Comparator());



package org.apache.hadoop.io;

import java.io.*;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** A WritableComparable for floats. */
public class FloatWritable implements WritableComparable<FloatWritable> {
  private float value;

  public FloatWritable() {}

  public FloatWritable(float value) { set(value); }

  /** Set the value of this FloatWritable. */
  public void set(float value) { this.value = value; }

  /** Return the value of this FloatWritable. */
  public float get() { return value; }

  public void readFields(DataInput in) throws IOException {
    value = in.readFloat();

  public void write(DataOutput out) throws IOException {

  /** Returns true iff <code>o</code> is a FloatWritable with the same value. */
  public boolean equals(Object o) {
    if (!(o instanceof FloatWritable))
      return false;
    FloatWritable other = (FloatWritable)o;
    return this.value == other.value;

  public int hashCode() {
    return Float.floatToIntBits(value);

  /** Compares two FloatWritables. */
  public int compareTo(FloatWritable o) {
    float thisValue = this.value;
    float thatValue = o.value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));

  public String toString() {
    return Float.toString(value);

  /** A Comparator optimized for FloatWritable. */ 
  public static class Comparator extends WritableComparator {
    public Comparator() {
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      float thisValue = readFloat(b1, s1);
      float thatValue = readFloat(b2, s2);
      return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));

  static {                                        // register this comparator
    WritableComparator.define(FloatWritable.class, new Comparator());



package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

 * Writable for Double values.
public class DoubleWritable implements WritableComparable<DoubleWritable> {

  private double value = 0.0;
  public DoubleWritable() {
  public DoubleWritable(double value) {
  public void readFields(DataInput in) throws IOException {
    value = in.readDouble();

  public void write(DataOutput out) throws IOException {
  public void set(double value) { this.value = value; }
  public double get() { return value; }

   * Returns true iff <code>o</code> is a DoubleWritable with the same value.
  public boolean equals(Object o) {
    if (!(o instanceof DoubleWritable)) {
      return false;
    DoubleWritable other = (DoubleWritable)o;
    return this.value == other.value;
  public int hashCode() {
    return (int)Double.doubleToLongBits(value);
  public int compareTo(DoubleWritable o) {
    return (value < o.value ? -1 : (value == o.value ? 0 : 1));
  public String toString() {
    return Double.toString(value);

  /** A Comparator optimized for DoubleWritable. */ 
  public static class Comparator extends WritableComparator {
    public Comparator() {

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      double thisValue = readDouble(b1, s1);
      double thatValue = readDouble(b2, s2);
      return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));

  static {                                        // register this comparator
    WritableComparator.define(DoubleWritable.class, new Comparator());






package org.apache.hadoop.io;

import java.io.*;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** A WritableComparable for longs in a variable-length format. Such values take
 *  between one and five bytes.  Smaller values take fewer bytes.
 *  @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
public class VLongWritable implements WritableComparable<VLongWritable> {
  private long value;

  public VLongWritable() {}

  public VLongWritable(long value) { set(value); }

  /** Set the value of this LongWritable. */
  public void set(long value) { this.value = value; }

  /** Return the value of this LongWritable. */
  public long get() { return value; }

  public void readFields(DataInput in) throws IOException {
    value = WritableUtils.readVLong(in);

  public void write(DataOutput out) throws IOException {
    WritableUtils.writeVLong(out, value);

  /** Returns true iff <code>o</code> is a VLongWritable with the same value. */
  public boolean equals(Object o) {
    if (!(o instanceof VLongWritable))
      return false;
    VLongWritable other = (VLongWritable)o;
    return this.value == other.value;

  public int hashCode() {
    return (int)value;

  /** Compares two VLongWritables. */
  public int compareTo(VLongWritable o) {
    long thisValue = this.value;
    long thatValue = o.value;
    return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));

  public String toString() {
    return Long.toString(value);





package org.apache.hadoop.io;

import java.io.*;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** A WritableComparable for integer values stored in variable-length format.
 * Such values take between one and five bytes.  Smaller values take fewer bytes.
 * @see org.apache.hadoop.io.WritableUtils#readVInt(DataInput)
public class VIntWritable implements WritableComparable<VIntWritable> {
  private int value;

  public VIntWritable() {}

  public VIntWritable(int value) { set(value); }

  /** Set the value of this VIntWritable. */
  public void set(int value) { this.value = value; }

  /** Return the value of this VIntWritable. */
  public int get() { return value; }

  public void readFields(DataInput in) throws IOException {
    value = WritableUtils.readVInt(in);

  public void write(DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, value);

  /** Returns true iff <code>o</code> is a VIntWritable with the same value. */
  public boolean equals(Object o) {
    if (!(o instanceof VIntWritable))
      return false;
    VIntWritable other = (VIntWritable)o;
    return this.value == other.value;

  public int hashCode() {
    return value;

  /** Compares two VIntWritables. */
  public int compareTo(VIntWritable o) {
    int thisValue = this.value;
    int thatValue = o.value;
    return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
  public String toString() {
    return Integer.toString(value);





public static void writeVLong(DataOutput stream, long i) throws IOException {
    if (i >= -112 && i <= 127) {
    int len = -112;
    if (i < 0) {
      i ^= -1L; //连符号一起取反+1
      len = -120;
    long tmp = i;//到这里,i一定是正数
    while (tmp != 0) {//然后用循环计算一下长度,i越大,实际长度就越大,偏离长度起始值[原来len]越大,len值越小。
      tmp = tmp >> 8;
    len = (len < -120) ? -(len + 120) : -(len + 112);//计算出了长度,不包含第一个byte【表示长度的byte】
    for (int idx = len; idx != 0; idx--) {//这里将i的二进制码从左到右8位8位的拿出来,然后写入到流中。
      int shiftbits = (idx - 1) * 8;
      long mask = 0xFFL << shiftbits;
      stream.writeByte((byte)((i & mask) >> shiftbits));


public static long readVLong(DataInput stream) throws IOException {  
    byte firstByte = stream.readByte();  
    int len = decodeVIntSize(firstByte);  
    if (len == 1) {  
      return firstByte;  
    long i = 0;  
    for (int idx = 0; idx < len-1; idx++) {  
      byte b = stream.readByte();  
      i = i << 8;  
      i = i | (b & 0xFF);  
    return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);  



public static int decodeVIntSize(byte value) {  
    if (value >= -112) {  
      return 1;  
    } else if (value < -120) {  
      return -119 - value;  
    return -111 - value;  




如果WritableComparator.get()没有找到已注册的Comparator,则会创建一个新的Comparator,它其实就是WritableComparator自身的实例。之后当你使用 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) 进行比较时,它会调用待比较的Writable实现的readFields()方法,先把value读出来再比较。
比如,VIntWritable没有注册Comparator,我们get()时就会构造一个WritableComparator,并设置好key1、key2、buffer、keyClass;当你使用 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) 时,它会用VIntWritable.readFields()从编码后的byte[]中读出value值,再进行比较。

package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

/** A Comparator for {@link WritableComparable}s.
 * <p>This base implementation uses the natural ordering.  To define alternate
 * orderings, override {@link #compare(WritableComparable,WritableComparable)}.
 * <p>One may optimize compare-intensive operations by overriding
 * {@link #compare(byte[],int,int,byte[],int,int)}.  Static utility methods are
 * provided to assist in optimized implementations of this method.
public class WritableComparator implements RawComparator, Configurable {

  private static final ConcurrentHashMap<Class, WritableComparator> comparators 
          = new ConcurrentHashMap<Class, WritableComparator>(); // registry

  private Configuration conf;

  /** For backwards compatibility. **/
  public static WritableComparator get(Class<? extends WritableComparable> c) {
    return get(c, null);

  /** Get a comparator for a {@link WritableComparable} implementation. */
  public static WritableComparator get(
      Class<? extends WritableComparable> c, Configuration conf) {
    WritableComparator comparator = comparators.get(c);
    if (comparator == null) {
      // force the static initializers to run
      // look to see if it is defined now
      comparator = comparators.get(c);
      // if not, use the generic one
      if (comparator == null) {
        comparator = new WritableComparator(c, conf, true);
    // Newly passed Configuration objects should be used.
    ReflectionUtils.setConf(comparator, conf);
    return comparator;

  public void setConf(Configuration conf) {
    this.conf = conf;

  public Configuration getConf() {
    return conf;

   * Force initialization of the static members.
   * As of Java 5, referencing a class doesn't force it to initialize. Since
   * this class requires that the classes be initialized to declare their
   * comparators, we force that initialization to happen.
   * @param cls the class to initialize
  private static void forceInit(Class<?> cls) {
    try {
      Class.forName(cls.getName(), true, cls.getClassLoader());
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Can't initialize class " + cls, e);

  /** Register an optimized comparator for a {@link WritableComparable}
   * implementation. Comparators registered with this method must be
   * thread-safe. */
  public static void define(Class c, WritableComparator comparator) {
    comparators.put(c, comparator);

  private final Class<? extends WritableComparable> keyClass;
  private final WritableComparable key1;
  private final WritableComparable key2;
  private final DataInputBuffer buffer;

  protected WritableComparator() {

  /** Construct for a {@link WritableComparable} implementation. */
  protected WritableComparator(Class<? extends WritableComparable> keyClass) {
    this(keyClass, null, false);

  protected WritableComparator(Class<? extends WritableComparable> keyClass,
      boolean createInstances) {
    this(keyClass, null, createInstances);

  protected WritableComparator(Class<? extends WritableComparable> keyClass,
                               Configuration conf,
                               boolean createInstances) {
    this.keyClass = keyClass;
    this.conf = (conf != null) ? conf : new Configuration();
    if (createInstances) {
      key1 = newKey();
      key2 = newKey();
      buffer = new DataInputBuffer();
    } else {
      key1 = key2 = null;
      buffer = null;

  /** Returns the WritableComparable implementation class. */
  public Class<? extends WritableComparable> getKeyClass() { return keyClass; }

  /** Construct a new {@link WritableComparable} instance. */
  public WritableComparable newKey() {
    return ReflectionUtils.newInstance(keyClass, conf);

  /** Optimization hook.  Override this to make SequenceFile.Sorter's scream.
   * <p>The default implementation reads the data into two {@link
   * WritableComparable}s (using {@link
   * Writable#readFields(DataInput)}, then calls {@link
   * #compare(WritableComparable,WritableComparable)}.
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
      buffer.reset(b1, s1, l1);                   // parse key1
      buffer.reset(b2, s2, l2);                   // parse key2
      buffer.reset(null, 0, 0);                   // clean up reference
    } catch (IOException e) {
      throw new RuntimeException(e);
    return compare(key1, key2);                   // compare them

  /** Compare two WritableComparables.
   * <p> The default implementation uses the natural ordering, calling {@link
   * Comparable#compareTo(Object)}. */
  public int compare(WritableComparable a, WritableComparable b) {
    return a.compareTo(b);

  public int compare(Object a, Object b) {
    return compare((WritableComparable)a, (WritableComparable)b);

  /** Lexicographic order of binary data. */
  public static int compareBytes(byte[] b1, int s1, int l1,
                                 byte[] b2, int s2, int l2) {
    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);

  /** Compute hash for binary data. */
  public static int hashBytes(byte[] bytes, int offset, int length) {
    int hash = 1;
    for (int i = offset; i < offset + length; i++)
      hash = (31 * hash) + (int)bytes[i];
    return hash;
  /** Compute hash for binary data. */
  public static int hashBytes(byte[] bytes, int length) {
    return hashBytes(bytes, 0, length);

  /** Parse an unsigned short from a byte array. */
  public static int readUnsignedShort(byte[] bytes, int start) {
    return (((bytes[start]   & 0xff) <<  8) +
            ((bytes[start+1] & 0xff)));

  /** Parse an integer from a byte array. */
  public static int readInt(byte[] bytes, int start) {
    return (((bytes[start  ] & 0xff) << 24) +
            ((bytes[start+1] & 0xff) << 16) +
            ((bytes[start+2] & 0xff) <<  8) +
            ((bytes[start+3] & 0xff)));


  /** Parse a float from a byte array. */
  public static float readFloat(byte[] bytes, int start) {
    return Float.intBitsToFloat(readInt(bytes, start));

  /** Parse a long from a byte array. */
  public static long readLong(byte[] bytes, int start) {
    return ((long)(readInt(bytes, start)) << 32) +
      (readInt(bytes, start+4) & 0xFFFFFFFFL);

  /** Parse a double from a byte array. */
  public static double readDouble(byte[] bytes, int start) {
    return Double.longBitsToDouble(readLong(bytes, start));

   * Reads a zero-compressed encoded long from a byte array and returns it.
   * @param bytes byte array with decode long
   * @param start starting index
   * @throws java.io.IOException 
   * @return deserialized long
  public static long readVLong(byte[] bytes, int start) throws IOException {
    int len = bytes[start];
    if (len >= -112) {
      return len;
    boolean isNegative = (len < -120);
    len = isNegative ? -(len + 120) : -(len + 112);
    if (start+1+len>bytes.length)
      throw new IOException(
                            "Not enough number of bytes for a zero-compressed integer");
    long i = 0;
    for (int idx = 0; idx < len; idx++) {
      i = i << 8;
      i = i | (bytes[start+1+idx] & 0xFF);
    return (isNegative ? (i ^ -1L) : i);
   * Reads a zero-compressed encoded integer from a byte array and returns it.
   * @param bytes byte array with the encoded integer
   * @param start start index
   * @throws java.io.IOException 
   * @return deserialized integer
  public static int readVInt(byte[] bytes, int start) throws IOException {
    return (int) readVLong(bytes, start);



