2010年12月11日 星期六

Condition variable

A mutex is used to cause other threads to wait while the thread holding the mutex executes code in a critical section.

In contrast, a condition variable is typically used by a thread to make itself wait until an expression involving shared data attains a particular state.

在 POSIX-supported platform 上, 問題非常簡單, 有 pthread interface 可以用.

// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Condition variable built directly on the pthread API. It pairs a
// pthread_cond_t with the pthread mutex that backs the Lock supplied at
// construction time.
class ConditionVariable {
public:
// Binds this condition variable to |user_lock|. A pointer to the lock's
// underlying pthread mutex is stored for use by the wait calls.
explicit ConditionVariable(Lock* user_lock);
// Destroys the underlying pthread condition object.
~ConditionVariable();

// Blocks on the condition by calling pthread_cond_wait() with the user's
// mutex.
void Wait();
// Like Wait(), but bounded: |max_time| is a relative duration that the
// implementation converts to an absolute deadline for
// pthread_cond_timedwait().
void TimedWait(const base::TimeDelta& max_time);

// Forwards to pthread_cond_broadcast().
void Broadcast();
// Forwards to pthread_cond_signal().
void Signal();

private:
// The native condition object; initialized in the constructor and destroyed
// in the destructor.
pthread_cond_t condition_;
// Native mutex obtained from the user's Lock. This class only stores the
// pointer; it never initializes or destroys the mutex.
pthread_mutex_t* user_mutex_;
};

// Remembers the pthread mutex that backs |user_lock| and initializes the
// native condition object with default attributes (a NULL attribute pointer).
// The return value of pthread_cond_init() is not inspected.
ConditionVariable::ConditionVariable(Lock* user_lock)
    : user_mutex_(user_lock->lock_.os_lock()) {
  const pthread_condattr_t* default_attributes = NULL;
  pthread_cond_init(&condition_, default_attributes);
}

// Releases the native condition object. The user's mutex is left untouched;
// only condition_ is destroyed here.
ConditionVariable::~ConditionVariable() {
pthread_cond_destroy(&condition_);
}

// Waits on the condition with no timeout by handing condition_ and the user's
// mutex to pthread_cond_wait(). The call's return value is ignored.
void ConditionVariable::Wait() {
pthread_cond_wait(&condition_, user_mutex_);
}

void ConditionVariable::TimedWait(const TimeDelta& max_time) {
int64 usecs = max_time.InMicroseconds();

// The timeout argument to pthread_cond_timedwait is in absolute time.
struct timeval now;
gettimeofday(&now, NULL);

struct timespec abstime;
abstime.tv_sec = now.tv_sec + (usecs / Time::kMicrosecondsPerSecond);
abstime.tv_nsec = (now.tv_usec + (usecs % Time::kMicrosecondsPerSecond)) *
Time::kNanosecondsPerMicrosecond;
abstime.tv_sec += abstime.tv_nsec / Time::kNanosecondsPerSecond;
abstime.tv_nsec %= Time::kNanosecondsPerSecond;

pthread_cond_timedwait(&condition_, user_mutex_, &abstime);
}

// Forwards to pthread_cond_broadcast() on condition_; the return value is
// ignored.
void ConditionVariable::Broadcast() {
pthread_cond_broadcast(&condition_);
}

// Forwards to pthread_cond_signal() on condition_; the return value is
// ignored.
void ConditionVariable::Signal() {
pthread_cond_signal(&condition_);
}

在 Windows platform 上, 雖然有新的 APIs: InitializeConditionVariable(), SleepConditionVariableCS(), SleepConditionVariableSRW(), WakeAllConditionVariable(), WakeConditionVariable(). 但舊的平台仍不支援,可憐的 programmer 得自己造輪子,或是用 3rd-party lib 來玩 cv.

先看 Windows platform 上 C++ class 怎麼表示他:

