Bug 574507 - 'IndexedDB: Fix transaction queue logic to prevent starving transactions across multiple objectStores'. r=sicking

This commit is contained in:
Ben Turner 2010-07-12 10:05:01 -04:00
Родитель 6651a7269c
Коммит 946bfefa19
4 изменённых файлов: 188 добавлений и 171 удалений

Просмотреть файл

@ -61,22 +61,36 @@ const PRUint32 kIdleThreadTimeoutMs = 30000;
TransactionThreadPool* gInstance = nsnull;
bool gShutdown = false;
inline
nsresult
CheckOverlapAndMergeObjectStores(nsTArray<nsString>& aLockedStores,
const nsTArray<nsString>& aObjectStores,
bool aShouldMerge,
bool* aStoresOverlap)
{
PRUint32 length = aObjectStores.Length();
bool overlap = false;
for (PRUint32 index = 0; index < length; index++) {
const nsString& storeName = aObjectStores[index];
if (aLockedStores.Contains(storeName)) {
overlap = true;
}
else if (aShouldMerge && !aLockedStores.AppendElement(storeName)) {
NS_WARNING("Out of memory!");
return NS_ERROR_OUT_OF_MEMORY;
}
}
*aStoresOverlap = overlap;
return NS_OK;
}
} // anonymous namespace
BEGIN_INDEXEDDB_NAMESPACE
struct QueuedDispatchInfo
{
QueuedDispatchInfo()
: finish(false)
{ }
nsRefPtr<IDBTransaction> transaction;
nsCOMPtr<nsIRunnable> runnable;
nsCOMPtr<nsIRunnable> finishRunnable;
bool finish;
};
class FinishTransactionRunnable : public nsIRunnable
{
public:
@ -208,59 +222,91 @@ TransactionThreadPool::FinishTransaction(IDBTransaction* aTransaction)
nsTArray<TransactionInfo>& transactionsInProgress =
dbTransactionInfo->transactions;
PRUint32 count = transactionsInProgress.Length();
PRUint32 transactionCount = transactionsInProgress.Length();
#ifdef DEBUG
if (aTransaction->mMode == IDBTransaction::FULL_LOCK) {
NS_ASSERTION(dbTransactionInfo->locked, "Should be locked!");
NS_ASSERTION(count == 1, "More transactions running than should be!");
NS_ASSERTION(transactionCount == 1,
"More transactions running than should be!");
}
#endif
if (count == 1) {
if (transactionCount == 1) {
#ifdef DEBUG
{
TransactionInfo& info = transactionsInProgress[0];
NS_ASSERTION(info.transaction == aTransaction, "Transaction mismatch!");
NS_ASSERTION(info.mode == aTransaction->mMode, "Mode mismatch!");
}
#endif
mTransactionsInProgress.Remove(databaseId);
}
else {
for (PRUint32 index = 0; index < count; index++) {
TransactionInfo& info = transactionsInProgress[index];
if (info.transaction == aTransaction) {
// We need to rebuild the locked object store list.
nsTArray<nsString> storesWriting, storesReading;
for (PRUint32 index = 0, count = transactionCount; index < count; index++) {
IDBTransaction* transaction = transactionsInProgress[index].transaction;
if (transaction == aTransaction) {
NS_ASSERTION(count == transactionCount, "More than one match?!");
transactionsInProgress.RemoveElementAt(index);
break;
index--;
count--;
continue;
}
const nsTArray<nsString>& objectStores = transaction->mObjectStoreNames;
bool dummy;
if (transaction->mMode == nsIIDBTransaction::READ_WRITE) {
if (NS_FAILED(CheckOverlapAndMergeObjectStores(storesWriting,
objectStores,
true, &dummy))) {
NS_WARNING("Out of memory!");
}
}
else if (transaction->mMode == nsIIDBTransaction::READ_ONLY) {
if (NS_FAILED(CheckOverlapAndMergeObjectStores(storesReading,
objectStores,
true, &dummy))) {
NS_WARNING("Out of memory!");
}
}
else {
NS_NOTREACHED("Unknown mode!");
}
}
NS_ASSERTION(transactionsInProgress.Length() == count - 1,
NS_ASSERTION(transactionsInProgress.Length() == transactionCount - 1,
"Didn't find the transaction we were looking for!");
dbTransactionInfo->storesWriting.SwapElements(storesWriting);
dbTransactionInfo->storesReading.SwapElements(storesReading);
}
// Try to dispatch all the queued transactions again.
nsTArray<QueuedDispatchInfo> queuedDispatch;
queuedDispatch.SwapElements(mDelayedDispatchQueue);
count = queuedDispatch.Length();
for (PRUint32 index = 0; index < count; index++) {
QueuedDispatchInfo& info = queuedDispatch[index];
if (NS_FAILED(Dispatch(info.transaction, info.runnable, info.finish,
info.finishRunnable))) {
transactionCount = queuedDispatch.Length();
for (PRUint32 index = 0; index < transactionCount; index++) {
if (NS_FAILED(Dispatch(queuedDispatch[index]))) {
NS_WARNING("Dispatch failed!");
}
}
}
bool
nsresult
TransactionThreadPool::TransactionCanRun(IDBTransaction* aTransaction,
TransactionQueue** aQueue)
bool* aCanRun,
TransactionQueue** aExistingQueue)
{
NS_ASSERTION(NS_IsMainThread(), "Wrong thread!");
NS_ASSERTION(aTransaction, "Null pointer!");
NS_ASSERTION(aQueue, "Null pointer!");
NS_ASSERTION(aCanRun, "Null pointer!");
NS_ASSERTION(aExistingQueue, "Null pointer!");
const PRUint32 databaseId = aTransaction->mDatabase->Id();
const nsTArray<nsString>& objectStoreNames = aTransaction->mObjectStoreNames;
@ -270,123 +316,62 @@ TransactionThreadPool::TransactionCanRun(IDBTransaction* aTransaction,
DatabaseTransactionInfo* dbTransactionInfo;
if (!mTransactionsInProgress.Get(databaseId, &dbTransactionInfo)) {
// First transaction for this database, fine to run.
*aQueue = nsnull;
return true;
*aCanRun = true;
*aExistingQueue = nsnull;
return NS_OK;
}
nsTArray<TransactionInfo>& transactionsInProgress =
dbTransactionInfo->transactions;
PRUint32 transactionCount = transactionsInProgress.Length();
NS_ASSERTION(transactionCount, "Should never be 0!");
if (mode == IDBTransaction::FULL_LOCK) {
switch (transactionCount) {
case 0: {
*aQueue = nsnull;
return true;
}
dbTransactionInfo->lockPending = true;
}
case 1: {
if (transactionsInProgress[0].transaction == aTransaction) {
*aQueue = transactionsInProgress[0].queue;
return true;
}
return false;
}
default: {
dbTransactionInfo->lockPending = true;
return false;
}
for (PRUint32 index = 0; index < transactionCount; index++) {
// See if this transaction is in out list of current transactions.
const TransactionInfo& info = transactionsInProgress[index];
if (info.transaction == aTransaction) {
*aCanRun = true;
*aExistingQueue = info.queue;
return NS_OK;
}
}
bool locked = dbTransactionInfo->locked || dbTransactionInfo->lockPending;
bool mayRun = true;
// Another transaction for this database is already running. See if we can
// run at the same time.
for (PRUint32 transactionIndex = 0;
transactionIndex < transactionCount;
transactionIndex++) {
TransactionInfo& transactionInfo = transactionsInProgress[transactionIndex];
if (transactionInfo.transaction == aTransaction) {
// Same transaction, already running.
*aQueue = transactionInfo.queue;
return true;
}
if (locked) {
// Full lock in place or requested, no need to check objectStores.
continue;
}
// Not our transaction. See if the objectStores overlap.
nsTArray<TransactionObjectStoreInfo>& objectStoreInfoArray =
transactionInfo.objectStoreInfo;
PRUint32 objectStoreCount = objectStoreInfoArray.Length();
for (PRUint32 objectStoreIndex = 0;
objectStoreIndex < objectStoreCount;
objectStoreIndex++) {
TransactionObjectStoreInfo& objectStoreInfo =
objectStoreInfoArray[objectStoreIndex];
if (objectStoreNames.Contains(objectStoreInfo.objectStoreName)) {
// Overlapping name, see if the modes are compatible.
switch (mode) {
case nsIIDBTransaction::READ_WRITE: {
// Someone else is reading or writing to this table, we can't
// run now. Mark that we're waiting for it though.
objectStoreInfo.writerWaiting = true;
// We need to mark all overlapping transactions, not just the first
// one we find. Set the retval to false here but don't return until
// we've gone through the rest of the open transactions.
mayRun = false;
} break;
case nsIIDBTransaction::READ_ONLY: {
if (objectStoreInfo.writing || objectStoreInfo.writerWaiting) {
// Someone else is writing to this table, we can't run now.
return false;
}
} break;
case nsIIDBTransaction::SNAPSHOT_READ: {
NS_NOTYETIMPLEMENTED("Not implemented!");
} break;
default: {
NS_NOTREACHED("Should never get here!");
}
}
}
// Continue on to the next TransactionObjectStoreInfo.
}
// Continue on to the next TransactionInfo.
if (dbTransactionInfo->locked || dbTransactionInfo->lockPending) {
*aCanRun = false;
*aExistingQueue = nsnull;
return NS_OK;
}
if (locked) {
// If we got here then this is a new transaction and we won't allow it to
// start.
return false;
bool writeOverlap;
nsresult rv =
CheckOverlapAndMergeObjectStores(dbTransactionInfo->storesWriting,
objectStoreNames,
mode == nsIIDBTransaction::READ_WRITE,
&writeOverlap);
NS_ENSURE_SUCCESS(rv, rv);
bool readOverlap;
rv = CheckOverlapAndMergeObjectStores(dbTransactionInfo->storesReading,
objectStoreNames,
mode == nsIIDBTransaction::READ_ONLY,
&readOverlap);
NS_ENSURE_SUCCESS(rv, rv);
if (writeOverlap ||
(readOverlap && mode == nsIIDBTransaction::READ_WRITE)) {
*aCanRun = false;
*aExistingQueue = nsnull;
return NS_OK;
}
if (!mayRun) {
// If we got here then there are conflicting transactions and we can't run
// yet.
return false;
}
// If we got here then there are no conflicting transactions and we should
// be fine to run.
*aQueue = nsnull;
return true;
*aCanRun = true;
*aExistingQueue = nsnull;
return NS_OK;
}
nsresult
@ -399,8 +384,12 @@ TransactionThreadPool::Dispatch(IDBTransaction* aTransaction,
NS_ASSERTION(aTransaction, "Null pointer!");
NS_ASSERTION(aRunnable, "Null pointer!");
bool canRun;
TransactionQueue* existingQueue;
if (!TransactionCanRun(aTransaction, &existingQueue)) {
nsresult rv = TransactionCanRun(aTransaction, &canRun, &existingQueue);
NS_ENSURE_SUCCESS(rv, rv);
if (!canRun) {
QueuedDispatchInfo* info = mDelayedDispatchQueue.AppendElement();
NS_ENSURE_TRUE(info, NS_ERROR_OUT_OF_MEMORY);
@ -443,6 +432,18 @@ TransactionThreadPool::Dispatch(IDBTransaction* aTransaction,
dbTransactionInfo->locked = true;
}
const nsTArray<nsString>& objectStoreNames = aTransaction->mObjectStoreNames;
nsTArray<nsString>& storesInUse =
aTransaction->mMode == nsIIDBTransaction::READ_WRITE ?
dbTransactionInfo->storesWriting :
dbTransactionInfo->storesReading;
if (!storesInUse.AppendElements(objectStoreNames)) {
NS_WARNING("Out of memory!");
return NS_ERROR_OUT_OF_MEMORY;
}
nsTArray<TransactionInfo>& transactionInfoArray =
dbTransactionInfo->transactions;
@ -454,23 +455,16 @@ TransactionThreadPool::Dispatch(IDBTransaction* aTransaction,
if (aFinish) {
transactionInfo->queue->Finish(aFinishRunnable);
}
transactionInfo->mode = aTransaction->mMode;
const nsTArray<nsString>& objectStoreNames = aTransaction->mObjectStoreNames;
PRUint32 count = objectStoreNames.Length();
for (PRUint32 index = 0; index < count; index++) {
TransactionObjectStoreInfo* info =
transactionInfo->objectStoreInfo.AppendElement();
NS_ENSURE_TRUE(info, NS_ERROR_OUT_OF_MEMORY);
info->objectStoreName = objectStoreNames[index];
info->writing = transactionInfo->mode == nsIIDBTransaction::READ_WRITE;
if (!transactionInfo->objectStoreNames.AppendElements(objectStoreNames)) {
NS_WARNING("Out of memory!");
return NS_ERROR_OUT_OF_MEMORY;
}
if (autoDBTransactionInfo) {
if (!mTransactionsInProgress.Put(databaseId, autoDBTransactionInfo)) {
NS_ERROR("Failed to put!");
return NS_ERROR_FAILURE;
NS_WARNING("Failed to put!");
return NS_ERROR_OUT_OF_MEMORY;
}
autoDBTransactionInfo.forget();
}

Просмотреть файл

@ -101,27 +101,11 @@ protected:
bool mShouldFinish;
};
struct TransactionObjectStoreInfo
{
TransactionObjectStoreInfo()
: writing(false), writerWaiting(false)
{ }
nsString objectStoreName;
bool writing;
bool writerWaiting;
};
struct TransactionInfo
{
TransactionInfo()
: mode(nsIIDBTransaction::READ_ONLY)
{ }
nsRefPtr<IDBTransaction> transaction;
nsRefPtr<TransactionQueue> queue;
nsTArray<TransactionObjectStoreInfo> objectStoreInfo;
PRUint16 mode;
nsTArray<nsString> objectStoreNames;
};
struct DatabaseTransactionInfo
@ -133,6 +117,20 @@ protected:
bool locked;
bool lockPending;
nsTArray<TransactionInfo> transactions;
nsTArray<nsString> storesReading;
nsTArray<nsString> storesWriting;
};
struct QueuedDispatchInfo
{
QueuedDispatchInfo()
: finish(false)
{ }
nsRefPtr<IDBTransaction> transaction;
nsCOMPtr<nsIRunnable> runnable;
nsCOMPtr<nsIRunnable> finishRunnable;
bool finish;
};
TransactionThreadPool();
@ -143,8 +141,15 @@ protected:
void FinishTransaction(IDBTransaction* aTransaction);
bool TransactionCanRun(IDBTransaction* aTransaction,
TransactionQueue** aQueue);
nsresult TransactionCanRun(IDBTransaction* aTransaction,
bool* aCanRun,
TransactionQueue** aExistingQueue);
nsresult Dispatch(const QueuedDispatchInfo& aInfo)
{
return Dispatch(aInfo.transaction, aInfo.runnable, aInfo.finish,
aInfo.finishRunnable);
}
nsCOMPtr<nsIThreadPool> mThreadPool;

Просмотреть файл

@ -49,7 +49,16 @@
.add({});
request.onerror = errorHandler;
request.onsuccess = function(event) {
ok(stepNumber == 1 || stepNumber == 2, "This callback came before 3");
is(stepNumber, 1, "This callback came first");
stepNumber++;
}
request = db.transaction(["foo"], READ_WRITE)
.objectStore("foo")
.add({});
request.onerror = errorHandler;
request.onsuccess = function(event) {
is(stepNumber, 2, "This callback came second");
stepNumber++;
}
@ -58,9 +67,17 @@
.add({});
request.onerror = errorHandler;
request.onsuccess = function(event) {
is(stepNumber, 3, "This callback came in at 3");
is(stepNumber, 3, "This callback came third");
stepNumber++;
}
request = db.transaction(["foo", "bar"], READ_WRITE)
.objectStore("bar")
.add({});
request.onerror = errorHandler;
request.onsuccess = function(event) {
is(stepNumber, 4, "This callback came fourth");
stepNumber++;
event.transaction.oncomplete = grabEventAndContinueHandler;
}
request = db.transaction(["bar"], READ_WRITE)
@ -68,14 +85,15 @@
.add({});
request.onerror = errorHandler;
request.onsuccess = function(event) {
ok(stepNumber == 1 || stepNumber == 2, "This callback came before 3");
is(stepNumber, 5, "This callback came fifth");
stepNumber++;
event.transaction.oncomplete = grabEventAndContinueHandler;
}
stepNumber++;
yield;
is(stepNumber, 4, "All callbacks received");
is(stepNumber, 6, "All callbacks received");
}
finishTest();

Просмотреть файл

@ -44,8 +44,6 @@
SimpleTest.executeSoon(function() { testGenerator.next(); });
yield;
let objectStore = db.objectStore("foo");
let continueReading = true;
let readerCount = 0;
let callbackCount = 0;
@ -55,7 +53,7 @@
// loop.
for (let i = 0; i < 20; i++) {
readerCount++;
request = objectStore.get(key);
request = db.objectStore("foo").get(key);
request.onerror = errorHandler;
request.onsuccess = function(event) {
callbackCount++;
@ -75,7 +73,7 @@
request.onsuccess = function(event) {
continueReading = false;
finalCallbackCount = callbackCount;
ok(event.result == callbackCount,
is(event.result, callbackCount,
"write callback came before later reads");
}
}
@ -96,7 +94,9 @@
finishTest();
yield;
}
SimpleTest.requestLongerTimeout(5); // see bug 580875
</script>
<script type="text/javascript;version=1.7" src="helpers.js"></script>