am a67f19de: Adds a few utilities to ProtoBufUtil, integrating over from mainline

Merge commit 'a67f19deb0794784d1b841bc1900328103af03c5'

* commit 'a67f19deb0794784d1b841bc1900328103af03c5':
  Adds a few utilities to ProtoBufUtil, integrating over from mainline
diff --git a/src/com/google/common/io/protocol/BoundInputStream.java b/src/com/google/common/io/protocol/BoundInputStream.java
new file mode 100644
index 0000000..dbccca7
--- /dev/null
+++ b/src/com/google/common/io/protocol/BoundInputStream.java
@@ -0,0 +1,91 @@
+// Copyright 2008 Google Inc. All Rights Reserved.
+
+package com.google.common.io.protocol;
+
+import java.io.*;
+
+/**
+ * An input stream backed by another input stream, where reading from the
+ * underlying input stream is limited to a fixed number of bytes. Also does
+ * some buffering.
+ *
+ */
+public class BoundInputStream extends InputStream {
+
+  /** Buffer size */
+  static final int BUF_SIZE = 4096;
+
+  /** Number of bytes that may still be read from the underlying stream */
+  private int remaining;
+
+  /** Small buffer to avoid making OS calls for each byte read. */
+  private byte[] buf;
+
+  /** Current position in the buffer */
+  private int bufPos;
+
+  /** Filled size of the buffer */
+  private int bufSize;
+
+  /** Underlying stream to read from */
+  private InputStream base;
+
+  public BoundInputStream(InputStream base, int len) {
+    this.base = base;
+    this.remaining = len;
+
+    buf = new byte[Math.min(len, BUF_SIZE)];
+  }
+
+  /**
+   * Make sure there is at least one byte in the buffer. If not possible,
+   * return false.
+   */
+  private boolean checkBuf() throws IOException {
+    if (remaining <= 0) {
+      return false;
+    }
+
+    if (bufPos >= bufSize) {
+      bufSize = base.read(buf, 0, Math.min(remaining, buf.length));
+      if (bufSize <= 0) {
+        remaining = 0;
+        return false;
+      }
+      bufPos = 0;
+    }
+    return true;
+  }
+
+  public int available() {
+    return bufSize - bufPos;
+  }
+
+  public int read() throws IOException {
+    if (!checkBuf()) {
+      return -1;
+    }
+    remaining--;
+    return buf[bufPos++] & 255;
+  }
+
+  public int read(byte[] data, int start, int count) throws IOException {
+    if (!checkBuf()) {
+      return -1;
+    }
+    count = Math.min(count, bufSize - bufPos);
+    System.arraycopy(buf, bufPos, data, start, count);
+    bufPos += count;
+    remaining -= count;
+    return count;
+  }
+
+  /**
+   * How many bytes are remaining, based on the length provided to the
+   * constructor. The underlying stream may terminate earlier. Provided mainly
+   * for testing purposes.
+   */
+  public int getRemaining() {
+    return remaining;
+  }
+}
diff --git a/src/com/google/common/io/protocol/ProtoBufUtil.java b/src/com/google/common/io/protocol/ProtoBufUtil.java
index 72e1bca..a14bef6 100644
--- a/src/com/google/common/io/protocol/ProtoBufUtil.java
+++ b/src/com/google/common/io/protocol/ProtoBufUtil.java
@@ -40,7 +40,7 @@
   /**
    * Get an int with "tag" from the proto buffer. If the given field can't be
    * retrieved, return the provided default value.
-   * 
+   *
    * @param proto The proto buffer.
    * @param tag The tag value that identifies which protocol buffer field to
    *        retrieve.
@@ -111,13 +111,39 @@
   }
 
   /**
+   * Returns an input stream for reading protocol buffer
+   * responses. This method reads a 32-bit signed integer from the
+   * stream, which determines the data size and compression. If the
+   * integer is negative, indicating a GZipped input stream which we
+   * do not support, an exception is thrown. Otherwise, just a
+   * BoundInputStream is returned. The input stream returned is always
+   * limited to the data available.
+   *
+   * @param dataInput the data input to read from
+   * @return an input stream, limited to the data size read from the stream
+   * @throws IOException if the incoming stream is gzipped.
+   */
+  public static InputStream getInputStreamForProtoBufResponse(
+      DataInput dataInput) throws IOException {
+
+    int size = dataInput.readInt();
+    InputStream is = new BoundInputStream((InputStream) dataInput,
+        Math.abs(size));
+
+    if (size < 0) {
+        throw new IOException("Cannot read gzipped streams");
+    }
+    return is;
+  }
+
+  /**
    * Reads a single protocol buffer from the given input stream. This method is
-   * provided where the client needs incremental access to the contents of a 
-   * protocol buffer which contains a sequence of protocol buffers.  
+   * provided where the client needs incremental access to the contents of a
+   * protocol buffer which contains a sequence of protocol buffers.
    * <p />
-   * Please use {@link #getInputStreamForProtoBufResponse} to obtain an input 
+   * Please use {@link #getInputStreamForProtoBufResponse} to obtain an input
    * stream suitable for this method.
-   * 
+   *
    * @param umbrellaType the type of the "outer" protocol buffer containing
    *                    the message to read
    * @param is the stream to read the protocol buffer from
@@ -125,24 +151,46 @@
    *               with the data read and the type will be set)
    * @return the tag id of the message, -1 at the end of the stream
    */