// Windows flavor of ConditionVariable, emulated with Win32 event objects.
// The "..." below is an elision in this excerpt; the public interface is not
// shown here.
class ConditionVariable {
public:
...
private:
// A node in a circular doubly-linked list that can also carry a Win32 event
// handle. The same type plays two roles: an Event whose handle_ is 0 acts as
// the holder (head) of a list, while an Event on which InitListElement() has
// been called owns an event handle and is an element that threads block on.
class Event {
public:
// Creates a self-linked singleton with no handle (i.e. an empty list holder).
Event();
// A list holder deletes every element still linked to it; an element closes
// its event handle.
~Event();

// Turns this instance into a list element by creating its event handle.
void InitListElement();

// List-holder operations.
bool IsEmpty() const;
void PushBack(Event* other);
Event* PopFront();
Event* PopBack();

// List-element operations: handle() returns the event handle, and Extract()
// unlinks the element from whatever list it is in (a no-op on a singleton).
HANDLE handle() const;
Event* Extract();

// True when this node links only to itself.
bool IsSingleton() const;

private:
// 0 for a list holder; otherwise the event handle owned by this element.
HANDLE handle_;
// Circular links; both point at |this| when the node is a singleton.
Event* next_;
Event* prev_;
};

// Note that RUNNING is an unlikely number to have in RAM by accident.
// This helps with defensive destructor coding in the face of user error.
enum RunState { SHUTDOWN = 0, RUNNING = 64213 };

// Internal implementation methods supporting Wait().
// GetEventForWaiting() hands out an Event (recycled or newly allocated) that
// is already queued on waiting_list_; RecycleEvent() returns it to
// recycling_list_. Both expect internal_lock_ to be held by the caller.
Event* GetEventForWaiting();
void RecycleEvent(Event* used_event);

// Checked at the start of a wait; a wait returns immediately unless this is
// RUNNING.
RunState run_state_;

// Private critical section for access to member data.
Lock internal_lock_;

// Lock that is acquired before calling Wait().
Lock& user_lock_;

// Events that threads are blocked on.
Event waiting_list_;

// Free list for old events.
Event recycling_list_;
// Number of events currently on recycling_list_.
int recycling_list_size_;

// The number of allocated, but not yet deleted events.
int allocation_counter_;
};

用 private class Event 來模擬 cv. Implementation 還挺複雜的,可以先看這篇文章: Strategies for Implementing POSIX Condition Variables on Win32. 難點在於: However, extreme care must be taken with Win32 events to ensure that there are no race conditions introduced when switching from one mechanism to another. Unfortunately, there's no way to release just one waiting thread with a manual-reset event. Likewise, there's no way to release all waiting threads with an auto-reset event.

Google 的 implementation 簡單說就是用 CreateEvent() 建立 auto-reset event,再搭配 SetEvent() 與 WaitForSingleObject(),現在來一一破解:

void ConditionVariable::Wait() {
TimedWait(TimeDelta::FromMilliseconds(INFINITE));
}

void ConditionVariable::TimedWait(const TimeDelta& max_time) {
Event* waiting_event;
HANDLE handle;
{
AutoLock auto_lock(internal_lock_);
if (RUNNING != run_state_) return; // Destruction in progress.
waiting_event = GetEventForWaiting();
handle = waiting_event->handle();
} // Release internal_lock.

{
AutoUnlock unlock(user_lock_); // Release caller's lock
WaitForSingleObject(handle, static_cast(max_time.InMilliseconds()));
// Minimize spurious signal creation window by recycling asap.
AutoLock auto_lock(internal_lock_);
RecycleEvent(waiting_event);
// Release internal_lock_
} // Reacquire callers lock to depth at entry.
}

void ConditionVariable::Broadcast() {
std::stack handles; // See FAQ-question-10.
{
AutoLock auto_lock(internal_lock_);
if (waiting_list_.IsEmpty())
return;
while (!waiting_list_.IsEmpty())
// This is not a leak from waiting_list_. See FAQ-question 12.
handles.push(waiting_list_.PopBack()->handle());
} // Release internal_lock_.
while (!handles.empty()) {
SetEvent(handles.top());
handles.pop();
}
}

void ConditionVariable::Signal() {
HANDLE handle;
{
AutoLock auto_lock(internal_lock_);
if (waiting_list_.IsEmpty())
return; // No one to signal.
// Only performance option should be used.
// This is not a leak from waiting_list. See FAQ-question 12.
handle = waiting_list_.PopBack()->handle(); // LIFO.
} // Release internal_lock_.
SetEvent(handle);
}

// Returns an Event for the calling thread to block on, already appended to
// waiting_list_.
//
// An event is taken from the front of recycling_list_ when one is available.
// Otherwise a new Event is allocated, turned into a list element, and counted
// in allocation_counter_. The caller must hold internal_lock_.
ConditionVariable::Event* ConditionVariable::GetEventForWaiting() {
  // We hold internal_lock, courtesy of Wait().
  Event* event_to_wait_on;
  if (recycling_list_size_ != 0) {
    // Reuse a previously allocated event.
    event_to_wait_on = recycling_list_.PopFront();
    --recycling_list_size_;
  } else {
    // Free list is empty: allocate a new event and give it a handle.
    event_to_wait_on = new Event();
    event_to_wait_on->InitListElement();
    ++allocation_counter_;
  }
  waiting_list_.PushBack(event_to_wait_on);
  return event_to_wait_on;
}

// Puts |used_event| on recycling_list_ for reuse. The caller must hold
// internal_lock_.
//
// An event whose wait timed out is still linked into waiting_list_ and has to
// be unlinked first. One picked by Signal() or Broadcast() was already popped,
// in which case Extract() is a harmless no-op that simply returns the event.
void ConditionVariable::RecycleEvent(Event* used_event) {
  // We hold internal_lock, courtesy of Wait().
  Event* detached_event = used_event->Extract();  // Possibly redundant
  recycling_list_.PushBack(detached_event);
  ++recycling_list_size_;
}

