Let tests provide raw InputStream bodies.

This enables testing of large files, such as those beyond the 2GB
boundary.  Also switches to streaming in MTU-sized chunks.

Bug: 8209169
Change-Id: Iab6b299c13a3d67bbbaa80a9e5bc563ef6cf9302
diff --git a/src/main/java/com/google/mockwebserver/MockResponse.java b/src/main/java/com/google/mockwebserver/MockResponse.java
index 17e00f3..64eb8f3 100644
--- a/src/main/java/com/google/mockwebserver/MockResponse.java
+++ b/src/main/java/com/google/mockwebserver/MockResponse.java
@@ -17,8 +17,11 @@
 package com.google.mockwebserver;
 
 import static com.google.mockwebserver.MockWebServer.ASCII;
+
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -30,11 +33,11 @@
 public final class MockResponse implements Cloneable {
     private static final String EMPTY_BODY_HEADER = "Content-Length: 0";
     private static final String CHUNKED_BODY_HEADER = "Transfer-encoding: chunked";
-    private static final byte[] EMPTY_BODY = new byte[0];
 
     private String status = "HTTP/1.1 200 OK";
     private List<String> headers = new ArrayList<String>();
-    private byte[] body = EMPTY_BODY;
+    private InputStream body;
+    private long bodyLength;
     private int bytesPerSecond = Integer.MAX_VALUE;
     private SocketPolicy socketPolicy = SocketPolicy.KEEP_OPEN;
 
@@ -107,21 +110,45 @@
     }
 
     /**
-     * Returns an input stream containing the raw HTTP payload.
+     * Returns a {@code byte[]} containing the raw HTTP payload. This is less
+     * efficient than {@link #getBodyStream()}.
      */
     public byte[] getBody() {
+        try {
+            return readFullyNoClose(body);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns an input stream containing the raw HTTP payload.
+     */
+    public InputStream getBodyStream() {
         return body;
     }
 
-    public MockResponse setBody(byte[] body) {
-        if (this.body == EMPTY_BODY) {
+    /**
+     * Returns length of raw HTTP payload.
+     */
+    public long getBodyLength() {
+        return bodyLength;
+    }
+
+    public MockResponse setBody(InputStream body, long bodyLength) {
+        if (this.body == null) {
             headers.remove(EMPTY_BODY_HEADER);
         }
-        this.headers.add("Content-Length: " + body.length);
+        this.headers.add("Content-Length: " + bodyLength);
         this.body = body;
+        this.bodyLength = bodyLength;
         return this;
     }
 
+    public MockResponse setBody(byte[] body) {
+        return setBody(new ByteArrayInputStream(body), body.length);
+    }
+
     public MockResponse setBody(String body) {
         try {
             return setBody(body.getBytes(ASCII));
@@ -145,7 +172,10 @@
             pos += chunkSize;
         }
         bytesOut.write("0\r\n\r\n".getBytes(ASCII)); // last chunk + empty trailer + crlf
-        this.body = bytesOut.toByteArray();
+
+        body = bytesOut.toByteArray();
+        this.body = new ByteArrayInputStream(body);
+        this.bodyLength = body.length;
         return this;
     }
 
@@ -177,4 +207,14 @@
     @Override public String toString() {
         return status;
     }
+
+    private static byte[] readFullyNoClose(InputStream in) throws IOException {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        int count;
+        while ((count = in.read(buffer)) != -1) {
+            bytes.write(buffer, 0, count);
+        }
+        return bytes.toByteArray();
+    }
 }
diff --git a/src/main/java/com/google/mockwebserver/MockWebServer.java b/src/main/java/com/google/mockwebserver/MockWebServer.java
index 696a440..6173c88 100644
--- a/src/main/java/com/google/mockwebserver/MockWebServer.java
+++ b/src/main/java/com/google/mockwebserver/MockWebServer.java
@@ -479,17 +479,29 @@
         out.write(("\r\n").getBytes(ASCII));
         out.flush();
 
-        byte[] body = response.getBody();
-        int bytesPerSecond = response.getBytesPerSecond();
+        final InputStream in = response.getBodyStream();
+        final int bytesPerSecond = response.getBytesPerSecond();
 
-        for (int offset = 0; offset < body.length; offset += bytesPerSecond) {
-            int count = Math.min(body.length - offset, bytesPerSecond);
-            out.write(body, offset, count);
+        // Stream data in MTU-sized increments
+        final byte[] buffer = new byte[1452];
+        final long delayMs;
+        if (bytesPerSecond == Integer.MAX_VALUE) {
+            delayMs = 0;
+        } else {
+            delayMs = (1000 * buffer.length) / bytesPerSecond;
+        }
+
+        int read;
+        long sinceDelay = 0;
+        while ((read = in.read(buffer)) != -1) {
+            out.write(buffer, 0, read);
             out.flush();
 
-            if (offset + count < body.length) {
+            sinceDelay += read;
+            if (sinceDelay >= buffer.length && delayMs > 0) {
+                sinceDelay %= buffer.length;
                 try {
-                    Thread.sleep(1000);
+                    Thread.sleep(delayMs);
                 } catch (InterruptedException e) {
                     throw new AssertionError();
                 }