-  public static int readNextProtoBuf(ProtoBufType umbrellaType, 
+  public static int readNextProtoBuf(ProtoBufType umbrellaType,
       InputStream is, ProtoBuf result) throws IOException {
     long tagAndType = ProtoBuf.readVarInt(is, true /* permits EOF */);
     if (tagAndType == -1) {
       return -1;
     }
-    
+
     if ((tagAndType & 7) != ProtoBuf.WIRETYPE_LENGTH_DELIMITED) {
       throw new IOException("Message expected");
     }
     int tag = (int) (tagAndType >>> 3);
-    
+
     result.setType((ProtoBufType) umbrellaType.getData(tag));
     int length = (int) ProtoBuf.readVarInt(is, false);
     result.parse(is, length);
     return tag;
   }
-  
+
+  /**
+   * Reads a size int and a protocol buffer from a DataInput. If the size
+   * is negative, this is interpreted as an indicator that the protocol buffer
+   * is packed with GZIP. In this case, -size bytes are read, and the data is
+   * unpacked with GZIP before constructing the protocol buffer.
+   *
+   * @param protoBufType the protocol buffer type to read
+   * @param dataInput the data input to read from
+   * @return a protocol buffer of the given type
+   * @throws IOException
+   */
+  public static ProtoBuf readProtoBufResponse(ProtoBufType protoBufType,
+      DataInput dataInput) throws IOException {
+    ProtoBuf response = new ProtoBuf(protoBufType);
+    InputStream is = getInputStreamForProtoBufResponse(dataInput);
+    response.parse(is);
+    if (is.read() != -1) {
+      throw new IOException();
+    }
+    return response;
+  }
+
   /**
    * A wrapper for <code> getProtoValueOrNegativeOne </code> that drills into
    * a sub message returning the long value if it exists, returning -1 if it
@@ -162,7 +210,7 @@
     } catch (IllegalArgumentException e) {
       return -1;
     } catch (ClassCastException e) {
-      return -1; 
+      return -1;
     }
   }
 
@@ -182,12 +230,12 @@
   public static int getSubProtoValueOrDefault(ProtoBuf proto, int sub, int tag,
       int defaultValue) {
     try {
-      return getProtoValueOrDefault(getSubProtoOrNull(proto, sub), tag, 
+      return getProtoValueOrDefault(getSubProtoOrNull(proto, sub), tag,
           defaultValue);
     } catch (IllegalArgumentException e) {
       return defaultValue;
     } catch (ClassCastException e) {
-      return defaultValue; 
+      return defaultValue;
     }
   }