Bug 1522705 - Add streaming response support to GeckoWebExecutor r=esawin,agi

Differential Revision: https://phabricator.services.mozilla.com/D19504

--HG--
extra : moz-landing-system : lando
This commit is contained in:
James Willcox 2019-02-21 18:15:02 +00:00
Родитель b8f76d07c5
Коммит f4c323650d
9 изменённых файлов: 331 добавлений и 78 удалений

Просмотреть файл

@ -919,14 +919,12 @@ package org.mozilla.geckoview {
@android.support.annotation.AnyThread public abstract class WebMessage {
ctor protected WebMessage(@android.support.annotation.NonNull org.mozilla.geckoview.WebMessage.Builder);
field @android.support.annotation.Nullable public final java.nio.ByteBuffer body;
field @android.support.annotation.NonNull public final java.util.Map<java.lang.String, java.lang.String> headers;
field @android.support.annotation.NonNull public final java.lang.String uri;
}
@android.support.annotation.AnyThread public abstract static class WebMessage.Builder {
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebMessage.Builder addHeader(@android.support.annotation.NonNull java.lang.String, @android.support.annotation.NonNull java.lang.String);
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebMessage.Builder body(@android.support.annotation.Nullable java.nio.ByteBuffer);
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebMessage.Builder header(@android.support.annotation.NonNull java.lang.String, @android.support.annotation.NonNull java.lang.String);
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebMessage.Builder uri(@android.support.annotation.NonNull java.lang.String);
}
@ -939,6 +937,7 @@ package org.mozilla.geckoview {
field public static final int CACHE_MODE_NO_STORE = 2;
field public static final int CACHE_MODE_ONLY_IF_CACHED = 6;
field public static final int CACHE_MODE_RELOAD = 3;
field @android.support.annotation.Nullable public final java.nio.ByteBuffer body;
field public final int cacheMode;
field @android.support.annotation.NonNull public final java.lang.String method;
field @android.support.annotation.Nullable public final java.lang.String referrer;
@ -946,6 +945,7 @@ package org.mozilla.geckoview {
@android.support.annotation.AnyThread public static class WebRequest.Builder extends org.mozilla.geckoview.WebMessage.Builder {
ctor public Builder(@android.support.annotation.NonNull java.lang.String);
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebRequest.Builder body(@android.support.annotation.Nullable java.nio.ByteBuffer);
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebRequest build();
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebRequest.Builder cacheMode(int);
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebRequest.Builder method(@android.support.annotation.NonNull java.lang.String);
@ -1002,12 +1002,14 @@ package org.mozilla.geckoview {
@android.support.annotation.AnyThread public class WebResponse extends org.mozilla.geckoview.WebMessage {
ctor protected WebResponse(@android.support.annotation.NonNull org.mozilla.geckoview.WebResponse.Builder);
field @android.support.annotation.Nullable public final java.io.InputStream body;
field public final boolean redirected;
field public final int statusCode;
}
@android.support.annotation.AnyThread public static class WebResponse.Builder extends org.mozilla.geckoview.WebMessage.Builder {
ctor public Builder(@android.support.annotation.NonNull java.lang.String);
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebResponse.Builder body(@android.support.annotation.NonNull java.io.InputStream);
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebResponse build();
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebResponse.Builder redirected(boolean);
method @android.support.annotation.NonNull public org.mozilla.geckoview.WebResponse.Builder statusCode(int);

Просмотреть файл

@ -13,12 +13,16 @@ import android.support.test.filters.MediumTest
import android.support.test.filters.SdkSuppress
import android.support.test.runner.AndroidJUnit4
import java.math.BigInteger
import java.net.URI
import java.nio.ByteBuffer
import java.nio.CharBuffer
import java.nio.charset.Charset
import java.security.MessageDigest
import java.util.concurrent.CountDownLatch
import org.hamcrest.MatcherAssert.assertThat
@ -91,7 +95,8 @@ class WebExecutorTest {
}
fun WebResponse.getJSONBody(): JSONObject {
return JSONObject(Charset.forName("UTF-8").decode(body).toString())
val bytes = ByteBuffer.wrap(body!!.readBytes())
return JSONObject(Charset.forName("UTF-8").decode(bytes).toString())
}
@Test
@ -239,4 +244,44 @@ class WebExecutorTest {
thrown.expect(equalTo(WebRequestError(WebRequestError.ERROR_UNKNOWN_HOST, WebRequestError.ERROR_CATEGORY_URI)));
executor.resolve("this should not resolve").poll()
}
@Test
fun testFetchStream() {
val expectedCount = 1 * 1024 * 1024 // 1MB
val response = executor.fetch(WebRequest("$TEST_ENDPOINT/bytes/$expectedCount")).poll(env.defaultTimeoutMillis)!!
assertThat("Status code should match", response.statusCode, equalTo(200))
assertThat("Content-Length should match", response.headers["Content-Length"]!!.toInt(), equalTo(expectedCount))
val stream = response.body!!
val bytes = stream.readBytes(expectedCount)
stream.close()
assertThat("Byte counts should match", bytes.size, equalTo(expectedCount))
val digest = MessageDigest.getInstance("SHA-256").digest(bytes)
assertThat("Hashes should match", response.headers["X-SHA-256"],
equalTo(String.format("%064x", BigInteger(1, digest))))
}
@Test
fun testFetchStreamCancel() {
val expectedCount = 1 * 1024 * 1024 // 1MB
val response = executor.fetch(WebRequest("$TEST_ENDPOINT/bytes/$expectedCount")).poll(env.defaultTimeoutMillis)!!
assertThat("Status code should match", response.statusCode, equalTo(200))
assertThat("Content-Length should match", response.headers["Content-Length"]!!.toInt(), equalTo(expectedCount))
val stream = response.body!!;
assertThat("Stream should have 0 bytes available", stream.available(), equalTo(0))
// Wait a second. Not perfect, but should be enough time for at least one buffer
// to be appended if things are not going as they should.
SystemClock.sleep(1000);
assertThat("Stream should still have 0 bytes available", stream.available(), equalTo(0));
stream.close()
}
}

Просмотреть файл

@ -0,0 +1,147 @@
package org.mozilla.geckoview;
import org.mozilla.gecko.annotation.WrapForJNI;
import org.mozilla.gecko.mozglue.JNIObject;
import org.mozilla.gecko.util.ThreadUtils;
import android.support.annotation.AnyThread;
import android.support.annotation.NonNull;
import android.util.Log;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
/**
* This class provides an {@link InputStream} wrapper for a Gecko nsIChannel
* (or really, nsIRequest).
*/
@WrapForJNI
@AnyThread
/* package */ class GeckoInputStream extends InputStream {
private static final String LOGTAG = "GeckoInputStream";
private LinkedList<ByteBuffer> mBuffers = new LinkedList<>();
private boolean mEOF;
private boolean mResumed;
private Support mSupport;
/**
* This is only called via JNI. The support instance provides
* callbacks for the native counterpart.
*
* @param support An instance of {@link Support}, used for native callbacks.
*/
private GeckoInputStream(@NonNull Support support) {
mSupport = support;
}
@Override
public synchronized void close() throws IOException {
super.close();
sendEof();
}
@Override
public synchronized int available() throws IOException {
final ByteBuffer buf = mBuffers.peekFirst();
return buf != null ? buf.remaining() : 0;
}
@Override
public synchronized int read() throws IOException {
int expect = Integer.SIZE / 8;
byte[] bytes = new byte[expect];
int count = 0;
while (count < expect) {
long bytesRead = read(bytes, count, expect - count);
if (bytesRead < 0) {
return -1;
}
count += bytesRead;
}
final ByteBuffer buffer = ByteBuffer.wrap(bytes);
return buffer.getInt();
}
@Override
public int read(@NonNull byte[] b) throws IOException {
return read(b, 0, b.length);
}
@Override
public synchronized int read(@NonNull byte[] dest, int offset, int length) throws IOException {
while (!mEOF && mBuffers.size() == 0) {
// The underlying channel is suspended, so resume that before
// waiting for a buffer.
if (!mResumed) {
mSupport.resume();
mResumed = true;
}
try {
wait();
} catch (InterruptedException e) {
}
}
if (mEOF && mBuffers.size() == 0) {
// We have no data and we're not expecting more.
return -1;
}
final ByteBuffer buf = mBuffers.peekFirst();
final int readCount = Math.min(length, buf.remaining());
buf.get(dest, offset, readCount);
if (buf.remaining() == 0) {
// We're done with this buffer, advance the queue.
mBuffers.removeFirst();
}
return readCount;
}
/**
* Called by native code to indicate that no more data will be
* sent via {@link #appendBuffer}.
*/
@WrapForJNI(calledFrom = "gecko")
public synchronized void sendEof() {
mEOF = true;
notifyAll();
}
/**
* Called by native code to provide data for this stream.
*
* @param buf the bytes
* @throws IOException
*/
@WrapForJNI(exceptionMode = "nsresult", calledFrom = "gecko")
private synchronized void appendBuffer(byte[] buf) throws IOException {
ThreadUtils.assertOnGeckoThread();
if (mEOF) {
throw new IllegalStateException();
}
mBuffers.add(ByteBuffer.wrap(buf));
notifyAll();
}
@WrapForJNI
private static class Support extends JNIObject {
@WrapForJNI(dispatchTo = "gecko")
private native void resume();
@Override // JNIObject
protected void disposeNative() {
throw new UnsupportedOperationException();
}
}
}

Просмотреть файл

@ -36,21 +36,9 @@ public abstract class WebMessage {
*/
public final @NonNull Map<String, String> headers;
/**
* The body of the request or response. Must be a directly-allocated ByteBuffer.
* May be null.
*/
public final @Nullable ByteBuffer body;
protected WebMessage(final @NonNull Builder builder) {
uri = builder.mUri;
headers = Collections.unmodifiableMap(builder.mHeaders);
if (builder.mBody != null) {
body = builder.mBody.asReadOnlyBuffer();
} else {
body = null;
}
}
// This is only used via JNI.
@ -130,21 +118,6 @@ public abstract class WebMessage {
return this;
}
/**
* Set the body.
*
* @param buffer A {@link ByteBuffer} with the data.
* Must be allocated directly via {@link ByteBuffer#allocateDirect(int)}.
* @return This Builder instance.
*/
public @NonNull Builder body(final @Nullable ByteBuffer buffer) {
if (buffer != null && !buffer.isDirect()) {
throw new IllegalArgumentException("body must be directly allocated");
}
mBody = buffer;
return this;
}
}
}

Просмотреть файл

@ -30,6 +30,12 @@ public class WebRequest extends WebMessage {
*/
public final @NonNull String method;
/**
* The body of the request. Must be a directly-allocated ByteBuffer.
* May be null.
*/
public final @Nullable ByteBuffer body;
/**
* The cache mode for the request. See {@link #CACHE_MODE_DEFAULT}.
* These modes match those from the DOM Fetch API.
@ -104,6 +110,12 @@ public class WebRequest extends WebMessage {
method = builder.mMethod;
cacheMode = builder.mCacheMode;
referrer = builder.mReferrer;
if (builder.mBody != null) {
body = builder.mBody.asReadOnlyBuffer();
} else {
body = null;
}
}
/**
@ -142,9 +154,18 @@ public class WebRequest extends WebMessage {
return this;
}
@Override
public @NonNull Builder body(@Nullable ByteBuffer buffer) {
super.body(buffer);
/**
* Set the body.
*
* @param buffer A {@link ByteBuffer} with the data.
* Must be allocated directly via {@link ByteBuffer#allocateDirect(int)}.
* @return This Builder instance.
*/
public @NonNull Builder body(final @Nullable ByteBuffer buffer) {
if (buffer != null && !buffer.isDirect()) {
throw new IllegalArgumentException("body must be directly allocated");
}
mBody = buffer;
return this;
}

Просмотреть файл

@ -223,7 +223,7 @@ public class WebRequestError extends Exception {
* @param category An error category, e.g. {@link #ERROR_CATEGORY_URI}
*/
public WebRequestError(@Error int code, @ErrorCategory int category) {
super(String.format("Request failed, error=%d, category=%d", code, category));
super(String.format("Request failed, error=0x%x, category=0x%x", code, category));
this.code = code;
this.category = category;
}

Просмотреть файл

@ -10,7 +10,9 @@ import org.mozilla.gecko.annotation.WrapForJNI;
import android.support.annotation.AnyThread;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import java.io.InputStream;
import java.nio.ByteBuffer;
/**
@ -31,10 +33,16 @@ public class WebResponse extends WebMessage {
*/
public final boolean redirected;
/**
* An {@link InputStream} containing the response body, if available.
*/
public final @Nullable InputStream body;
protected WebResponse(final @NonNull Builder builder) {
super(builder);
this.statusCode = builder.mStatusCode;
this.redirected = builder.mRedirected;
this.body = builder.mBody;
}
/**
@ -45,6 +53,7 @@ public class WebResponse extends WebMessage {
public static class Builder extends WebMessage.Builder {
/* package */ int mStatusCode;
/* package */ boolean mRedirected;
/* package */ InputStream mBody;
/**
* Constructs a new Builder instance with the specified URI.
@ -73,9 +82,14 @@ public class WebResponse extends WebMessage {
return this;
}
@Override
public @NonNull Builder body(final @NonNull ByteBuffer buffer) {
super.body(buffer);
/**
* Sets the {@link InputStream} containing the body of this response.
*
* @param stream An {@link InputStream} with the body of the response.
* @return This Builder instance.
*/
public @NonNull Builder body(final @NonNull InputStream stream) {
mBody = stream;
return this;
}

Просмотреть файл

@ -34,6 +34,9 @@ exclude: true
- Added [`GeckoSession.getDefaultUserAgent`][67.1] to expose the build-time
default user agent synchronously.
- Changed `WebResponse.body` from a `ByteBuffer` to an `InputStream`. Apps that want access
to the entire response body will now need to read the stream themselves.
[67.1]: ../GeckoSession.html#getDefaultUserAgent--
## v66
@ -145,4 +148,4 @@ exclude: true
[65.24]: ../CrashReporter.html#sendCrashReport-android.content.Context-android.os.Bundle-java.lang.String-
[65.25]: ../GeckoResult.html
[api-version]: 0b19e298c556966ca0821bc2be8b015ccd014fa9
[api-version]: 5655c3f6a74c860809e57a2d66499633ac23cfcc

Просмотреть файл

@ -134,8 +134,23 @@ class HeaderVisitor final : public nsIHttpHeaderVisitor {
NS_IMPL_ISUPPORTS(HeaderVisitor, nsIHttpHeaderVisitor)
class LoaderListener final : public nsIStreamLoaderObserver,
public nsIRequestObserver {
class StreamSupport final
: public java::GeckoInputStream::Support::Natives<StreamSupport> {
public:
typedef java::GeckoInputStream::Support::Natives<StreamSupport> Base;
using Base::AttachNative;
using Base::DisposeNative;
using Base::GetNative;
StreamSupport(nsIRequest* aRequest) : mRequest(aRequest) {}
void Resume() { mRequest->Resume(); }
private:
nsCOMPtr<nsIRequest> mRequest;
};
class LoaderListener final : public nsIStreamListener {
public:
NS_DECL_THREADSAFE_ISUPPORTS
@ -144,39 +159,84 @@ class LoaderListener final : public nsIStreamLoaderObserver,
}
NS_IMETHOD
OnStreamComplete(nsIStreamLoader* aLoader, nsISupports* aContext,
nsresult aStatus, uint32_t aResultLength,
const uint8_t* aResult) override {
nsresult rv = HandleWebResponse(aLoader, aStatus, aResultLength, aResult);
OnStartRequest(nsIRequest* aRequest, nsISupports* aContext) override {
MOZ_ASSERT(!mStream);
nsresult status;
aRequest->GetStatus(&status);
if (NS_FAILED(status)) {
CompleteWithError(mResult, status);
return NS_OK;
}
StreamSupport::Init();
// We're expecting data later via OnDataAvailable, so create the stream now.
mSupport = java::GeckoInputStream::Support::New();
StreamSupport::AttachNative(mSupport,
mozilla::MakeUnique<StreamSupport>(aRequest));
mStream = java::GeckoInputStream::New(mSupport);
// Suspend the request immediately. It will be resumed when (if) someone
// tries to read the Java stream.
aRequest->Suspend();
nsresult rv = HandleWebResponse(aRequest);
if (NS_FAILED(rv)) {
CompleteWithError(mResult, rv);
return NS_OK;
}
return NS_OK;
}
NS_IMETHOD
OnStartRequest(nsIRequest* aRequest, nsISupports* aContext) override {
OnStopRequest(nsIRequest* aRequest, nsISupports* aContext,
nsresult aStatusCode) override {
if (mStream) {
mStream->SendEof();
}
return NS_OK;
}
NS_IMETHOD
OnStopRequest(nsIRequest* aRequest, nsISupports* aContext,
nsresult aStatusCode) override {
return NS_OK;
OnDataAvailable(nsIRequest* aRequest, nsISupports* aContext,
nsIInputStream* aInputStream, uint64_t aOffset,
uint32_t aCount) override {
MOZ_ASSERT(mStream);
// We only need this for the ReadSegments call, the value is unused.
uint32_t countRead;
return aInputStream->ReadSegments(WriteSegment, this, aCount, &countRead);
}
private:
static nsresult WriteSegment(nsIInputStream* aInputStream, void* aClosure,
const char* aFromSegment, uint32_t aToOffset,
uint32_t aCount, uint32_t* aWriteCount) {
LoaderListener* self = static_cast<LoaderListener*>(aClosure);
MOZ_ASSERT(self);
MOZ_ASSERT(self->mStream);
*aWriteCount = aCount;
jni::ByteArray::LocalRef buffer = jni::ByteArray::New(
reinterpret_cast<signed char*>(const_cast<char*>(aFromSegment)),
*aWriteCount);
if (NS_FAILED(self->mStream->AppendBuffer(buffer))) {
// The stream was closed or something, abort reading this channel.
return NS_ERROR_ABORT;
}
return NS_OK;
}
NS_IMETHOD
HandleWebResponse(nsIStreamLoader* aLoader, nsresult aStatus,
uint32_t aBodyLength, const uint8_t* aBody) {
NS_ENSURE_SUCCESS(aStatus, aStatus);
nsCOMPtr<nsIRequest> request;
nsresult rv = aLoader->GetRequest(getter_AddRefs(request));
NS_ENSURE_SUCCESS(rv, rv);
nsCOMPtr<nsIHttpChannel> channel = do_QueryInterface(request, &rv);
HandleWebResponse(nsIRequest* aRequest) {
nsresult rv;
nsCOMPtr<nsIHttpChannel> channel = do_QueryInterface(aRequest, &rv);
NS_ENSURE_SUCCESS(rv, rv);
// URI
@ -210,18 +270,9 @@ class LoaderListener final : public nsIStreamLoaderObserver,
builder->Redirected(!loadInfo->RedirectChain().IsEmpty());
// Body
if (aBodyLength) {
jni::ByteBuffer::LocalRef buffer;
rv = java::GeckoWebExecutor::CreateByteBuffer(aBodyLength, &buffer);
NS_ENSURE_SUCCESS(rv, NS_ERROR_OUT_OF_MEMORY);
MOZ_ASSERT(buffer->Address());
MOZ_ASSERT(buffer->Capacity() == aBodyLength);
memcpy(buffer->Address(), aBody, aBodyLength);
builder->Body(buffer);
// Body stream
if (mStream) {
builder->Body(mStream);
}
mResult->Complete(builder->Build());
@ -231,9 +282,11 @@ class LoaderListener final : public nsIStreamLoaderObserver,
virtual ~LoaderListener() {}
const java::GeckoResult::GlobalRef mResult;
java::GeckoInputStream::GlobalRef mStream;
java::GeckoInputStream::Support::GlobalRef mSupport;
};
NS_IMPL_ISUPPORTS(LoaderListener, nsIStreamLoaderObserver, nsIRequestObserver)
NS_IMPL_ISUPPORTS(LoaderListener, nsIStreamListener)
class DNSListener final : public nsIDNSListener {
public:
@ -379,7 +432,7 @@ nsresult WebExecutorSupport::CreateStreamLoader(
}
// Body
const auto body = reqBase->Body();
const auto body = req->Body();
if (body) {
nsCOMPtr<nsIInputStream> stream = new ByteBufferStream(body);
@ -418,16 +471,11 @@ nsresult WebExecutorSupport::CreateStreamLoader(
rv = internalChannel->SetBlockAuthPrompt(true);
NS_ENSURE_SUCCESS(rv, rv);
// All done, set up the stream loader
// All done, set up the listener
RefPtr<LoaderListener> listener = new LoaderListener(aResult);
nsCOMPtr<nsIStreamLoader> loader;
rv = NS_NewStreamLoader(getter_AddRefs(loader), listener);
NS_ENSURE_SUCCESS(rv, rv);
// Finally, open the channel
rv = httpChannel->AsyncOpen(loader);
NS_ENSURE_SUCCESS(rv, rv);
rv = httpChannel->AsyncOpen(listener);
return NS_OK;
}