然後 Event class implementation:

// Event provides a doubly-linked-list of events for use exclusively by the
// ConditionVariable class.

// This custom container was crafted because no simple combination of STL
// classes appeared to support the functionality required. The specific
// unusual requirement for a linked-list-class is support for the Extract()
// method, which can remove an element from a list, potentially for insertion
// into a second list. Most critically, the Extract() method is idempotent,
// turning the indicated element into an extracted singleton whether it was
// contained in a list or not. This functionality allows one (or more)
// threads to do the extraction. The iterator that identifies this extractable
// element (in this case, a pointer to the list element) can be used after
// arbitrary manipulation of the (possibly) enclosing list container. In
// general, STL containers do not provide iterators that can be used across
// modifications (insertions/extractions) of the enclosing containers, and
// certainly don't provide iterators that can be used if the identified
// element is *deleted* (removed) from the container.

// It is possible to use multiple redundant containers, such as an STL list,
// and an STL map, to achieve similar container semantics. This container has
// only O(1) methods, while the corresponding (multiple) STL container approach
// would have more complex O(log(N)) methods (yeah... N isn't that large).
// Multiple containers also makes correctness more difficult to assert, as
// data is redundantly stored and maintained, which is generally evil.

// Builds a node with no event handle whose links both point back at itself,
// i.e. a self-referencing circular list of one.
ConditionVariable::Event::Event()
    : handle_(0), next_(this), prev_(this) {}

// Tears down the node according to its role. A node with a handle is a list
// element and only needs its handle closed. A node without one is a list
// holder, so every element still linked to it is popped and deleted.
ConditionVariable::Event::~Event() {
  if (0 != handle_) {
    // List element: release the event handle.
    CloseHandle(handle_);
    return;
  }
  // This is the list holder: drain and destroy whatever remains.
  while (!IsEmpty())
    delete PopFront();
}

// Change a container instance permanently into an element of a list.
// This stores a newly created Win32 event in handle_. In the CreateEvent()
// call, the two `false` arguments are bManualReset and bInitialState, so the
// event is auto-reset and starts out non-signaled; the NULL arguments request
// default security attributes and no name. A non-zero handle_ is what the
// destructor uses to tell a list element from a list holder.
// NOTE(review): the result of CreateEvent() is not checked for failure.
void ConditionVariable::Event::InitListElement() {
handle_ = CreateEvent(NULL, false, false, NULL);
}

// Methods for use on lists.
bool ConditionVariable::Event::IsEmpty() const {
return IsSingleton();
}

// Appends |other| at the tail of the list held by this node, i.e. between the
// current last element and the holder itself. |other|'s previous links are
// overwritten without being consulted.
void ConditionVariable::Event::PushBack(Event* other) {
  Event* const old_tail = prev_;
  // Point the new element at its neighbors.
  other->prev_ = old_tail;
  other->next_ = this;
  // Splice it in: the old tail and the holder now both refer to |other|.
  old_tail->next_ = other;
  prev_ = other;
}

// Unlinks and returns the node that follows this holder (the front of the
// list). On an empty list next_ is the holder itself, so the holder is
// returned; callers in this file check IsEmpty() or the recycled-event count
// before popping.
ConditionVariable::Event* ConditionVariable::Event::PopFront() {
return next_->Extract();
}

// Unlinks and returns the node that precedes this holder (the back of the
// list). On an empty list prev_ is the holder itself, so the holder is
// returned; callers in this file check IsEmpty() before popping.
ConditionVariable::Event* ConditionVariable::Event::PopBack() {
return prev_->Extract();
}

// Methods for use on list elements.
// Accessor method.
// Returns the stored event handle. This is 0 for a node that has not been
// turned into a list element via InitListElement().
HANDLE ConditionVariable::Event::handle() const {
return handle_;
}

// Pull an element from a list (if it's in one).
// Calling this on a node that is already a singleton does nothing. In every
// case the node ends up linked only to itself, and |this| is returned so the
// call can be chained.
ConditionVariable::Event* ConditionVariable::Event::Extract() {
  if (IsSingleton())
    return this;  // Not in any list; nothing to unlink.

  // Stitch neighbors together.
  Event* const successor = next_;
  Event* const predecessor = prev_;
  successor->prev_ = predecessor;
  predecessor->next_ = successor;

  // Make extractee into a singleton.
  next_ = this;
  prev_ = this;
  return this;
}

// Method for use on a list element or on a list.
// Returns true when this node's next_ link points back at itself, meaning an
// element that is in no list or a holder whose list is empty.
bool ConditionVariable::Event::IsSingleton() const {
return next_ == this;
}


謝謝收看。

沒有留言:

張貼留言