Commit 09989a60 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

refs #5351 add comments and a timed* api


git-svn-id: file:///svn/toku/tokudb@48727 c7de825b-a66e-492c-adef-691d508d4ae1
parent 45e1dfd1
...@@ -107,6 +107,27 @@ namespace toku { ...@@ -107,6 +107,27 @@ namespace toku {
return pushed; return pushed;
} }
template<typename T>
bool circular_buffer<T>::timedpush(const T &elt, toku_timespec_t *abstime) {
bool pushed = false;
invariant_notnull(abstime);
lock();
if (is_empty()) {
++m_push_waiters;
int r = toku_cond_timedwait(&m_push_cond, &m_lock, abstime);
if (r != 0) {
invariant(r == ETIMEDOUT);
}
--m_push_waiters;
}
if (!is_full() && m_push_waiters == 0) {
push_and_maybe_signal_unlocked(elt);
pushed = true;
}
unlock();
return pushed;
}
template<typename T> template<typename T>
T circular_buffer<T>::pop_and_maybe_signal_unlocked(void) { T circular_buffer<T>::pop_and_maybe_signal_unlocked(void) {
toku_mutex_assert_locked(&m_lock); toku_mutex_assert_locked(&m_lock);
...@@ -145,4 +166,26 @@ namespace toku { ...@@ -145,4 +166,26 @@ namespace toku {
return popped; return popped;
} }
template<typename T>
bool circular_buffer<T>::timedpop(T * const eltp, toku_timespec_t *abstime) {
bool popped = false;
invariant_notnull(eltp);
invariant_notnull(abstime);
lock();
if (is_empty()) {
++m_pop_waiters;
int r = toku_cond_timedwait(&m_pop_cond, &m_lock, abstime);
if (r != 0) {
invariant(r == ETIMEDOUT);
}
--m_pop_waiters;
}
if (!is_empty()) {
*eltp = pop_and_maybe_signal_unlocked();
popped = true;
}
unlock();
return popped;
}
} }
...@@ -13,24 +13,88 @@ ...@@ -13,24 +13,88 @@
namespace toku { namespace toku {
// The circular buffer manages an array of elements as a thread-safe FIFO queue.
// It does not allocate its own space, or grow the space it manages.
// Access to the circular buffer is managed by a mutex.
// The blocking operations are managed by condition variables. They are as fairly scheduled as the threading library supports.
//
// Sample usage:
// int array[2]
// circular_buffer<int> intbuf;
// intbuf.init(array, 2);
//
// // thread A
// intbuf.push(1);
// intbuf.push(2);
// intbuf.push(3); // <- blocks until thread B runs
//
// // thread B
// int a = intbuf.pop(); // <- 1
// int b = intbuf.pop(); // <- 2
// int c = intbuf.pop(); // <- 3
// int d = intbuf.pop(); // <- blocks until more elements are available
template<typename T> template<typename T>
class circular_buffer { class circular_buffer {
public: public:
__attribute__((nonnull))
void init(T * const array, size_t cap);
// Effect:
// Initialize the circular buffer with an array of elements to manage.
// Requires:
// array must remain valid until deinit() is called.
void init(T * const array, size_t cap) __attribute__((nonnull));
// Effect:
// Deinitialize the circular buffer. Destroys mutex and condition variables, checks for errors.
// Requires:
// Must be empty, use trypop() to drain everything before calling deinit().
// Must be free of waiters, no outstanding calls to push() or pop(), trypush() sentinels to flush waiters if necessary.
void deinit(void); void deinit(void);
void push(const T &); // Effect:
// Append elt to the end of the queue.
__attribute__((warn_unused_result)) // Notes:
bool trypush(const T &); // Blocks until there is room in the array.
void push(const T &elt);
__attribute__((warn_unused_result))
T pop(void); // Effect:
// Append elt to the end of the queue if there's room and nobody is waiting to push.
__attribute__((nonnull, warn_unused_result)) // Notes:
bool trypop(T * const); // Doesn't block.
// Returns:
// true iff elt was appended
bool trypush(const T &elt) __attribute__((warn_unused_result));
// Effect:
// Append elt to the end of the queue if there's room before abstime.
// Notes:
// Blocks until at most abstime waiting for room in the queue. See pthread_cond_timedwait(3) for an example of how to use abstime.
// Returns:
// true iff elt was appended
bool timedpush(const T &elt, toku_timespec_t *abstime) __attribute__((nonnull, warn_unused_result));
// Effect:
// Remove the first item from the queue and return it.
// Notes:
// Blocks until there is something to return.
T pop(void) __attribute__((warn_unused_result));
// Effect:
// Remove the first item from the queue and return it, if one exists.
// Notes:
// Doesn't block.
// Returns the element in *eltp.
// Returns:
// true iff *eltp was set
bool trypop(T * const eltp) __attribute__((nonnull, warn_unused_result));
// Effect:
// Remove the first item from the queue and return it, if one exists before abstime
// Notes:
// Blocks until at most abstime waiting for room in the queue. See pthread_cond_timedwait(3) for an example of how to use abstime.
// Returns the element in *eltp.
// Returns:
// true iff *eltp was set
bool timedpop(T * const eltp, toku_timespec_t *abstime) __attribute__((nonnull, warn_unused_result));
private: private:
void lock(void); void lock(void);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment