Commit a3fd9e6b authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-29367 Refactor tpool::cache

Removed use std::vector's ba push_back(), pop_back() to  make it more
obvious that memory in the vectors won't be reallocated.

Also, "borrowed" elements can be debugged a little better now,
they are put into the start of the m_cache vector.
parent 720fa05e
...@@ -121,7 +121,7 @@ class io_slots ...@@ -121,7 +121,7 @@ class io_slots
size_t pending_io_count() size_t pending_io_count()
{ {
return (size_t)m_max_aio - m_cache.size(); return m_cache.pos();
} }
tpool::task_group* get_task_group() tpool::task_group* get_task_group()
......
...@@ -40,79 +40,121 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ ...@@ -40,79 +40,121 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
namespace tpool namespace tpool
{ {
enum cache_notification_mode
{
NOTIFY_ONE,
NOTIFY_ALL
};
/** /**
Generic "pointer" cache of a fixed size Generic "pointer" cache of a fixed size
with fast put/get operations. with fast put/get operations.
Compared to STL containers, is faster/does not Compared to STL containers,e.g stack or queue
do allocations. However, put() operation will wait is faster/does not do allocations.
if there is no free items.
However, get() operation will wait if there is no free items.
We assume that put() will only put back the elements that
were retrieved previously with get().
*/ */
template<typename T> class cache template<typename T> class cache
{ {
/** Protects updates of m_pos and m_cache members */
std::mutex m_mtx; std::mutex m_mtx;
/**
Notify waiting threads about "cache full" or "cache not empty" conditions
@see get() and wait()
*/
std::condition_variable m_cv; std::condition_variable m_cv;
std::vector<T> m_base;
/** Cached items vector.Does not change after construction */
std::vector<T> m_base;
/**
Pointers to cached items. Protected by m_mtx. Does not grow after
construction. Elements in position [0,m_pos-1] are "borrowed",
elements in position [m_pos,capacity()-1] are "free"
*/
std::vector<T*> m_cache; std::vector<T*> m_cache;
cache_notification_mode m_notification_mode;
/** Number of threads waiting for "cache full" condition (s. wait())
Protected by m_mtx */
int m_waiters; int m_waiters;
/** Current cache size. Protected by m_mtx*/
size_t m_pos;
private:
inline size_t capacity()
{
return m_base.size();
}
/**
@return true if cache is full (no items are borrowed)
*/
bool is_full() bool is_full()
{ {
return m_cache.size() == m_base.size(); return m_pos == 0;
}
/**
@return true if cache is empty (all items are borrowed)
*/
bool is_empty()
{
return m_pos == capacity();
} }
public: public:
cache(size_t count, cache_notification_mode mode= tpool::cache_notification_mode::NOTIFY_ALL): /**
m_mtx(), m_cv(), m_base(count),m_cache(count), m_notification_mode(mode),m_waiters() Constructor
@param size - maximum number of items in cache
*/
cache(size_t size) : m_mtx(), m_cv(), m_base(size), m_cache(size),
m_waiters(), m_pos(0)
{ {
for(size_t i = 0 ; i < count; i++) for(size_t i= 0 ; i < size; i++)
m_cache[i]=&m_base[i]; m_cache[i]= &m_base[i];
} }
T* get(bool blocking=true) /**
Retrieve an item from cache. Waits for free item, if cache is
currently empty.
@return borrowed item
*/
T* get()
{ {
std::unique_lock<std::mutex> lk(m_mtx); std::unique_lock<std::mutex> lk(m_mtx);
if (blocking) while(is_empty())
{ m_cv.wait(lk);
while(m_cache.empty()) assert(m_pos < capacity());
m_cv.wait(lk); // return last element
} return m_cache[m_pos++];
else
{
if(m_cache.empty())
return nullptr;
}
T* ret = m_cache.back();
m_cache.pop_back();
return ret;
} }
/**
Put back an item to cache.
@param item - item to put back
*/
void put(T *ele) void put(T *ele)
{ {
std::unique_lock<std::mutex> lk(m_mtx); std::unique_lock<std::mutex> lk(m_mtx);
m_cache.push_back(ele); assert(!is_full());
if (m_notification_mode == NOTIFY_ONE) // put element to the logical end of the array
m_cv.notify_one(); m_cache[--m_pos] = ele;
else if(m_cache.size() == 1)
m_cv.notify_all(); // Signal cache is not empty /* Notify waiters when the cache becomes
else if(m_waiters && is_full()) not empty, or when it becomes full */
m_cv.notify_all(); // Signal cache is full if (m_pos == 1 || (m_waiters && is_full()))
m_cv.notify_all();
} }
/** Check if pointer represents cached element */
bool contains(T* ele) bool contains(T* ele)
{ {
return ele >= &m_base[0] && ele <= &m_base[m_base.size() -1]; // No locking required, m_base does not change after construction.
return ele >= &m_base[0] && ele <= &m_base[capacity() - 1];
} }
/* Wait until cache is full.*/ /** Wait until cache is full.*/
void wait() void wait()
{ {
std::unique_lock<std::mutex> lk(m_mtx); std::unique_lock<std::mutex> lk(m_mtx);
...@@ -122,9 +164,13 @@ template<typename T> class cache ...@@ -122,9 +164,13 @@ template<typename T> class cache
m_waiters--; m_waiters--;
} }
TPOOL_SUPPRESS_TSAN size_t size() /**
@return approximate number of "borrowed" items.
A "dirty" read, not used in any critical functionality.
*/
TPOOL_SUPPRESS_TSAN size_t pos()
{ {
return m_cache.size(); return m_pos;
} }
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment