ndb dd -

  fix static log waiter pool
parent 4c485eb1
...@@ -131,6 +131,7 @@ Cmvmi::Cmvmi(Block_context& ctx) : ...@@ -131,6 +131,7 @@ Cmvmi::Cmvmi(Block_context& ctx) :
Cmvmi::~Cmvmi() Cmvmi::~Cmvmi()
{ {
m_shared_page_pool.clear();
} }
......
...@@ -33,6 +33,8 @@ ...@@ -33,6 +33,8 @@
#include <EventLogger.hpp> #include <EventLogger.hpp>
extern EventLogger g_eventLogger; extern EventLogger g_eventLogger;
#include <record_types.hpp>
/** /**
* ---<a>-----<b>-----<c>-----<d>---> (time) * ---<a>-----<b>-----<c>-----<d>---> (time)
* *
...@@ -95,7 +97,6 @@ Lgman::Lgman(Block_context & ctx) : ...@@ -95,7 +97,6 @@ Lgman::Lgman(Block_context & ctx) :
m_logfile_group_hash.setSize(10); m_logfile_group_hash.setSize(10);
m_file_pool.setSize(10); m_file_pool.setSize(10);
m_data_buffer_pool.setSize(10); m_data_buffer_pool.setSize(10);
m_log_waiter_pool.setSize(10000);
} }
Lgman::~Lgman() Lgman::~Lgman()
...@@ -118,6 +119,10 @@ Lgman::execREAD_CONFIG_REQ(Signal* signal) ...@@ -118,6 +119,10 @@ Lgman::execREAD_CONFIG_REQ(Signal* signal)
m_ctx.m_config.getOwnConfigIterator(); m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0); ndbrequire(p != 0);
Pool_context pc;
pc.m_block = this;
m_log_waiter_pool.wo_pool_init(RT_LGMAN_LOG_WAITER, pc);
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
conf->senderRef = reference(); conf->senderRef = reference();
conf->senderData = senderData; conf->senderData = senderData;
...@@ -271,9 +276,9 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){ ...@@ -271,9 +276,9 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){
if (!ptr.p->m_log_buffer_waiters.isEmpty()) if (!ptr.p->m_log_buffer_waiters.isEmpty())
{ {
Uint32 free_buffer= ptr.p->m_free_buffer_words; Uint32 free_buffer= ptr.p->m_free_buffer_words;
LocalDLFifoList<Log_waiter>
list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
Ptr<Log_waiter> waiter; Ptr<Log_waiter> waiter;
Local_log_waiter_list
list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
list.first(waiter); list.first(waiter);
infoEvent(" free_buffer_words: %d head(waiters).sz: %d %d", infoEvent(" free_buffer_words: %d head(waiters).sz: %d %d",
ptr.p->m_free_buffer_words, ptr.p->m_free_buffer_words,
...@@ -282,13 +287,21 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){ ...@@ -282,13 +287,21 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){
} }
if (!ptr.p->m_log_sync_waiters.isEmpty()) if (!ptr.p->m_log_sync_waiters.isEmpty())
{ {
LocalDLFifoList<Log_waiter>
list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
Ptr<Log_waiter> waiter; Ptr<Log_waiter> waiter;
Local_log_waiter_list
list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
list.first(waiter); list.first(waiter);
infoEvent(" m_last_synced_lsn: %lld: %d head(waiters).m_sync_lsn: %lld", infoEvent(" m_last_synced_lsn: %lld head(waiters %x).m_sync_lsn: %lld",
ptr.p->m_last_synced_lsn, ptr.p->m_last_synced_lsn,
waiter.i,
waiter.p->m_sync_lsn); waiter.p->m_sync_lsn);
while(!waiter.isNull())
{
ndbout_c("ptr: %x %p lsn: %lld next: %x",
waiter.i, waiter.p, waiter.p->m_sync_lsn, waiter.p->nextList);
list.next(waiter);
}
} }
m_logfile_group_list.next(ptr); m_logfile_group_list.next(ptr);
} }
...@@ -1031,7 +1044,7 @@ Logfile_client::sync_lsn(Signal* signal, ...@@ -1031,7 +1044,7 @@ Logfile_client::sync_lsn(Signal* signal,
bool empty= false; bool empty= false;
Ptr<Lgman::Log_waiter> wait; Ptr<Lgman::Log_waiter> wait;
{ {
LocalDLFifoList<Lgman::Log_waiter> Lgman::Local_log_waiter_list
list(m_lgman->m_log_waiter_pool, ptr.p->m_log_sync_waiters); list(m_lgman->m_log_waiter_pool, ptr.p->m_log_sync_waiters);
empty= list.isEmpty(); empty= list.isEmpty();
...@@ -1065,8 +1078,7 @@ Lgman::force_log_sync(Signal* signal, ...@@ -1065,8 +1078,7 @@ Lgman::force_log_sync(Signal* signal,
Ptr<Logfile_group> ptr, Ptr<Logfile_group> ptr,
Uint32 lsn_hi, Uint32 lsn_lo) Uint32 lsn_hi, Uint32 lsn_lo)
{ {
LocalDLFifoList<Lgman::Log_waiter> Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
Uint64 force_lsn = lsn_hi; force_lsn <<= 32; force_lsn += lsn_lo; Uint64 force_lsn = lsn_hi; force_lsn <<= 32; force_lsn += lsn_lo;
if(ptr.p->m_last_sync_req_lsn < force_lsn) if(ptr.p->m_last_sync_req_lsn < force_lsn)
...@@ -1125,7 +1137,7 @@ Lgman::force_log_sync(Signal* signal, ...@@ -1125,7 +1137,7 @@ Lgman::force_log_sync(Signal* signal,
void void
Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr) Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr)
{ {
LocalDLFifoList<Log_waiter> Local_log_waiter_list
list(m_log_waiter_pool, ptr.p->m_log_sync_waiters); list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
if(list.isEmpty()) if(list.isEmpty())
...@@ -1144,7 +1156,7 @@ Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -1144,7 +1156,7 @@ Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr)
SimulatedBlock* b = globalData.getBlock(block); SimulatedBlock* b = globalData.getBlock(block);
b->execute(signal, waiter.p->m_callback, 0); b->execute(signal, waiter.p->m_callback, 0);
list.release(waiter); list.releaseFirst(waiter);
} }
if(removed && !list.isEmpty()) if(removed && !list.isEmpty())
...@@ -1260,7 +1272,7 @@ Logfile_client::get_log_buffer(Signal* signal, Uint32 sz, ...@@ -1260,7 +1272,7 @@ Logfile_client::get_log_buffer(Signal* signal, Uint32 sz,
bool empty= false; bool empty= false;
{ {
Ptr<Lgman::Log_waiter> wait; Ptr<Lgman::Log_waiter> wait;
LocalDLFifoList<Lgman::Log_waiter> Lgman::Local_log_waiter_list
list(m_lgman->m_log_waiter_pool, ptr.p->m_log_buffer_waiters); list(m_lgman->m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
empty= list.isEmpty(); empty= list.isEmpty();
...@@ -1486,7 +1498,7 @@ void ...@@ -1486,7 +1498,7 @@ void
Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr) Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
{ {
Uint32 free_buffer= ptr.p->m_free_buffer_words; Uint32 free_buffer= ptr.p->m_free_buffer_words;
LocalDLFifoList<Log_waiter> Local_log_waiter_list
list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters); list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
if(list.isEmpty()) if(list.isEmpty())
...@@ -1505,7 +1517,7 @@ Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -1505,7 +1517,7 @@ Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
SimulatedBlock* b = globalData.getBlock(block); SimulatedBlock* b = globalData.getBlock(block);
b->execute(signal, waiter.p->m_callback, 0); b->execute(signal, waiter.p->m_callback, 0);
list.release(waiter); list.releaseFirst(waiter);
} }
if(removed && !list.isEmpty()) if(removed && !list.isEmpty())
......
...@@ -28,6 +28,9 @@ ...@@ -28,6 +28,9 @@
#include "diskpage.hpp" #include "diskpage.hpp"
#include <signaldata/GetTabInfo.hpp> #include <signaldata/GetTabInfo.hpp>
#include <WOPool.hpp>
#include <SLFifoList.hpp>
class Lgman : public SimulatedBlock class Lgman : public SimulatedBlock
{ {
public: public:
...@@ -77,18 +80,19 @@ protected: ...@@ -77,18 +80,19 @@ protected:
public: public:
struct Log_waiter struct Log_waiter
{ {
Callback m_callback;
union { union {
Uint32 m_size; Uint32 m_size;
Uint64 m_sync_lsn; Uint64 m_sync_lsn;
}; };
Uint32 m_block; Uint32 m_block;
Callback m_callback; Uint32 nextList;
union { Uint32 m_magic;
Uint32 nextPool;
Uint32 nextList;
};
Uint32 prevList;
}; };
typedef RecordPool<Log_waiter, WOPool> Log_waiter_pool;
typedef SLFifoListImpl<Log_waiter_pool, Log_waiter> Log_waiter_list;
typedef LocalSLFifoListImpl<Log_waiter_pool, Log_waiter> Local_log_waiter_list;
struct Undofile struct Undofile
{ {
...@@ -183,13 +187,14 @@ public: ...@@ -183,13 +187,14 @@ public:
Logfile_group::LG_FLUSH_THREAD; Logfile_group::LG_FLUSH_THREAD;
Uint64 m_last_lsn; Uint64 m_last_lsn;
Uint64 m_last_sync_req_lsn; Uint64 m_last_sync_req_lsn; // Outstanding
Uint64 m_last_synced_lsn; Uint64 m_last_synced_lsn; //
Uint64 m_max_sync_req_lsn; // User requested lsn
union { union {
Uint64 m_last_read_lsn; Uint64 m_last_read_lsn;
Uint64 m_last_lcp_lsn; Uint64 m_last_lcp_lsn;
}; };
DLFifoList<Log_waiter>::Head m_log_sync_waiters; Log_waiter_list::Head m_log_sync_waiters;
Buffer_idx m_tail_pos[3]; // 0 is cut, 1 is saved, 2 is current Buffer_idx m_tail_pos[3]; // 0 is cut, 1 is saved, 2 is current
Buffer_idx m_file_pos[2]; // 0 tail, 1 head = { file_ptr_i, page_no } Buffer_idx m_file_pos[2]; // 0 tail, 1 head = { file_ptr_i, page_no }
...@@ -199,7 +204,7 @@ public: ...@@ -199,7 +204,7 @@ public:
DLFifoList<Undofile>::Head m_meta_files;// Files being created or dropped DLFifoList<Undofile>::Head m_meta_files;// Files being created or dropped
Uint32 m_free_buffer_words; // Free buffer page words Uint32 m_free_buffer_words; // Free buffer page words
DLFifoList<Log_waiter>::Head m_log_buffer_waiters; Log_waiter_list::Head m_log_buffer_waiters;
Page_map::Head m_buffer_pages; // Pairs of { ptr.i, count } Page_map::Head m_buffer_pages; // Pairs of { ptr.i, count }
struct Position { struct Position {
Buffer_idx m_current_page; // { m_buffer_pages.i, left in range } Buffer_idx m_current_page; // { m_buffer_pages.i, left in range }
...@@ -234,7 +239,7 @@ private: ...@@ -234,7 +239,7 @@ private:
friend class Logfile_client; friend class Logfile_client;
ArrayPool<Undofile> m_file_pool; ArrayPool<Undofile> m_file_pool;
ArrayPool<Logfile_group> m_logfile_group_pool; ArrayPool<Logfile_group> m_logfile_group_pool;
ArrayPool<Log_waiter> m_log_waiter_pool; Log_waiter_pool m_log_waiter_pool;
Page_map::DataBufferPool m_data_buffer_pool; Page_map::DataBufferPool m_data_buffer_pool;
......
...@@ -24,8 +24,7 @@ ...@@ -24,8 +24,7 @@
/** /**
* Operations for dd * Operations for dd
* PGMAN_PAGE_REQUEST * PGMAN_PAGE_REQUEST
* LGMAN_LOG_BUFFER_WAITER * LGMAN_LOG_WAITER
* LGMAN_LOG_SYNC_WAITER
* DBTUP_PAGE_REQUEST * DBTUP_PAGE_REQUEST
*/ */
#define RG_DISK_OPERATIONS 1 #define RG_DISK_OPERATIONS 1
...@@ -45,12 +44,11 @@ ...@@ -45,12 +44,11 @@
/** /**
* Record types * Record types
*/ */
#define PGMAN_PAGE_REQUEST MAKE_TID(1, RG_DISK_OPERATIONS) #define RT_PGMAN_PAGE_REQUEST MAKE_TID(1, RG_DISK_OPERATIONS)
#define LGMAN_LOG_BUFFER_WAITER MAKE_TID(2, RG_DISK_OPERATIONS) #define RT_LGMAN_LOG_WAITER MAKE_TID(2, RG_DISK_OPERATIONS)
#define LGMAN_LOG_SYNC_WAITER MAKE_TID(3, RG_DISK_OPERATIONS)
#define DBTUP_PAGE_REQUEST MAKE_TID(4, RG_DISK_OPERATIONS) #define RT_DBTUP_PAGE_REQUEST MAKE_TID(3, RG_DISK_OPERATIONS)
#define DBTUP_EXTENT_INFO MAKE_TID(5, RG_DISK_RECORDS) #define RT_DBTUP_EXTENT_INFO MAKE_TID(4, RG_DISK_RECORDS)
#endif #endif
...@@ -45,6 +45,7 @@ public: ...@@ -45,6 +45,7 @@ public:
*/ */
bool setSize(Uint32 noOfElements, bool align = false, bool exit_on_error = true, bool guard = true); bool setSize(Uint32 noOfElements, bool align = false, bool exit_on_error = true, bool guard = true);
bool set(T*, Uint32 cnt, bool align = false); bool set(T*, Uint32 cnt, bool align = false);
void clear() { theArray = 0; }
inline Uint32 getNoOfFree() const { inline Uint32 getNoOfFree() const {
return noOfFree; return noOfFree;
......
...@@ -19,7 +19,8 @@ libkernel_a_SOURCES = \ ...@@ -19,7 +19,8 @@ libkernel_a_SOURCES = \
SectionReader.cpp \ SectionReader.cpp \
Mutex.cpp SafeCounter.cpp \ Mutex.cpp SafeCounter.cpp \
Rope.cpp \ Rope.cpp \
ndbd_malloc.cpp ndbd_malloc_impl.cpp Pool.cpp ndbd_malloc.cpp ndbd_malloc_impl.cpp \
Pool.cpp WOPool.cpp
INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
......
...@@ -25,13 +25,19 @@ Pool_context::alloc_page(Uint32 type_id, Uint32 *i) ...@@ -25,13 +25,19 @@ Pool_context::alloc_page(Uint32 type_id, Uint32 *i)
} }
void void
Pool_context::release_page(Uint32 type_id, Uint32 i, void* p) Pool_context::release_page(Uint32 type_id, Uint32 i)
{ {
m_block->m_ctx.m_mm.release_page(type_id, i, p); m_block->m_ctx.m_mm.release_page(type_id, i);
}
void*
Pool_context::get_memroot()
{
return m_block->m_ctx.m_mm.get_memroot();
} }
void void
Pool_context::handle_abort(const AbortArg &) Pool_context::handleAbort(int err, const char * msg)
{ {
m_block->progError(__LINE__, err, msg);
} }
...@@ -17,9 +17,12 @@ ...@@ -17,9 +17,12 @@
#ifndef NDB_POOL_HPP #ifndef NDB_POOL_HPP
#define NDB_POOL_HPP #define NDB_POOL_HPP
#include <ndb_global.h>
#include <kernel_types.h> #include <kernel_types.h>
/** /**
* Type bits
*
* Type id is 11 bits record type, and 5 bits resource id * Type id is 11 bits record type, and 5 bits resource id
* -> 2048 different kind of records and 32 different resource groups * -> 2048 different kind of records and 32 different resource groups
* *
...@@ -31,6 +34,12 @@ ...@@ -31,6 +34,12 @@
#define RG_MASK ((1 << RG_BITS) - 1) #define RG_MASK ((1 << RG_BITS) - 1)
#define MAKE_TID(TID,RG) ((TID << RG_BITS) | RG) #define MAKE_TID(TID,RG) ((TID << RG_BITS) | RG)
/**
* Page bits
*/
#define POOL_RECORD_BITS 13
#define POOL_RECORD_MASK ((1 << POOL_RECORD_BITS) - 1)
/** /**
* Record_info * Record_info
* *
...@@ -57,6 +66,11 @@ struct Resource_limit ...@@ -57,6 +66,11 @@ struct Resource_limit
struct Pool_context struct Pool_context
{ {
class SimulatedBlock* m_block; class SimulatedBlock* m_block;
/**
* Get mem root
*/
void* get_memroot();
/** /**
* Alloc consekutive pages * Alloc consekutive pages
...@@ -74,8 +88,8 @@ struct Pool_context ...@@ -74,8 +88,8 @@ struct Pool_context
* @param i : in : i value of first page * @param i : in : i value of first page
* @param p : in : pointer to first page * @param p : in : pointer to first page
*/ */
void release_page(Uint32 type_id, Uint32 i, void* p); void release_page(Uint32 type_id, Uint32 i);
/** /**
* Alloc consekutive pages * Alloc consekutive pages
* *
...@@ -97,21 +111,12 @@ struct Pool_context ...@@ -97,21 +111,12 @@ struct Pool_context
* @param p : in : pointer to first page * @param p : in : pointer to first page
* @param cnt : in : no of pages to release * @param cnt : in : no of pages to release
*/ */
void release_pages(Uint32 type_id, Uint32 i, void* p, Uint32 cnt); void release_pages(Uint32 type_id, Uint32 i, Uint32 cnt);
/** /**
* Pool abort * Abort
* Only know issue is getPtr with invalid i-value.
* If other emerges, we will add argument to this method
*/ */
struct AbortArg void handleAbort(int code, const char* msg);
{
Uint32 m_expected_magic;
Uint32 m_found_magic;
Uint32 i;
void * p;
};
void handle_abort(const AbortArg &);
}; };
template <typename T> template <typename T>
...@@ -136,19 +141,180 @@ struct ConstPtr ...@@ -136,19 +141,180 @@ struct ConstPtr
/** /**
* Any pool should implement the following * Any pool should implement the following
*/ */
struct Pool struct PoolImpl
{ {
public: Pool_context m_ctx;
Pool(); Record_info m_record_info;
void init(const Record_info& ri, const Pool_context& pc); void init(const Record_info& ri, const Pool_context& pc);
void init(const Record_info& ri, const Pool_context& pc);
bool seize(Uint32*, void**);
void* seize(Uint32*); bool seize(Ptr<void>&);
void release(Ptr<void>);
void release(Uint32 i, void* p);
void * getPtr(Uint32 i); void * getPtr(Uint32 i);
}; };
#endif #endif
template <typename T, typename P>
class RecordPool {
public:
RecordPool();
~RecordPool();
void init(Uint32 type_id, const Pool_context& pc);
void wo_pool_init(Uint32 type_id, const Pool_context& pc);
/**
* Update p value for ptr according to i value
*/
void getPtr(Ptr<T> &);
void getPtr(ConstPtr<T> &) const;
/**
* Get pointer for i value
*/
T * getPtr(Uint32 i);
const T * getConstPtr(Uint32 i) const;
/**
* Update p & i value for ptr according to <b>i</b> value
*/
void getPtr(Ptr<T> &, Uint32 i);
void getPtr(ConstPtr<T> &, Uint32 i) const;
/**
* Allocate an object from pool - update Ptr
*
* Return i
*/
bool seize(Ptr<T> &);
/**
* Return an object to pool
*/
void release(Uint32 i);
/**
* Return an object to pool
*/
void release(Ptr<T> &);
private:
P m_pool;
};
template <typename T, typename P>
inline
RecordPool<T, P>::RecordPool()
{
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::init(Uint32 type_id, const Pool_context& pc)
{
Record_info ri;
ri.m_size = sizeof(T);
ri.m_offset_next_pool = offsetof(T, nextPool);
ri.m_offset_magic = offsetof(T, m_magic);
ri.m_type_id = type_id;
m_pool.init(ri, pc);
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::wo_pool_init(Uint32 type_id, const Pool_context& pc)
{
Record_info ri;
ri.m_size = sizeof(T);
ri.m_offset_next_pool = 0;
ri.m_offset_magic = offsetof(T, m_magic);
ri.m_type_id = type_id;
m_pool.init(ri, pc);
}
template <typename T, typename P>
inline
RecordPool<T, P>::~RecordPool()
{
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::getPtr(Ptr<T> & ptr)
{
ptr.p = static_cast<T*>(m_pool.getPtr(ptr.i));
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::getPtr(ConstPtr<T> & ptr) const
{
ptr.p = static_cast<const T*>(m_pool.getPtr(ptr.i));
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::getPtr(Ptr<T> & ptr, Uint32 i)
{
ptr.i = i;
ptr.p = static_cast<T*>(m_pool.getPtr(ptr.i));
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::getPtr(ConstPtr<T> & ptr, Uint32 i) const
{
ptr.i = i;
ptr.p = static_cast<const T*>(m_pool.getPtr(ptr.i));
}
template <typename T, typename P>
inline
T *
RecordPool<T, P>::getPtr(Uint32 i)
{
return static_cast<T*>(m_pool.getPtr(i));
}
template <typename T, typename P>
inline
const T *
RecordPool<T, P>::getConstPtr(Uint32 i) const
{
return static_cast<const T*>(m_pool.getPtr(i));
}
template <typename T, typename P>
inline
bool
RecordPool<T, P>::seize(Ptr<T> & ptr)
{
return m_pool.seize(*(Ptr<void>*)&ptr);
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::release(Uint32 i)
{
Ptr<T> p;
getPtr(p, i);
m_pool.release(p);
}
template <typename T, typename P>
inline
void
RecordPool<T, P>::release(Ptr<T> & ptr)
{
m_pool.release(*(Ptr<void>*)&ptr);
}
#endif #endif
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef SLFIFOLIST_HPP
#define SLFIFOLIST_HPP
#include <kernel_types.h>
#include "Pool.hpp"
/**
* Template class used for implementing an
* list of object retreived from a pool
*/
template <typename P, typename T, typename U = T>
class SLFifoListImpl
{
public:
/**
* List head
*/
struct Head
{
Head();
Uint32 firstItem;
Uint32 lastItem;
#ifdef VM_TRACE
bool in_use;
#endif
inline bool isEmpty() const { return firstItem == RNIL;}
};
SLFifoListImpl(P & thePool);
bool seizeFirst(Ptr<T> &);
bool seizeLast(Ptr<T> &);
bool seize(Ptr<T> & ptr) { return seizeLast(ptr);}
void releaseFirst(Ptr<T> &);
void addFirst(Ptr<T> &);
void addLast(Ptr<T> &);
void removeFirst(Ptr<T> &);
/**
* Update i & p value according to <b>i</b>
*/
void getPtr(Ptr<T> &, Uint32 i) const;
/**
* Update p value for ptr according to i value
*/
void getPtr(Ptr<T> &) const ;
/**
* Get pointer for i value
*/
T * getPtr(Uint32 i) const ;
/**
* Update ptr to first element in list
*
* Return i
*/
bool first(Ptr<T> &) const ;
/**
* Update ptr to first element in list
*
* Return i
*/
bool last(Ptr<T> &) const ;
/**
* Get next element
*
* NOTE ptr must be both p & i
*/
bool next(Ptr<T> &) const ;
/**
* Check if next exists i.e. this is not last
*
* NOTE ptr must be both p & i
*/
bool hasNext(const Ptr<T> &) const;
inline bool isEmpty() const { return head.firstItem == RNIL;}
protected:
Head head;
P & thePool;
};
template <typename P, typename T, typename U = T>
class LocalSLFifoListImpl : public SLFifoListImpl<P,T,U>
{
public:
LocalSLFifoListImpl(P & thePool, typename SLFifoListImpl<P,T,U>::Head &_src)
: SLFifoListImpl<P,T,U>(thePool), src(_src)
{
this->head = src;
#ifdef VM_TRACE
assert(src.in_use == false);
src.in_use = true;
#endif
}
~LocalSLFifoListImpl(){
#ifdef VM_TRACE
assert(src.in_use == true);
#endif
src = this->head;
}
private:
typename SLFifoListImpl<P,T,U>::Head & src;
};
template <typename P, typename T, typename U>
inline
SLFifoListImpl<P,T,U>::SLFifoListImpl(P & _pool):
thePool(_pool)
{
}
template <typename P, typename T, typename U>
inline
SLFifoListImpl<P,T,U>::Head::Head()
{
firstItem = RNIL;
lastItem = RNIL;
#ifdef VM_TRACE
in_use = false;
#endif
}
template <typename P, typename T, typename U>
inline
bool
SLFifoListImpl<P,T,U>::seizeFirst(Ptr<T> & p)
{
if (thePool.seize(p))
{
addFirst(p);
return true;
}
p.p = NULL;
return false;
}
template <typename P, typename T, typename U>
inline
bool
SLFifoListImpl<P,T,U>::seizeLast(Ptr<T> & p)
{
if (thePool.seize(p))
{
addLast(p);
return true;
}
p.p = NULL;
return false;
}
template <typename P, typename T, typename U>
inline
void
SLFifoListImpl<P,T,U>::addFirst(Ptr<T> & p)
{
Uint32 first = head.firstItem;
head.firstItem = p.i;
if (first == RNIL)
{
head.lastItem = p.i;
}
p.p->U::nextList = first;
}
template <typename P, typename T, typename U>
inline
void
SLFifoListImpl<P,T,U>::addLast(Ptr<T> & p)
{
T * t = p.p;
Uint32 last = head.lastItem;
t->U::nextList = RNIL;
head.lastItem = p.i;
if(last != RNIL)
{
T * t2 = thePool.getPtr(last);
t2->U::nextList = p.i;
}
else
{
head.firstItem = p.i;
}
}
template <typename P, typename T, typename U>
inline
void
SLFifoListImpl<P,T,U>::removeFirst(Ptr<T> & p)
{
Uint32 first = head.firstItem;
Uint32 last = head.lastItem;
assert(p.i == first);
if (first != last)
{
head.firstItem = p.p->U::nextList;
}
else
{
head.firstItem = head.lastItem = RNIL;
}
}
template <typename P, typename T, typename U>
inline
void
SLFifoListImpl<P,T,U>::releaseFirst(Ptr<T> & p)
{
removeFirst(p);
thePool.release(p);
}
template <typename P, typename T, typename U>
inline
void
SLFifoListImpl<P,T,U>::getPtr(Ptr<T> & p, Uint32 i) const
{
p.i = i;
p.p = thePool.getPtr(i);
}
template <typename P, typename T, typename U>
inline
void
SLFifoListImpl<P,T,U>::getPtr(Ptr<T> & p) const
{
thePool.getPtr(p);
}
template <typename P, typename T, typename U>
inline
T *
SLFifoListImpl<P,T,U>::getPtr(Uint32 i) const
{
return thePool.getPtr(i);
}
/**
* Update ptr to first element in list
*
* Return i
*/
template <typename P, typename T, typename U>
inline
bool
SLFifoListImpl<P,T,U>::first(Ptr<T> & p) const
{
p.i = head.firstItem;
if(p.i != RNIL){
p.p = thePool.getPtr(p.i);
return true;
}
p.p = NULL;
return false;
}
template <typename P, typename T, typename U>
inline
bool
SLFifoListImpl<P,T,U>::last(Ptr<T> & p) const
{
p.i = head.lastItem;
if(p.i != RNIL){
p.p = thePool.getPtr(p.i);
return true;
}
p.p = NULL;
return false;
}
template <typename P, typename T, typename U>
inline
bool
SLFifoListImpl<P,T,U>::next(Ptr<T> & p) const
{
p.i = p.p->U::nextList;
if(p.i != RNIL){
p.p = thePool.getPtr(p.i);
return true;
}
p.p = NULL;
return false;
}
template <typename P, typename T, typename U>
inline
bool
SLFifoListImpl<P,T,U>::hasNext(const Ptr<T> & p) const
{
return p.p->U::nextList != RNIL;
}
// Specializations
template <typename T, typename U = T>
class SLFifoList : public SLFifoListImpl<ArrayPool<T>, T, U>
{
public:
SLFifoList(ArrayPool<T> & p) : SLFifoListImpl<ArrayPool<T>, T, U>(p) {}
};
template <typename T, typename U = T>
class LocalSLFifoList : public LocalSLFifoListImpl<ArrayPool<T>,T,U> {
public:
LocalSLFifoList(ArrayPool<T> & p, typename SLFifoList<T,U>::Head & _src)
: LocalSLFifoListImpl<ArrayPool<T>,T,U>(p, _src) {}
};
#endif
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "WOPool.hpp"
#include <ndbd_exit_codes.h>
#include <NdbOut.hpp>
WOPool::WOPool()
{
bzero(this, sizeof(* this));
m_current_pos = GLOBAL_PAGE_SIZE_WORDS;
}
void
WOPool::init(const Record_info& ri, const Pool_context& pc)
{
m_ctx = pc;
m_record_info = ri;
m_record_info.m_size = ((ri.m_size + 3) >> 2); // Align to word boundary
m_record_info.m_offset_magic = ((ri.m_offset_magic + 3) >> 2);
m_memroot = (WOPage*)m_ctx.get_memroot();
ndbout_c("WOPool::init(%x, %d)",ri.m_type_id, m_record_info.m_size);
}
bool
WOPool::seize_new_page(Ptr<void>& ptr)
{
ndbout_c("WOPool::seize_new_page(%x)", m_record_info.m_type_id);
WOPage* page;
Uint32 page_no = RNIL;
if ((page = (WOPage*)m_ctx.alloc_page(m_record_info.m_type_id, &page_no)))
{
if (m_current_page)
{
m_current_page->m_ref_count = m_current_ref_count;
}
m_current_pos = 0;
m_current_ref_count = 0;
m_current_page_no = page_no;
m_current_page = page;
page->m_type_id = m_record_info.m_type_id;
bool ret = seize(ptr);
assert(ret);
return true;
}
return false;
}
void
WOPool::release_not_current(Ptr<void> ptr)
{
ndbout_c("WOPool::release_not_current(%x)", m_record_info.m_type_id);
WOPage* page = (WOPage*)(UintPtr(ptr.p) & ~(GLOBAL_PAGE_SIZE - 1));
Uint32 cnt = page->m_ref_count;
Uint32 type = page->m_type_id;
Uint32 ri_type = m_record_info.m_type_id;
if (likely(cnt && type == ri_type))
{
if (cnt == 1)
{
m_ctx.release_page(ri_type, ptr.i >> POOL_RECORD_BITS);
return;
}
page->m_ref_count = cnt - 1;
return;
}
handle_inconsistent_release(ptr);
}
void
WOPool::handle_invalid_release(Ptr<void> ptr)
{
char buf[255];
Uint32 pos = ptr.i & POOL_RECORD_MASK;
Uint32 pageI = ptr.i >> POOL_RECORD_BITS;
Uint32 * record_ptr_p = (Uint32*)ptr.p;
Uint32 * record_ptr_i = (m_memroot+pageI)->m_data + pos;
Uint32 magic = * (record_ptr_p + m_record_info.m_offset_magic);
snprintf(buf, sizeof(buf),
"Invalid memory release: ptr (%x %p %p) magic: (%.8x %.8x) memroot: %p page: %x",
ptr.i, ptr.p, record_ptr_i, magic, m_record_info.m_type_id,
m_memroot,
(m_memroot+pageI)->m_type_id);
m_ctx.handleAbort(NDBD_EXIT_PRGERR, buf);
}
void
WOPool::handle_invalid_get_ptr(Uint32 ptrI)
{
char buf[255];
Uint32 pos = ptrI & POOL_RECORD_MASK;
Uint32 pageI = ptrI >> POOL_RECORD_BITS;
Uint32 * record_ptr_i = (m_memroot+pageI)->m_data + pos;
Uint32 magic = * (record_ptr_i + m_record_info.m_offset_magic);
snprintf(buf, sizeof(buf),
"Invalid memory access: ptr (%x %p) magic: (%.8x %.8x) memroot: %p page: %x",
ptrI, record_ptr_i, magic, m_record_info.m_type_id,
m_memroot,
(m_memroot+pageI)->m_type_id);
m_ctx.handleAbort(NDBD_EXIT_PRGERR, buf);
}
void
WOPool::handle_inconsistent_release(Ptr<void> ptr)
{
WOPage* page = (WOPage*)(UintPtr(ptr.p) & ~(GLOBAL_PAGE_SIZE - 1));
Uint32 cnt = page->m_ref_count;
Uint32 type = page->m_type_id;
Uint32 ri_type = m_record_info.m_type_id;
char buf[255];
snprintf(buf, sizeof(buf),
"Memory corruption: ptr (%x %p) page (%d %x %x)",
ptr.i, ptr.p, cnt, type, ri_type);
m_ctx.handleAbort(NDBD_EXIT_PRGERR, buf);
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "Pool.hpp"
struct WOPage
{
Uint32 m_type_id;
Uint32 m_ref_count;
Uint32 m_next_page;
Uint32 m_data[GLOBAL_PAGE_SIZE_WORDS - 3];
};
/**
* Write Once Pool
*/
struct WOPool
{
Record_info m_record_info;
WOPage* m_memroot;
WOPage* m_current_page;
Pool_context m_ctx;
Uint32 m_current_page_no;
Uint16 m_current_pos;
Uint16 m_current_ref_count;
public:
WOPool();
void init(const Record_info& ri, const Pool_context& pc);
bool seize(Ptr<void>&);
void release(Ptr<void>);
void * getPtr(Uint32 i);
private:
bool seize_new_page(Ptr<void>&);
void release_not_current(Ptr<void>);
void handle_invalid_release(Ptr<void>);
void handle_invalid_get_ptr(Uint32 i);
void handle_inconsistent_release(Ptr<void>);
};
inline
bool
WOPool::seize(Ptr<void>& ptr)
{
Uint32 pos = m_current_pos;
Uint32 size = m_record_info.m_size;
WOPage *pageP = m_current_page;
if (likely(pos + size < GLOBAL_PAGE_SIZE_WORDS))
{
ptr.i = (m_current_page_no << POOL_RECORD_BITS) + pos;
ptr.p = (pageP->m_data + pos);
pageP->m_data[pos+m_record_info.m_offset_magic] = ~(Uint32)m_record_info.m_type_id;
m_current_pos = pos + size;
m_current_ref_count++;
return true;
}
return seize_new_page(ptr);
}
inline
void
WOPool::release(Ptr<void> ptr)
{
Uint32 cur_page = m_current_page_no;
Uint32 ptr_page = ptr.i >> POOL_RECORD_BITS;
Uint32 *magic_ptr = (((Uint32*)ptr.p)+m_record_info.m_offset_magic);
Uint32 magic_val = *magic_ptr;
if (likely(magic_val == ~(Uint32)m_record_info.m_type_id))
{
* magic_ptr = 0;
if (cur_page == ptr_page)
{
if (m_current_ref_count == 1)
{
m_current_pos = 0;
}
m_current_ref_count--;
return;
}
return release_not_current(ptr);
}
handle_invalid_release(ptr);
}
inline
void*
WOPool::getPtr(Uint32 i)
{
Uint32 page_no = i >> POOL_RECORD_BITS;
Uint32 page_idx = i & POOL_RECORD_MASK;
WOPage * page = m_memroot + page_no;
Uint32 * record = page->m_data + page_idx;
Uint32 magic_val = * (record + m_record_info.m_offset_magic);
if (likely(magic_val == ~(Uint32)m_record_info.m_type_id))
{
return record;
}
handle_invalid_get_ptr(i);
}
...@@ -557,13 +557,13 @@ Ndbd_mem_manager::alloc_page(Uint32 type, Uint32* i) ...@@ -557,13 +557,13 @@ Ndbd_mem_manager::alloc_page(Uint32 type, Uint32* i)
} }
void void
Ndbd_mem_manager::release_page(Uint32 type, Uint32 i, void * p) Ndbd_mem_manager::release_page(Uint32 type, Uint32 i)
{ {
Uint32 idx = type & RG_MASK; Uint32 idx = type & RG_MASK;
assert(idx && idx < XX_RL_COUNT); assert(idx && idx < XX_RL_COUNT);
Resource_limit tot = m_resource_limit[0]; Resource_limit tot = m_resource_limit[0];
Resource_limit rl = m_resource_limit[idx]; Resource_limit rl = m_resource_limit[idx];
Uint32 sub = (rl.m_curr < rl.m_min) ? 0 : 1; // Over min ? Uint32 sub = (rl.m_curr < rl.m_min) ? 0 : 1; // Over min ?
release(i, 1); release(i, 1);
m_resource_limit[0].m_curr = tot.m_curr - sub; m_resource_limit[0].m_curr = tot.m_curr - sub;
......
...@@ -66,7 +66,7 @@ public: ...@@ -66,7 +66,7 @@ public:
void dump() const ; void dump() const ;
void* alloc_page(Uint32 type, Uint32* i); void* alloc_page(Uint32 type, Uint32* i);
void release_page(Uint32 type, Uint32 i, void * p); void release_page(Uint32 type, Uint32 i);
/** /**
* Compute 2log of size * Compute 2log of size
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment