Monitors and Wait Conditions in Qt (Part 2)
Материал из Wiki.crossplatform.ru
Qt Quarterly | Выпуск 22 | Документация |
by Jasmin Blanchette
In the previous issue of Qt Quarterly, we saw how to synchronizethreads using monitors, a high-level synchronization mechanism. Wealso reviewed the code of two examples, BankAccount andBoundedBuffer, that showed how to implement monitors using QMutex andQWaitCondition. In this issue, we will review a few fundamentaltechniques for developing more powerful monitors than those presentedpreviously.
Содержание |
More precisely, we will study the following four techniques: coveringconditions, "passing the condition", priority wait, and invariants.We will also compare monitors with semaphores and show how theyrelate to each other. Most of the techniques and examples are drawnfrom Gregory Andrews's textbook Multithreaded, Parallel, andDistributed Programming (2000).
[править] Covering Conditions
In last quarter's article, we reviewed the followingBoundedBuffer monitor, which relies on two wait conditions,bufferIsNotFull and bufferIsNotEmpty:
monitor BoundedBuffer { public: BoundedBuffer() { head = 0; tail = 0; } void put(char ch) { while (tail == head + N) wait(bufferIsNotFull); buffer[tail++ % N] = ch; signal(bufferIsNotEmpty); } char get() { while (head == tail) wait(bufferIsNotEmpty); char ch = buffer[head++ % N]; signal(bufferIsNotFull); return ch; } private: cond bufferIsNotFull; cond bufferIsNotEmpty; int head, tail; char buffer[N]; };
Recall that in our pseudo-code universe, the member functions of amonitor are implicitly executed with mutual exclusion. In Java, thiswould be implemented using the synchronized keyword. InQt/C++, we must explicitly lock and unlock a QMutex:
class BoundedBuffer { public: BoundedBuffer() { head = 0; tail = 0; } void put(char ch) { QMutexLocker locker(&mutex); while (tail == head + N) bufferIsNotFull.wait(&mutex); buffer[tail++ % N] = ch; bufferIsNotEmpty.wakeOne(); } char get() { QMutexLocker locker(&mutex); while (head == tail) bufferIsNotEmpty.wait(&mutex); char ch = buffer[head++ % N]; bufferIsNotFull.wakeOne(); return ch; } private: QMutex mutex; QWaitCondition bufferIsNotFull; QWaitCondition bufferIsNotEmpty; int head, tail; char buffer[N]; };
A covering condition is a wait condition that is used for severalpurposes. For the BoundedBuffer monitor, we could for examplemerge the two wait conditions into a single bufferChangedcondition, leading to the following pseudo-code:
monitor BoundedBuffer { public: BoundedBuffer() { head = 0; tail = 0; } void put(char ch) { while (tail == head + N) wait(bufferChanged); buffer[tail++ % N] = ch; signalAll(bufferChanged); } char get() { while (head == tail) wait(bufferChanged); char ch = buffer[head++ % N]; signalAll(bufferChanged); return ch; } private: cond bufferChanged; int head, tail; char buffer[N]; };
In the new version of the monitor, we signal all the waiting threadsevery time we read from or write to the buffer by callingsignalAll(bufferChanged). The waiting threads then recheck thepredicate on which they are waiting (tail == head +N or head == tail). One of them will proceed and theother ones will wait.
Both versions of the BoundedBuffer monitor are semanticallyequivalent, but the first version is faster in practice, because acall to get() or put() will wake at most one thread, insteadof waking up all suspended threads.
Why use covering conditions then? In the BoundedBuffer example,we could do without because when n threads try to access themonitor simultaneously, they wait for one of two conditions tooccur ("buffer is not empty" and "buffer is not full"). For someapplications, using separate wait conditions would not be practical,because the n threads might be waiting for n differentconditions. The BankAccount monitor below illustrates this:
monitor BankAccount { public: BankAccount() { balance = 0; } void withdraw(uint amount) { while (amount > balance) wait(moreMoney); balance -= amount; } void deposit(uint amount) { balance += amount; signalAll(moreMoney); } private: cond moreMoney; int balance; };
Covering conditions are very common in Java because, prior toversion 1.5, Java supported only one wait condition per monitor.
[править] Passing the Condition
The next technique we will review is called "passing thecondition." Consider the following MemoryAllocator monitor:
monitor MemoryAllocator { public: MemoryAllocator() { for (int i = 0; i < N; ++i) isFree[i] = true; numFree = N; } char *allocBlock() { if (numFree == 0) wait(blockFreed); for (int i = 0; i < N; ++i) { if (isFree[i]) { isFree[i] = false; --numFree; return blocks[i]; } } assert(false); return 0; } void freeBlock(char *block) { int i = (block - blocks[0]) / BlockSize; assert(i >= 0 && i < N && !isFree[i]); isFree[i] = true; ++numFree; signal(blockFreed); } private: cond blockFreed; char blocks[N][BlockSize]; bool isFree[N]; int numFree; };
The MemoryAllocator monitor manages N memory blocks of sizeBlockSize. The allocBlock() function returns a pointer to apreviously unused block, and freeBlock() releases a usedblock. If allocBlock() is called when no blocks are available(numFree == 0), we wait until a block is freed.The next time freeBlock() is called, the blockFreed conditionwill be signaled, and allocBlock() will resume.
What essentially happens is that we free a block only to allocateit immediately afterwards. We can optimize the monitor a bit by handingover the newly released block directly to the thread that was tryingto allocate it. Here's the pseudo-code:
monitor MemoryAllocator { public: MemoryAllocator() { for (int i = 0; i < N; ++i) isFree[i] = true; numFree = N; numDelayed = 0; lastFreed = 0; } char *allocBlock() { if (numFree == 0) { ++numDelayed; wait(blockFreed); --numDelayed; return lastFreed; } else { for (int i = 0; i < N; ++i) { if (isFree[i]) { isFree[i] = false; --numFree; return blocks[i]; } } assert(false); return 0; } } void freeBlock(char *block) { int i = (block - blocks[0]) / BlockSize; assert(i >= 0 && i < N && !isFree[i]); if (numDelayed > 0) { lastFreed = block; signal(blockFreed); } else { isFree[i] = true; ++numFree; } } private: cond blockFreed; char blocks[N][BlockSize]; bool isFree[N]; int numFree; int numDelayed; char *lastFreed; };
The numDelayed variable is used to count how many threadsare waiting for a free block in allocBlock(). If numDelayed> 0 when freeBlock() is called, we simply signalblockFreed and store the freed block in lastFreed, where itcan be picked up by allocBlock().
If allocBlock() is called when numFree == 0,we increment numDelayed and wait until we are signaled. When thathappens, we can simply return lastFreed, instead of iterating throughall the blocks.
[править] Fairness and Priority Wait
One issue that constantly arises in multithreaded and distributedprograms is that of fairness. Fairness is necessary to avoidstarvation, which occurs when one thread is delayed indefinitelywhile other threads are accessing a resource.
Starvation can occur with the MemoryAllocator example abovebecause signal() (or the Qt functionQWaitCondition::wakeOne()) can wake up any thread that wakeson the condition---not necessarily the thread that has waited thelongest.
With the MemoryAllocator example, starvation is unlikely becausethe threads are symmetric and real-world implementations of threadstend to be sufficiently random. A more realistic example wherestarvation rears its ugly head is that of a read-write lock, such as QReadWriteLock. Such a lock allows many readers to access thesame data simultaneously, but only one writer. Furthermore, readersand writers may not be active at the same time.
The pseudo-code below shows one way to implement a ReadWriteLockmonitor:
monitor ReadWriteLock { public: ReadWriteLock() { numReaders = 0; numWriters = 0; } void lockForRead() { while (numWriters > 0) wait(unlocked); ++numReaders; } void lockForWrite() { while (numReaders + numWriters > 0) wait(unlocked); ++numWriters; } void unlockAfterRead() { assert(numReaders > 0); --numReaders; signalAll(unlocked); } void unlockAfterWrite() { assert(numWriters > 0); --numWriters; signalAll(unlocked); } private: cond unlocked; int numReaders; int numWriters; };
This monitor can be used to synchronize threads that access a sharedresource. For example, a reader thread would call lockForRead()before reading data from the resource and unlockAfterRead() when itis done.
The problem with the above code is that writers are likely to starveif there are many reader threads. Writers can only seize control ofthe lock when numReaders is 0. It's a bit like being a tourist inParis in July and expecting to find the Chвteau de Versailles totallyempty (and open!).
One standard way to solve this problem (thread starvation, notVersailles) is to let the readers and writers alternate. When awriter is waiting, we don't let any new reader start:
void lockForRead() { if (numWriters + delayedWriters > 0) { ++delayedReaders; do { readersMayProceed.wait(&mutex); } while (numWriters > 0); --delayedReaders; } ++numReaders; }
And when other threads are active, we don't let a new writer start:
void lockForWrite() { while (numReaders + numWriters > 0) { ++delayedWriters; checkInvariant(); oneWriterMayProceed.wait(&mutex); checkInvariant(); --delayedWriters; } ++numWriters; }
The unlockAfter...() functions must also be adapted:
void unlockAfterRead() { assert(numReaders > 0); --numReaders; if (numReaders == 0 && delayedWriters > 0) signal(oneWriterMayProceed); } void unlockAfterWrite() { assert(numWriters > 0); --numWriters; if (delayedReaders > 0) { signalAll(readersMayProceed); } else if (delayedWriters > 0) { signal(oneWriterMayProceed); } }
The new version of the ReadWriteLock monitor ensures that neitherthe readers nor the writers can monopolize the shared resource.Another way to solve this issue would be to use a priority queue andto enforce a strict "first come, first serve" policy. The queue canbe stored explicitly as a QQueue of thread IDs, or implicitly byhaving the threads take a number, just as you would do when you go tothe bakery (at least in Norway). Here's an implementation of theBankAccount monitor using the "take a number" approach:
monitor BankAccount { public: BankAccount() { balance = 0; firstNo = 0; nextNo = 0; } void withdraw(uint amount) { int myNo = nextNo++; while (firstNo != myNo || amount > balance) wait(moreMoney); balance -= amount; ++firstNo; } void deposit(uint amount) { balance += amount; signalAll(moreMoney); } private: cond moreMoney; int balance; int firstNo; int nextNo; };
With the "first come, first served" policy, if there is $500 in theaccount, a thread that requests only $20 might have to wait for onethat requested $1000. This might seem unfortunate, but it is oftenthe price to pay to avoid starvation. Using a different datastructure, we could implement a different scheduling policy, such as"shortest job next" or "round robin".
[править] Assumptions and Invariants
Multithreaded programming is difficult because there is always thedanger that several threads might access the same data structuresimultaneously and leave it in an undesirable state. Monitors, withtheir implicit mutual exclusion, make this less likely.
It may still pay off to add explicit checks in the code to ensurethat the data structure is in a consistent state whenever a threadenters or leaves a monitor (including before and after calls towait()). The concept of a consistent state is typicallyformulated as a Boolean expression, called an invariant, thatshould be true whenever a thread enters or leaves the monitor.
Here's the BankAccount code, with calls to thecheckInvariant() private function in the appropriate places:
BankAccount() { balance = 0; §checkInvariant(); } void withdraw(uint amount) { §checkInvariant(); while (amount > balance) { §checkInvariant(); wait(moreMoney); §checkInvariant(); } balance -= amount; §checkInvariant(); } void deposit(uint amount) { §checkInvariant(); balance += amount; signalAll(moreMoney); §checkInvariant(); } void checkInvariant() const { assert(balance >= 0); }
For BankAccount, the invariant is trivial, and it is easy tocheck that it holds. [1] For ReadWriteLock, the invariant is slightly more complex:
void checkInvariant() const { assert((numReaders == 0 || numWriters == 0) && numWriters <= 1); }
For MemoryAllocator, we can check that numFree is in syncwith the isFree array by iterating over isFree:
void checkInvariant() const { int actualNumFree = 0; for (int i = 0; i < N; ++i) { if (isFree[i]) ++actualNumFree; } assert(numFree == actualNumFree); }
The last invariant is fairly expensive to compute, so it's probablybetter to leave it out of production code, or to reorganize the datastructure so that numFree is no longer necessary.
In a well-written monitor, the invariant should hold whenever athread enters or leaves the monitor, no matter which monitorfunctions are called, and when. To avoid entering bad states as a resultof calling the methods in the wrong order, it is a good idea todetect these cases and abort. We already used this approach in theReadWriteLock monitor's unlockAfterRead() andunlockAfterWrite() functions to guard against spuriouscalls. Aborting may sound like a drastic measure, but it is alwayspreferable to producing incorrect results or corrupting a database.
[править] Monitors versus Semaphores
A semaphore is a low-level synchronization tool that provides twooperations: acquire() and release(). A special case of asemaphore is that of a binary semaphore, often called a mutex.The mutex operation lock() corresponds to an acquire(), andunlock() corresponds to release(). General (non-binary)semaphores can be used to guard n identical resources.
For all the problems we have studied so far, we could have come upwith a solution that uses only QSemaphore or QMutex. However,by using the higher-level monitor construct, we could focus on thebusiness logic and spend little time thinking about threadsynchronization.
To illustrate the fundamental differences between monitors andsemaphores, we will study the code of a Semaphore monitor:
monitor Semaphore { public: Semaphore() { value = 0; } void release() { ++value; signal(released); } void acquire() { if (value == 0) wait(released); --value; } private: cond released; int value; };
As the example illustrates, there is a strong connection between asemaphore release() and a monitor signal() on the one hand,and between a semaphore acquire() and a monitor wait() onthe other.There are, however, two important differences:
- Calls to release() are "remembered", even when nothread is waiting to acquire the semaphore. In contrast, signal()is effectively a no-op if no thread is waiting on the condition atthe time when it is signaled.
- Calls to acquire() only block if the semaphore's value is 0.In contrast, wait() always blocks.
Incidentally, the above monitor can be rewritten to use the "passthe condition" idiom:
monitor Semaphore { public: Semaphore() { value = 0; delayed = 0; } void release() { if (delayed == 0) ++value; else signal(released); } void acquire() { if (value == 0) { ++delayed; wait(released); --delayed; } else { --value; } } private: cond released; int value; int delayed; };
On platforms that guarantee a FIFO (first in, first out) orderingwhen signaling wait conditions, the above monitor implements a FIFOsemaphore. Alternatively, we could use an implicit or explicitpriority queue to order the pending acquire() calls.
[править] Conclusion
The monitor mechanism was proposed in the early 1970s as a structuredsolution to the problem of synchronizing threads in an operatingsystem. It went out of fashion during the 1980s and was granted asecond life by Java's designers.
Many Qt programs use QMutex to avoid race conditions whenaccessing shared data. By hiding the data inside a class and throwinga QWaitCondition into the mix, we can obtain finer control over theorder in which threads are executed, as we saw with theReadWriteLock example and the "take a number" version of theBankAccount monitor. And once we have all the code manipulatingthe shared data structure in a centralized place, it is easy to addrun-time checks to ensure that the data structure never enters anundesirable state.
Despite this, it is often a matter of taste whether one should use QSemaphore or QWaitCondition to solve a particular problem.For example, the QReadWriteLock class is implemented in terms of QWaitCondition, whereas the ReadWriteMutex class described inImplementing a Read/Write Mutex by Volker Hilsheimer(Qt Quarterly issue 11) uses a QSemaphore.[1] Unless someone kindly deposits$2,147,483,648 in our bank account, resulting in an integer wraparound.