pjs/netwerk/cache/memcache/nsMemCacheChannel.cpp

668 строки
20 KiB
C++

/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* The contents of this file are subject to the Netscape Public
* License Version 1.1 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of
* the License at http://www.mozilla.org/NPL/
*
* Software distributed under the License is distributed on an "AS
* IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
* implied. See the License for the specific language governing
* rights and limitations under the License.
*
* The Original Code is Mozilla Communicator client code, released
* March 31, 1998.
*
* The Initial Developer of the Original Code is Netscape
* Communications Corporation. Portions created by Netscape are
* Copyright (C) 1998-1999 Netscape Communications Corporation. All
* Rights Reserved.
*
*/
#include "nsMemCache.h"
#include "nsMemCacheChannel.h"
#include "nsIStreamListener.h"
#include "nsIChannel.h"
#include "nsIStorageStream.h"
#include "nsIOutputStream.h"
#include "nsIServiceManager.h"
#include "nsIEventQueueService.h"
#include "nsNetUtil.h"
#include "nsILoadGroup.h"
static NS_DEFINE_CID(kIOServiceCID, NS_IOSERVICE_CID);
static NS_DEFINE_CID(kEventQueueService, NS_EVENTQUEUESERVICE_CID);
NS_IMPL_THREADSAFE_ISUPPORTS3(nsMemCacheChannel, nsIChannel, nsIRequest, nsITransport)
void
nsMemCacheChannel::NotifyStorageInUse(PRInt32 aBytesUsed)
{
mRecord->mCache->mOccupancy += aBytesUsed;
}
/**
* This class acts as an adaptor around a synchronous input stream to add async
* read capabilities. It adds methods for initiating, suspending, resuming and
* cancelling async reads.
*/
class AsyncReadStreamAdaptor : public nsIInputStream,
public nsIStreamListener
{
public:
AsyncReadStreamAdaptor(nsMemCacheChannel* aChannel, nsIInputStream *aSyncStream):
mSyncStream(aSyncStream), mDataAvailCursor(0),
mRemaining((PRUint32)-1), mAvailable(0), mChannel(aChannel), mAbortStatus(NS_OK), mSuspended(PR_FALSE)
{
NS_INIT_REFCNT();
NS_ADDREF(mChannel);
}
virtual ~AsyncReadStreamAdaptor() {
mChannel->mAsyncReadStream = 0;
NS_RELEASE(mChannel);
}
NS_DECL_ISUPPORTS
nsresult
IsPending(PRBool* aIsPending) {
*aIsPending = (mRemaining != 0) && NS_SUCCEEDED(mAbortStatus);
return NS_OK;
}
nsresult
Cancel(nsresult status) {
if (NS_SUCCEEDED(mAbortStatus)) {
mAbortStatus = status;
return mEventQueueStreamListener ?
mEventQueueStreamListener->OnStopRequest(mChannel, mContext, status):
status;
} else {
// Cancel has already been called... Do not fire another OnStopRequest!
return NS_OK;
}
}
nsresult
Suspend(void) { mSuspended = PR_TRUE; return NS_OK; }
nsresult
Resume(void) {
if (!mSuspended)
return NS_ERROR_FAILURE;
mSuspended = PR_FALSE;
return NextListenerEvent();
}
// nsIStreamListener methods, all of which delegate to the real listener
// This is the heart of this class.
// The OnDataAvailable() method is always called from an event processed by
// the system event queue. The event is sent from an
// AsyncReadStreamAdaptor object to itself. This method both forwards the
// event to the downstream listener and causes another OnDataAvailable()
// event to be enqueued.
NS_IMETHOD
OnDataAvailable(nsIRequest *request, nsISupports *aContext,
nsIInputStream *inStr, PRUint32 sourceOffset, PRUint32 count) {
nsresult rv;
rv = mDownstreamListener->OnDataAvailable(mChannel, aContext, inStr, sourceOffset, count);
if (NS_FAILED(rv)) {
Cancel(rv);
return rv;
}
if (!mSuspended && NS_SUCCEEDED(mAbortStatus)) {
rv = NextListenerEvent();
if (NS_FAILED(rv)) {
Fail();
return rv;
}
}
return rv;
}
NS_IMETHOD
OnStartRequest(nsIRequest *request, nsISupports *aContext) {
nsresult rv = NS_OK;
NS_ASSERTION(mDownstreamListener, "no downstream listener");
if (mDownstreamListener) {
rv = mDownstreamListener->OnStartRequest(mChannel, aContext);
}
if (NS_FAILED(rv))
Cancel(rv);
return rv;
}
NS_IMETHOD
OnStopRequest(nsIRequest *request, nsISupports *aContext,
nsresult aStatus) {
nsresult rv = NS_OK;
NS_ASSERTION(mDownstreamListener, "no downstream listener");
if (mDownstreamListener) {
rv = mDownstreamListener->OnStopRequest(mChannel, aContext, aStatus);
mDownstreamListener = 0;
}
// Tricky: causes this instance to be free'ed because mEventQueueStreamListener
// has a circular reference back to this.
mEventQueueStreamListener = 0;
return rv;
}
// nsIInputStream methods
NS_IMETHOD
Available(PRUint32 *aNumBytes) { return mAvailable; }
NS_IMETHOD
Read(char* aBuf, PRUint32 aCount, PRUint32 *aBytesRead) {
if (NS_FAILED(mAbortStatus))
return NS_BASE_STREAM_CLOSED;
*aBytesRead = 0;
aCount = PR_MIN(aCount, mAvailable);
nsresult rv = mSyncStream->Read(aBuf, aCount, aBytesRead);
mAvailable -= *aBytesRead;
if (NS_FAILED(rv) && (rv != NS_BASE_STREAM_WOULD_BLOCK)) {
Fail();
return rv;
}
return NS_OK;
}
NS_IMETHOD ReadSegments(nsWriteSegmentFun writer, void * closure, PRUint32 count, PRUint32 *_retval) {
return mSyncStream->ReadSegments(writer, closure, count, _retval);
}
NS_IMETHOD GetNonBlocking(PRBool *aNonBlocking) {
NS_NOTREACHED("GetNonBlocking");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHOD GetObserver(nsIInputStreamObserver * *aObserver) {
NS_NOTREACHED("GetObserver");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHOD SetObserver(nsIInputStreamObserver * aObserver) {
NS_NOTREACHED("SetObserver");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHOD
Close() {
nsresult rv = mSyncStream->Close();
mSyncStream = 0;
mContext = 0;
mDownstreamListener = 0;
mEventQueueStreamListener = 0;
return rv;
}
NS_IMETHOD
GetTransferCount(nsLoadFlags *aTransferCount) {
*aTransferCount = mRemaining;
return NS_OK;
}
NS_IMETHOD
SetTransferCount(nsLoadFlags aTransferCount) {
mRemaining = aTransferCount;
return NS_OK;
}
nsresult
AsyncRead(nsIStreamListener* aListener, nsISupports* aContext) {
nsresult rv;
nsIEventQueue *eventQ;
mContext = aContext;
mDownstreamListener = aListener;
NS_WITH_SERVICE(nsIIOService, serv, kIOServiceCID, &rv);
if (NS_FAILED(rv)) return rv;
NS_WITH_SERVICE(nsIEventQueueService, eventQService, kEventQueueService, &rv);
if (NS_FAILED(rv)) return rv;
rv = eventQService->GetThreadEventQueue(PR_CurrentThread(), &eventQ);
if (NS_FAILED(rv)) return rv;
rv = NS_NewAsyncStreamListener(getter_AddRefs(mEventQueueStreamListener),
NS_STATIC_CAST(nsIStreamListener*, this), eventQ);
NS_RELEASE(eventQ);
if (NS_FAILED(rv)) return rv;
rv = mEventQueueStreamListener->OnStartRequest(mChannel, aContext);
if (NS_FAILED(rv)) return rv;
return NextListenerEvent();
}
protected:
nsresult
Fail(void) {
mAbortStatus = NS_BINDING_ABORTED;
return mEventQueueStreamListener->OnStopRequest(mChannel, mContext, NS_BINDING_FAILED);
}
// If more data remains in the source stream that the downstream consumer
// has not yet been notified about, fire an OnDataAvailable event.
// Otherwise, fire an OnStopRequest event.
nsresult
NextListenerEvent() {
PRUint32 available;
nsresult rv = mSyncStream->Available(&available);
if (NS_FAILED(rv)) return rv;
available -= mAvailable;
available = PR_MIN(available, mRemaining);
if (available) {
PRUint32 size = PR_MIN(available, MEM_CACHE_SEGMENT_SIZE);
rv = mEventQueueStreamListener->OnDataAvailable(mChannel, mContext, this,
mDataAvailCursor, size);
mDataAvailCursor += size;
mRemaining -= size;
mAvailable += size;
return rv;
} else {
rv = mEventQueueStreamListener->OnStopRequest(mChannel, mContext, NS_OK);
AsyncReadStreamAdaptor* thisAlias = this;
NS_RELEASE(thisAlias);
return rv;
}
}
private:
nsCOMPtr<nsISupports> mContext; // Opaque context passed to AsyncRead()
nsCOMPtr<nsIStreamListener> mEventQueueStreamListener; // Stream listener that has been proxied
nsCOMPtr<nsIStreamListener> mDownstreamListener; // Original stream listener
nsCOMPtr<nsIInputStream> mSyncStream; // Underlying synchronous stream that is
// being converted to an async stream
PRUint32 mDataAvailCursor;
PRUint32 mRemaining; // Size of AsyncRead request less bytes for
// consumer OnDataAvailable's that were fired
PRUint32 mAvailable; // Number of bytes for which OnDataAvailable fired
nsMemCacheChannel* mChannel; // Associated memory cache channel, strong link
// but can not use nsCOMPtr
nsresult mAbortStatus; // Abort() has been called
PRBool mSuspended; // Suspend() has been called
};
NS_IMPL_ISUPPORTS3(AsyncReadStreamAdaptor, nsIInputStream,
nsIStreamListener, nsIRequestObserver)
// The only purpose of this output stream wrapper is to adjust the cache's
// overall occupancy as new data flows into the cache entry.
class MemCacheWriteStreamWrapper : public nsIOutputStream {
public:
MemCacheWriteStreamWrapper(nsMemCacheChannel* aChannel, nsIOutputStream *aBaseStream):
mBaseStream(aBaseStream), mChannel(aChannel)
{
NS_INIT_REFCNT();
NS_ADDREF(mChannel);
}
virtual ~MemCacheWriteStreamWrapper() { NS_RELEASE(mChannel); };
static nsresult
Create(nsMemCacheChannel* aChannel, nsIOutputStream *aBaseStream, nsIOutputStream* *aWrapper) {
MemCacheWriteStreamWrapper *wrapper =
new MemCacheWriteStreamWrapper(aChannel, aBaseStream);
if (!wrapper) return NS_ERROR_OUT_OF_MEMORY;
NS_ADDREF(wrapper);
*aWrapper = wrapper;
return NS_OK;
}
NS_DECL_ISUPPORTS
NS_IMETHOD
Close() { return mBaseStream->Close(); }
NS_IMETHOD
Flush() { return mBaseStream->Flush(); }
NS_IMETHOD
Write(const char *aBuffer, PRUint32 aCount, PRUint32 *aNumWritten) {
*aNumWritten = 0;
nsresult rv = mBaseStream->Write(aBuffer, aCount, aNumWritten);
mChannel->NotifyStorageInUse(*aNumWritten);
return rv;
}
NS_IMETHOD
WriteFrom(nsIInputStream *inStr, PRUint32 count, PRUint32 *_retval) {
NS_NOTREACHED("WriteFrom");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHOD
WriteSegments(nsReadSegmentFun reader, void * closure, PRUint32 count, PRUint32 *_retval) {
NS_NOTREACHED("WriteSegments");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHOD
GetNonBlocking(PRBool *aNonBlocking) {
NS_NOTREACHED("GetNonBlocking");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHOD
SetNonBlocking(PRBool aNonBlocking) {
NS_NOTREACHED("SetNonBlocking");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHOD
GetObserver(nsIOutputStreamObserver * *aObserver) {
NS_NOTREACHED("GetObserver");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHOD
SetObserver(nsIOutputStreamObserver * aObserver) {
NS_NOTREACHED("SetObserver");
return NS_ERROR_NOT_IMPLEMENTED;
}
private:
nsCOMPtr<nsIOutputStream> mBaseStream;
nsMemCacheChannel* mChannel;
};
NS_IMPL_THREADSAFE_ISUPPORTS1(MemCacheWriteStreamWrapper, nsIOutputStream)
nsMemCacheChannel::nsMemCacheChannel(nsMemCacheRecord *aRecord, nsILoadGroup *aLoadGroup)
: mRecord(aRecord), mStatus(NS_OK),
mLoadFlags(nsIRequest::LOAD_NORMAL)
{
NS_INIT_REFCNT();
mRecord->mNumChannels++;
}
nsMemCacheChannel::~nsMemCacheChannel()
{
mRecord->mNumChannels--;
}
NS_IMETHODIMP
nsMemCacheChannel::GetName(PRUnichar* *result)
{
NS_NOTREACHED("nsMemCacheChannel::GetName");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::IsPending(PRBool* aIsPending)
{
*aIsPending = PR_FALSE;
if (!mAsyncReadStream)
return NS_OK;
return mAsyncReadStream->IsPending(aIsPending);
}
NS_IMETHODIMP
nsMemCacheChannel::GetStatus(nsresult *status)
{
*status = mStatus;
return NS_OK;
}
NS_IMETHODIMP
nsMemCacheChannel::Cancel(nsresult status)
{
mStatus = status;
if (!mAsyncReadStream)
return NS_ERROR_FAILURE;
return mAsyncReadStream->Cancel(status);
}
NS_IMETHODIMP
nsMemCacheChannel::Suspend(void)
{
if (!mAsyncReadStream)
return NS_ERROR_FAILURE;
return mAsyncReadStream->Suspend();
}
NS_IMETHODIMP
nsMemCacheChannel::Resume(void)
{
if (!mAsyncReadStream)
return NS_ERROR_FAILURE;
return mAsyncReadStream->Resume();
}
NS_IMETHODIMP
nsMemCacheChannel::GetOriginalURI(nsIURI* *aURI)
{
// Not required
NS_NOTREACHED("nsMemCacheChannel::GetOriginalURI");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::SetOriginalURI(nsIURI* aURI)
{
// Not required
NS_NOTREACHED("nsMemCacheChannel::SetOriginalURI");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::GetURI(nsIURI* *aURI)
{
// Not required to be implemented, since it is implemented by cache manager
NS_NOTREACHED("nsMemCacheChannel::GetURI");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::Open(nsIInputStream **aResult)
{
return OpenInputStream(0, PRUint32(-1), 0, aResult);
}
NS_IMETHODIMP
nsMemCacheChannel::AsyncOpen(nsIStreamListener *aListener, nsISupports *aContext)
{
nsCOMPtr<nsIRequest> req;
return AsyncRead(aListener, aContext, 0, PRUint32(-1), 0, getter_AddRefs(req));
}
NS_IMETHODIMP
nsMemCacheChannel::OpenInputStream(PRUint32 offset, PRUint32 count, PRUint32 flags,
nsIInputStream* *aResult)
{
nsresult rv;
NS_ENSURE_ARG(aResult);
if (mInputStream)
return NS_ERROR_NOT_AVAILABLE;
rv = mRecord->mStorageStream->NewInputStream(offset, getter_AddRefs(mInputStream));
if (NS_FAILED(rv)) return rv;
*aResult = mInputStream;
NS_ADDREF(*aResult);
return rv;
}
NS_IMETHODIMP
nsMemCacheChannel::OpenOutputStream(PRUint32 offset, PRUint32 count, PRUint32 flags,
nsIOutputStream* *aResult)
{
nsresult rv;
NS_ENSURE_ARG(aResult);
nsCOMPtr<nsIOutputStream> outputStream;
PRUint32 oldLength;
mRecord->mStorageStream->GetLength(&oldLength);
rv = mRecord->mStorageStream->GetOutputStream(offset, getter_AddRefs(outputStream));
if (NS_FAILED(rv)) return rv;
if (offset < oldLength)
NotifyStorageInUse(offset - oldLength);
return MemCacheWriteStreamWrapper::Create(this, outputStream, aResult);
}
NS_IMETHODIMP
nsMemCacheChannel::AsyncRead(nsIStreamListener *aListener, nsISupports *aContext,
PRUint32 offset, PRUint32 count, PRUint32 flags,
nsIRequest **aResult)
{
NS_ENSURE_ARG_POINTER(aResult);
nsCOMPtr<nsIInputStream> inputStream;
nsresult rv = OpenInputStream(offset, count, flags, getter_AddRefs(inputStream));
if (NS_FAILED(rv)) return rv;
AsyncReadStreamAdaptor *asyncReadStreamAdaptor;
asyncReadStreamAdaptor = new AsyncReadStreamAdaptor(this, inputStream);
if (!asyncReadStreamAdaptor)
return NS_ERROR_OUT_OF_MEMORY;
NS_ADDREF(asyncReadStreamAdaptor);
mAsyncReadStream = asyncReadStreamAdaptor;
rv = asyncReadStreamAdaptor->AsyncRead(aListener, aContext);
if (NS_FAILED(rv)) {
mAsyncReadStream = nsnull;
NS_RELEASE(asyncReadStreamAdaptor);
}
NS_ADDREF(*aResult = this);
return rv;
}
NS_IMETHODIMP
nsMemCacheChannel::AsyncWrite(nsIStreamProvider *provider, nsISupports *ctxt,
PRUint32 offset, PRUint32 count, PRUint32 flags,
nsIRequest **aResult)
{
// Not required to be implemented
NS_NOTREACHED("nsMemCacheChannel::AsyncWrite");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::SetContentLength(PRInt32 aContentLength)
{
return NS_OK;
}
NS_IMETHODIMP
nsMemCacheChannel::GetLoadFlags(nsLoadFlags *aLoadFlags)
{
*aLoadFlags = mLoadFlags;
return NS_OK;
}
NS_IMETHODIMP
nsMemCacheChannel::SetLoadFlags(nsLoadFlags aLoadFlags)
{
mLoadFlags = aLoadFlags;
return NS_OK;
}
NS_IMETHODIMP
nsMemCacheChannel::GetContentType(char* *aContentType)
{
// Not required to be implemented, since it is implemented by cache manager
NS_NOTREACHED("nsMemCacheChannel::GetContentType");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::SetContentType(const char *aContentType)
{
// Not required to be implemented, since it is implemented by cache manager
NS_NOTREACHED("nsMemCacheChannel::SetContentType");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::GetContentLength(PRInt32 *aContentLength)
{
PRUint32 cl = 0;
mRecord->GetStoredContentLength(&cl);
*aContentLength = (PRInt32) cl;
return NS_OK;
}
NS_IMETHODIMP
nsMemCacheChannel::GetOwner(nsISupports* *aOwner)
{
*aOwner = mOwner.get();
NS_IF_ADDREF(*aOwner);
return NS_OK;
}
NS_IMETHODIMP
nsMemCacheChannel::SetOwner(nsISupports* aOwner)
{
// Not required to be implemented, since it is implemented by cache manager
mOwner = aOwner;
return NS_OK;
}
NS_IMETHODIMP
nsMemCacheChannel::GetLoadGroup(nsILoadGroup* *aLoadGroup)
{
// Not required to be implemented, since it is implemented by cache manager
NS_NOTREACHED("nsMemCacheChannel::GetLoadGroup");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::SetLoadGroup(nsILoadGroup* aLoadGroup)
{
// Not required to be implemented, since it is implemented by cache manager
NS_NOTREACHED("nsMemCacheChannel::SetLoadGroup");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::GetNotificationCallbacks(nsIInterfaceRequestor* *aNotificationCallbacks)
{
// Not required to be implemented, since it is implemented by cache manager
NS_NOTREACHED("nsMemCacheChannel::GetNotificationCallbacks");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::SetNotificationCallbacks(nsIInterfaceRequestor* aNotificationCallbacks)
{
// Not required to be implemented, since it is implemented by cache manager
NS_NOTREACHED("nsMemCacheChannel::SetNotificationCallbacks");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::SetNotificationCallbacks(nsIInterfaceRequestor* aCallbacks,
PRBool isBackground)
{
// Not required to be implemented, since it is implemented by cache manager
NS_NOTREACHED("nsMemCacheChannel::SetNotificationCallbacks");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMemCacheChannel::GetSecurityInfo(nsISupports * *aSecurityInfo)
{
*aSecurityInfo = nsnull;
return NS_OK;
}