/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "pgman.hpp" #include <signaldata/FsRef.hpp> #include <signaldata/FsConf.hpp> #include <signaldata/FsReadWriteReq.hpp> #include <signaldata/PgmanContinueB.hpp> #include <signaldata/LCP.hpp> #include <dbtup/Dbtup.hpp> #include <DebuggerNames.hpp> /** * Requests that make page dirty */ #define DIRTY_FLAGS (Page_request::COMMIT_REQ | \ Page_request::DIRTY_REQ | \ Page_request::ALLOC_REQ) // todo use this #ifdef VM_TRACE #define dbg(x) \ do { if (! debugFlag) break; debugOut << "PGMAN: " << x << endl; } while (0) #else #define dbg(x) #endif static bool g_dbg_lcp = false; #if 1 #define DBG_LCP(x) #else #define DBG_LCP(x) if(g_dbg_lcp) ndbout << x #endif Pgman::Pgman(Block_context& ctx) : SimulatedBlock(PGMAN, ctx), m_file_map(m_data_buffer_pool), m_page_hashlist(m_page_entry_pool), m_page_stack(m_page_entry_pool), m_page_queue(m_page_entry_pool) #ifdef VM_TRACE ,debugOut(* new NullOutputStream()) ,debugFlag(false) #endif { BLOCK_CONSTRUCTOR(Pgman); // Add received signals addRecSignal(GSN_STTOR, &Pgman::execSTTOR); addRecSignal(GSN_READ_CONFIG_REQ, &Pgman::execREAD_CONFIG_REQ); addRecSignal(GSN_DUMP_STATE_ORD, &Pgman::execDUMP_STATE_ORD); addRecSignal(GSN_CONTINUEB, &Pgman::execCONTINUEB); addRecSignal(GSN_FSREADREF, &Pgman::execFSREADREF, true); addRecSignal(GSN_FSREADCONF, &Pgman::execFSREADCONF); addRecSignal(GSN_FSWRITEREF, &Pgman::execFSWRITEREF, true); addRecSignal(GSN_FSWRITECONF, &Pgman::execFSWRITECONF); addRecSignal(GSN_LCP_FRAG_ORD, &Pgman::execLCP_FRAG_ORD); addRecSignal(GSN_END_LCP_REQ, &Pgman::execEND_LCP_REQ); // loop status m_stats_loop_on = false; m_busy_loop_on = false; m_cleanup_loop_on = false; m_lcp_loop_on = false; // LCP variables m_last_lcp = 0; m_last_lcp_complete = 0; m_lcp_curr_bucket = ~(Uint32)0; m_lcp_outstanding = 0; // clean-up variables m_cleanup_ptr.i = RNIL; // should be a factor larger than number of pool pages m_data_buffer_pool.setSize(16); m_page_hashlist.setSize(512); for (Uint32 k = 0; k < Page_entry::SUBLIST_COUNT; k++) m_page_sublist[k] = new Page_sublist(m_page_entry_pool); } Pgman::~Pgman() { for (Uint32 k = 0; k < Page_entry::SUBLIST_COUNT; k++) delete m_page_sublist[k]; } BLOCK_FUNCTIONS(Pgman) void Pgman::execREAD_CONFIG_REQ(Signal* signal) { jamEntry(); const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); Uint32 ref = req->senderRef; Uint32 senderData = req->senderData; const ndb_mgm_configuration_iterator * p = m_ctx.m_config.getOwnConfigIterator(); ndbrequire(p != 0); Uint64 page_buffer = 64*1024*1024; ndb_mgm_get_int64_parameter(p, CFG_DB_DISK_PAGE_BUFFER_MEMORY, &page_buffer); if (page_buffer > 0) { page_buffer = (page_buffer + GLOBAL_PAGE_SIZE - 1) / GLOBAL_PAGE_SIZE; // in pages m_param.m_max_pages = page_buffer; m_page_entry_pool.setSize(m_param.m_lirs_stack_mult * page_buffer); m_param.m_max_hot_pages = (page_buffer * 9) / 10; } Pool_context pc; pc.m_block = this; m_page_request_pool.wo_pool_init(RT_PGMAN_PAGE_REQUEST, pc); ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = senderData; sendSignal(ref, GSN_READ_CONFIG_CONF, signal, ReadConfigConf::SignalLength, JBB); } Pgman::Param::Param() : m_max_pages(64), // smallish for testing m_lirs_stack_mult(10), m_max_hot_pages(56), m_max_loop_count(256), m_max_io_waits(256), m_stats_loop_delay(1000), m_cleanup_loop_delay(200), m_lcp_loop_delay(0) { } Pgman::Stats::Stats() : m_num_pages(0), m_page_hits(0), m_page_faults(0), m_current_io_waits(0) { } void Pgman::execSTTOR(Signal* signal) { jamEntry(); const Uint32 startPhase = signal->theData[1]; switch (startPhase) { case 1: { Lgman* lgman = (Lgman*)globalData.getBlock(LGMAN); new (&m_lgman) Logfile_client(this, lgman, 0); c_tup = (Dbtup*)globalData.getBlock(DBTUP); } break; case 3: { // start forever loops do_stats_loop(signal); do_cleanup_loop(signal); m_stats_loop_on = true; m_cleanup_loop_on = true; } break; case 7: break; default: break; } sendSTTORRY(signal); } void Pgman::sendSTTORRY(Signal* signal) { signal->theData[0] = 0; signal->theData[3] = 1; signal->theData[4] = 3; signal->theData[5] = 7; signal->theData[6] = 255; // No more start phases from missra sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 7, JBB); } void Pgman::execCONTINUEB(Signal* signal) { jamEntry(); Uint32 data1 = signal->theData[1]; switch (signal->theData[0]) { case PgmanContinueB::STATS_LOOP: jam(); do_stats_loop(signal); break; case PgmanContinueB::BUSY_LOOP: jam(); do_busy_loop(signal); break; case PgmanContinueB::CLEANUP_LOOP: jam(); do_cleanup_loop(signal); break; case PgmanContinueB::LCP_LOOP: jam(); do_lcp_loop(signal); break; case PgmanContinueB::LCP_LOCKED: { jam(); Ptr<Page_entry> ptr; Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED]; if (data1 != RNIL) { pl.getPtr(ptr, data1); process_lcp_locked(signal, ptr); } else { signal->theData[0] = m_end_lcp_req.senderData; sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB); } return; } default: ndbrequire(false); break; } } // page entry Pgman::Page_entry::Page_entry(Uint32 file_no, Uint32 page_no) : m_state(0), m_file_no(file_no), m_page_no(page_no), m_real_page_i(RNIL), m_copy_page_i(RNIL), m_lsn(0), m_last_lcp(0), m_dirty_count(0), m_busy_count(0), m_requests() { } // page lists Uint32 Pgman::get_sublist_no(Page_state state) { if (state == 0) { return ZNIL; } if (state & Page_entry::REQUEST) { if (! (state & Page_entry::BOUND)) { return Page_entry::SL_BIND; } if (! (state & Page_entry::MAPPED)) { if (! (state & Page_entry::PAGEIN)) { return Page_entry::SL_MAP; } return Page_entry::SL_MAP_IO; } if (! (state & Page_entry::PAGEOUT)) { return Page_entry::SL_CALLBACK; } return Page_entry::SL_CALLBACK_IO; } if (state & Page_entry::BUSY) { return Page_entry::SL_BUSY; } if (state & Page_entry::LOCKED) { return Page_entry::SL_LOCKED; } if (state == Page_entry::ONSTACK) { return Page_entry::SL_IDLE; } return Page_entry::SL_OTHER; } void Pgman::set_page_state(Ptr<Page_entry> ptr, Page_state new_state) { #ifdef VM_TRACE debugOut << "PGMAN: >set_page_state: state=" << hex << new_state << endl; debugOut << "PGMAN: " << ptr << ": before" << endl; #endif Page_state old_state = ptr.p->m_state; if (old_state != new_state) { Uint32 old_list_no = get_sublist_no(old_state); Uint32 new_list_no = get_sublist_no(new_state); if (old_state != 0) { ndbrequire(old_list_no != ZNIL); if (old_list_no != new_list_no) { Page_sublist& old_list = *m_page_sublist[old_list_no]; old_list.remove(ptr); } } if (new_state != 0) { ndbrequire(new_list_no != ZNIL); if (old_list_no != new_list_no) { Page_sublist& new_list = *m_page_sublist[new_list_no]; new_list.add(ptr); } } ptr.p->m_state = new_state; } #ifdef VM_TRACE debugOut << "PGMAN: " << ptr << ": after" << endl; debugOut << "PGMAN: <set_page_state" << endl; #endif } // seize/release pages and entries bool Pgman::seize_cache_page(Ptr<GlobalPage>& gptr) { // page cache has no own pool yet bool ok = m_global_page_pool.seize(gptr); // zero is reserved as return value for queued request if (ok && gptr.i == 0) ok = m_global_page_pool.seize(gptr); if (ok) { ndbrequire(m_stats.m_num_pages < m_param.m_max_pages); m_stats.m_num_pages++; } return ok; } void Pgman::release_cache_page(Uint32 i) { m_global_page_pool.release(i); ndbrequire(m_stats.m_num_pages != 0); m_stats.m_num_pages--; } bool Pgman::find_page_entry(Ptr<Page_entry>& ptr, Uint32 file_no, Uint32 page_no) { Page_entry key; key.m_file_no = file_no; key.m_page_no = page_no; if (m_page_hashlist.find(ptr, key)) { #ifdef VM_TRACE debugOut << "PGMAN: find_page_entry" << endl; debugOut << "PGMAN: " << ptr << endl; #endif return true; } return false; } Uint32 Pgman::seize_page_entry(Ptr<Page_entry>& ptr, Uint32 file_no, Uint32 page_no) { if (m_page_entry_pool.seize(ptr)) { new (ptr.p) Page_entry(file_no, page_no); m_page_hashlist.add(ptr); #ifdef VM_TRACE ptr.p->m_this = this; debugOut << "PGMAN: seize_page_entry" << endl; debugOut << "PGMAN: " << ptr << endl; #endif return true; } return false; } bool Pgman::get_page_entry(Ptr<Page_entry>& ptr, Uint32 file_no, Uint32 page_no) { if (find_page_entry(ptr, file_no, page_no)) { jam(); ndbrequire(ptr.p->m_state != 0); m_stats.m_page_hits++; #ifdef VM_TRACE debugOut << "PGMAN: get_page_entry: found" << endl; debugOut << "PGMAN: " << ptr << endl; #endif return true; } if (m_page_entry_pool.getNoOfFree() == 0) { jam(); Page_sublist& pl_idle = *m_page_sublist[Page_entry::SL_IDLE]; Ptr<Page_entry> idle_ptr; if (pl_idle.first(idle_ptr)) { jam(); #ifdef VM_TRACE debugOut << "PGMAN: get_page_entry: re-use idle entry" << endl; debugOut << "PGMAN: " << idle_ptr << endl; #endif Page_state state = idle_ptr.p->m_state; ndbrequire(state == Page_entry::ONSTACK); Page_stack& pl_stack = m_page_stack; ndbrequire(pl_stack.hasPrev(idle_ptr)); pl_stack.remove(idle_ptr); state &= ~ Page_entry::ONSTACK; set_page_state(idle_ptr, state); ndbrequire(idle_ptr.p->m_state == 0); release_page_entry(idle_ptr); } } if (seize_page_entry(ptr, file_no, page_no)) { jam(); ndbrequire(ptr.p->m_state == 0); m_stats.m_page_faults++; #ifdef VM_TRACE debugOut << "PGMAN: get_page_entry: seize" << endl; debugOut << "PGMAN: " << ptr << endl; #endif return true; } ndbrequire(false); return false; } void Pgman::release_page_entry(Ptr<Page_entry>& ptr) { #ifdef VM_TRACE debugOut << "PGMAN: release_page_entry" << endl; debugOut << "PGMAN: " << ptr << endl; #endif Page_state state = ptr.p->m_state; ndbrequire(ptr.p->m_requests.isEmpty()); ndbrequire(! (state & Page_entry::ONSTACK)); ndbrequire(! (state & Page_entry::ONQUEUE)); ndbrequire(ptr.p->m_real_page_i == RNIL); if (! (state & Page_entry::LOCKED)) ndbrequire(! (state & Page_entry::REQUEST)); set_page_state(ptr, 0); m_page_hashlist.remove(ptr); m_page_entry_pool.release(ptr); } // LIRS /* * After the hot entry at stack bottom is removed, additional entries * are removed until next hot entry is found. There are 3 cases for the * removed entry: 1) a bound entry is already on queue 2) an unbound * entry with open requests enters queue at bind time 3) an unbound * entry without requests is returned to entry pool. */ void Pgman::lirs_stack_prune() { #ifdef VM_TRACE debugOut << "PGMAN: >lirs_stack_prune" << endl; #endif Page_stack& pl_stack = m_page_stack; Page_queue& pl_queue = m_page_queue; Ptr<Page_entry> ptr; while (pl_stack.first(ptr)) // first is stack bottom { Page_state state = ptr.p->m_state; if (state & Page_entry::HOT) { jam(); break; } #ifdef VM_TRACE debugOut << "PGMAN: " << ptr << ": prune from stack" << endl; #endif pl_stack.remove(ptr); state &= ~ Page_entry::ONSTACK; set_page_state(ptr, state); if (state & Page_entry::BOUND) { jam(); ndbrequire(state & Page_entry::ONQUEUE); } else if (state & Page_entry::REQUEST) { // enters queue at bind jam(); ndbrequire(! (state & Page_entry::ONQUEUE)); } else { jam(); release_page_entry(ptr); } } #ifdef VM_TRACE debugOut << "PGMAN: <lirs_stack_prune" << endl; #endif } /* * Remove the hot entry at stack bottom and make it cold and do stack * pruning. There are 2 cases for the removed entry: 1) a bound entry * is moved to queue 2) an unbound entry must have requests and enters * queue at bind time. */ void Pgman::lirs_stack_pop() { #ifdef VM_TRACE debugOut << "PGMAN: lirs_stack_pop" << endl; #endif Page_stack& pl_stack = m_page_stack; Page_queue& pl_queue = m_page_queue; Ptr<Page_entry> ptr; bool ok = pl_stack.first(ptr); ndbrequire(ok); Page_state state = ptr.p->m_state; #ifdef VM_TRACE debugOut << "PGMAN: " << ptr << ": pop from stack" << endl; #endif ndbrequire(state & Page_entry::HOT); ndbrequire(state & Page_entry::ONSTACK); pl_stack.remove(ptr); state &= ~ Page_entry::HOT; state &= ~ Page_entry::ONSTACK; ndbrequire(! (state & Page_entry::ONQUEUE)); if (state & Page_entry::BOUND) { jam(); pl_queue.add(ptr); state |= Page_entry::ONQUEUE; } else { // enters queue at bind jam(); ndbrequire(state & Page_entry::REQUEST); } set_page_state(ptr, state); lirs_stack_prune(); } /* * Update LIRS lists when page is referenced. */ void Pgman::lirs_reference(Ptr<Page_entry> ptr) { #ifdef VM_TRACE debugOut << "PGMAN: >lirs_reference" << endl; debugOut << "PGMAN: " << ptr << endl; #endif Page_stack& pl_stack = m_page_stack; Page_queue& pl_queue = m_page_queue; Page_state state = ptr.p->m_state; ndbrequire(! (state & Page_entry::LOCKED)); // even non-LIRS cache pages are counted on l.h.s. if (m_stats.m_num_pages >= m_param.m_max_hot_pages) { if (state & Page_entry::HOT) { // case 1 jam(); ndbrequire(state & Page_entry::ONSTACK); bool at_bottom = ! pl_stack.hasPrev(ptr); pl_stack.remove(ptr); pl_stack.add(ptr); if (at_bottom) { jam(); lirs_stack_prune(); } } else if (state & Page_entry::ONSTACK) { // case 2a 3a jam(); pl_stack.remove(ptr); if (! pl_stack.isEmpty()) { jam(); lirs_stack_pop(); } pl_stack.add(ptr); state |= Page_entry::HOT; if (state & Page_entry::ONQUEUE) { jam(); move_cleanup_ptr(ptr); pl_queue.remove(ptr); state &= ~ Page_entry::ONQUEUE; } } else { // case 2b 3b jam(); pl_stack.add(ptr); state |= Page_entry::ONSTACK; if (state & Page_entry::ONQUEUE) { jam(); move_cleanup_ptr(ptr); pl_queue.remove(ptr); state &= ~ Page_entry::ONQUEUE; } if (state & Page_entry::BOUND) { jam(); pl_queue.add(ptr); state |= Page_entry::ONQUEUE; } else { // enters queue at bind jam(); } } } else { #ifdef VM_TRACE debugOut << "PGMAN: filling up initial hot pages: " << m_stats.m_num_pages << " of " << m_param.m_max_hot_pages << endl; #endif jam(); if (state & Page_entry::ONSTACK) { jam(); pl_stack.remove(ptr); } pl_stack.add(ptr); state |= Page_entry::ONSTACK; state |= Page_entry::HOT; // it could be on queue already if (state & Page_entry::ONQUEUE) { jam(); pl_queue.remove(ptr); state &= ~Page_entry::ONQUEUE; } } set_page_state(ptr, state); #ifdef VM_TRACE debugOut << "PGMAN: <lirs_reference" << endl; #endif } // continueB loops void Pgman::do_stats_loop(Signal* signal) { #ifdef VM_TRACE debugOut << "PGMAN: do_stats_loop" << endl; verify_all(); #endif Uint32 delay = m_param.m_stats_loop_delay; signal->theData[0] = PgmanContinueB::STATS_LOOP; sendSignalWithDelay(PGMAN_REF, GSN_CONTINUEB, signal, delay, 1); } void Pgman::do_busy_loop(Signal* signal, bool direct) { #ifdef VM_TRACE debugOut << "PGMAN: >do_busy_loop on=" << m_busy_loop_on << " direct=" << direct << endl; #endif Uint32 restart = false; if (direct) { // may not cover the calling entry (void)process_bind(signal); (void)process_map(signal); // callback must be queued if (! m_busy_loop_on) { restart = true; m_busy_loop_on = true; } } else { ndbrequire(m_busy_loop_on); restart += process_bind(signal); restart += process_map(signal); restart += process_callback(signal); if (! restart) { m_busy_loop_on = false; } } if (restart) { signal->theData[0] = PgmanContinueB::BUSY_LOOP; sendSignal(PGMAN_REF, GSN_CONTINUEB, signal, 1, JBB); } #ifdef VM_TRACE debugOut << "PGMAN: <do_busy_loop on=" << m_busy_loop_on << " restart=" << restart << endl; #endif } void Pgman::do_cleanup_loop(Signal* signal) { #ifdef VM_TRACE debugOut << "PGMAN: do_cleanup_loop" << endl; #endif process_cleanup(signal); Uint32 delay = m_param.m_cleanup_loop_delay; signal->theData[0] = PgmanContinueB::CLEANUP_LOOP; sendSignalWithDelay(PGMAN_REF, GSN_CONTINUEB, signal, delay, 1); } void Pgman::do_lcp_loop(Signal* signal, bool direct) { #ifdef VM_TRACE debugOut << "PGMAN: >do_lcp_loop on=" << m_lcp_loop_on << " direct=" << direct << endl; #endif Uint32 restart = false; if (direct) { ndbrequire(! m_lcp_loop_on); restart = true; m_lcp_loop_on = true; } else { ndbrequire(m_lcp_loop_on); restart += process_lcp(signal); if (! restart) { m_lcp_loop_on = false; } } if (restart) { Uint32 delay = m_param.m_lcp_loop_delay; signal->theData[0] = PgmanContinueB::LCP_LOOP; if (delay) sendSignalWithDelay(PGMAN_REF, GSN_CONTINUEB, signal, delay, 1); else sendSignal(PGMAN_REF, GSN_CONTINUEB, signal, 1, JBB); } #ifdef VM_TRACE debugOut << "PGMAN: <do_lcp_loop on=" << m_lcp_loop_on << " restart=" << restart << endl; #endif } // busy loop bool Pgman::process_bind(Signal* signal) { #ifdef VM_TRACE debugOut << "PGMAN: >process_bind" << endl; #endif int max_count = 32; Page_sublist& pl_bind = *m_page_sublist[Page_entry::SL_BIND]; while (! pl_bind.isEmpty() && --max_count >= 0) { jam(); Ptr<Page_entry> ptr; pl_bind.first(ptr); if (! process_bind(signal, ptr)) { jam(); break; } } #ifdef VM_TRACE debugOut << "PGMAN: <process_bind" << endl; #endif return ! pl_bind.isEmpty(); } bool Pgman::process_bind(Signal* signal, Ptr<Page_entry> ptr) { #ifdef VM_TRACE debugOut << "PGMAN: " << ptr << " : process_bind" << endl; #endif Page_sublist& pl_bind = *m_page_sublist[Page_entry::SL_BIND]; Page_queue& pl_queue = m_page_queue; Ptr<GlobalPage> gptr; if (m_stats.m_num_pages < m_param.m_max_pages) { jam(); bool ok = seize_cache_page(gptr); // to handle failure requires some changes in LIRS ndbrequire(ok); } else { jam(); Ptr<Page_entry> clean_ptr; if (! pl_queue.first(clean_ptr)) { jam(); #ifdef VM_TRACE debugOut << "PGMAN: bind failed: queue empty" << endl; #endif // XXX busy loop return false; } Page_state clean_state = clean_ptr.p->m_state; // under unusual circumstances it could still be paging in if (! (clean_state & Page_entry::MAPPED) || clean_state & Page_entry::DIRTY || clean_state & Page_entry::REQUEST) { jam(); #ifdef VM_TRACE debugOut << "PGMAN: bind failed: queue front not evictable" << endl; debugOut << "PGMAN: " << clean_ptr << endl; #endif // XXX busy loop return false; } #ifdef VM_TRACE debugOut << "PGMAN: " << clean_ptr << " : evict" << endl; #endif ndbassert(clean_ptr.p->m_dirty_count == 0); ndbrequire(clean_state & Page_entry::ONQUEUE); ndbrequire(clean_state & Page_entry::BOUND); ndbrequire(clean_state & Page_entry::MAPPED); move_cleanup_ptr(clean_ptr); pl_queue.remove(clean_ptr); clean_state &= ~ Page_entry::ONQUEUE; gptr.i = clean_ptr.p->m_real_page_i; clean_ptr.p->m_real_page_i = RNIL; clean_state &= ~ Page_entry::BOUND; clean_state &= ~ Page_entry::MAPPED; set_page_state(clean_ptr, clean_state); if (! (clean_state & Page_entry::ONSTACK)) release_page_entry(clean_ptr); m_global_page_pool.getPtr(gptr); } Page_state state = ptr.p->m_state; ptr.p->m_real_page_i = gptr.i; state |= Page_entry::BOUND; if (state & Page_entry::EMPTY) { jam(); state |= Page_entry::MAPPED; } if (! (state & Page_entry::LOCKED) && ! (state & Page_entry::ONQUEUE) && ! (state & Page_entry::HOT)) { jam(); #ifdef VM_TRACE debugOut << "PGMAN: " << ptr << " : add to queue at bind" << endl; #endif pl_queue.add(ptr); state |= Page_entry::ONQUEUE; } set_page_state(ptr, state); return true; } bool Pgman::process_map(Signal* signal) { #ifdef VM_TRACE debugOut << "PGMAN: >process_map" << endl; #endif int max_count = 0; if (m_param.m_max_io_waits > m_stats.m_current_io_waits) { max_count = m_param.m_max_io_waits - m_stats.m_current_io_waits; max_count = max_count / 2 + 1; } Page_sublist& pl_map = *m_page_sublist[Page_entry::SL_MAP]; while (! pl_map.isEmpty() && --max_count >= 0) { jam(); Ptr<Page_entry> ptr; pl_map.first(ptr); if (! process_map(signal, ptr)) { jam(); break; } } #ifdef VM_TRACE debugOut << "PGMAN: <process_map" << endl; #endif return ! pl_map.isEmpty(); } bool Pgman::process_map(Signal* signal, Ptr<Page_entry> ptr) { #ifdef VM_TRACE debugOut << "PGMAN: " << ptr << " : process_map" << endl; #endif pagein(signal, ptr); return true; } bool Pgman::process_callback(Signal* signal) { #ifdef VM_TRACE debugOut << "PGMAN: >process_callback" << endl; #endif int max_count = 1; Page_sublist& pl_callback = *m_page_sublist[Page_entry::SL_CALLBACK]; Ptr<Page_entry> ptr; pl_callback.first(ptr); while (! ptr.isNull() && --max_count >= 0) { jam(); Ptr<Page_entry> curr = ptr; pl_callback.next(ptr); if (! process_callback(signal, curr)) { jam(); break; } } #ifdef VM_TRACE debugOut << "PGMAN: <process_callback" << endl; #endif return ! pl_callback.isEmpty(); } bool Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr) { #ifdef VM_TRACE debugOut << "PGMAN: " << ptr << " : process_callback" << endl; #endif int max_count = 1; Page_state state = ptr.p->m_state; while (! ptr.p->m_requests.isEmpty() && --max_count >= 0) { jam(); SimulatedBlock* b; Callback callback; { /** * Make sure list is in own scope if callback will access this * list again (destructor restores list head). */ Local_page_request_list req_list(m_page_request_pool, ptr.p->m_requests); Ptr<Page_request> req_ptr; req_list.first(req_ptr); #ifdef VM_TRACE debugOut << "PGMAN: " << req_ptr << " : process_callback" << endl; #endif #ifdef ERROR_INSERT if (req_ptr.p->m_flags & Page_request::DELAY_REQ) { Uint64 now = NdbTick_CurrentMillisecond(); if (now < req_ptr.p->m_delay_until_time) { break; } } #endif b = globalData.getBlock(req_ptr.p->m_block); callback = req_ptr.p->m_callback; if (req_ptr.p->m_flags & DIRTY_FLAGS) { jam(); state |= Page_entry::DIRTY; ndbassert(ptr.p->m_dirty_count); ptr.p->m_dirty_count --; } req_list.releaseFirst(req_ptr); } ndbrequire(state & Page_entry::BOUND); ndbrequire(state & Page_entry::MAPPED); // callback may re-enter PGMAN and change page state set_page_state(ptr, state); b->execute(signal, callback, ptr.p->m_real_page_i); state = ptr.p->m_state; } if (ptr.p->m_requests.isEmpty()) { jam(); state &= ~ Page_entry::REQUEST; } set_page_state(ptr, state); return true; } // cleanup loop bool Pgman::process_cleanup(Signal* signal) { #ifdef VM_TRACE debugOut << "PGMAN: >process_cleanup" << endl; #endif Page_queue& pl_queue = m_page_queue; // XXX for now start always from beginning m_cleanup_ptr.i = RNIL; if (m_cleanup_ptr.i == RNIL && ! pl_queue.first(m_cleanup_ptr)) { jam(); #ifdef VM_TRACE debugOut << "PGMAN: <process_cleanup: empty queue" << endl; #endif return false; } int max_loop_count = m_param.m_max_loop_count; int max_count = 0; if (m_param.m_max_io_waits > m_stats.m_current_io_waits) { max_count = m_param.m_max_io_waits - m_stats.m_current_io_waits; max_count = max_count / 2 + 1; } Ptr<Page_entry> ptr = m_cleanup_ptr; while (max_loop_count != 0 && max_count != 0) { Page_state state = ptr.p->m_state; ndbrequire(! (state & Page_entry::LOCKED)); if (state & Page_entry::BUSY) { #ifdef VM_TRACE debugOut << "PGMAN: process_cleanup: break on busy page" << endl; debugOut << "PGMAN: " << ptr << endl; #endif break; } if (state & Page_entry::DIRTY && ! (state & Page_entry::PAGEIN) && ! (state & Page_entry::PAGEOUT)) { #ifdef VM_TRACE debugOut << "PGMAN: " << ptr << " : process_cleanup" << endl; #endif c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i, ptr.p->m_dirty_count); pageout(signal, ptr); max_count--; } if (! pl_queue.hasNext(ptr)) break; pl_queue.next(ptr); max_loop_count--; } m_cleanup_ptr = ptr; #ifdef VM_TRACE debugOut << "PGMAN: <process_cleanup" << endl; #endif return true; } /* * Call this before queue.remove(ptr). If the removed entry is the * clean-up pointer, move it towards front. */ void Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr) { Page_queue& pl_queue = m_page_queue; if (ptr.i == m_cleanup_ptr.i) { jam(); pl_queue.prev(m_cleanup_ptr); } } // LCP void Pgman::execLCP_FRAG_ORD(Signal* signal) { LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtr(); ndbrequire(ord->lcpId >= m_last_lcp_complete + 1 || m_last_lcp_complete == 0); m_last_lcp = ord->lcpId; DBG_LCP("Pgman::execLCP_FRAG_ORD lcp: " << m_last_lcp << endl); #ifdef VM_TRACE debugOut << "PGMAN: execLCP_FRAG_ORD" << " this=" << m_last_lcp << " last_complete=" << m_last_lcp_complete << " bucket=" << m_lcp_curr_bucket << endl; #endif } void Pgman::execEND_LCP_REQ(Signal* signal) { EndLcpReq* req = (EndLcpReq*)signal->getDataPtr(); m_end_lcp_req = *req; DBG_LCP("execEND_LCP_REQ" << endl); ndbrequire(!m_lcp_outstanding); m_lcp_curr_bucket = 0; #ifdef VM_TRACE debugOut << "PGMAN: execEND_LCP_REQ" << " this=" << m_last_lcp << " last_complete=" << m_last_lcp_complete << " bucket=" << m_lcp_curr_bucket << " outstanding=" << m_lcp_outstanding << endl; #endif m_last_lcp_complete = m_last_lcp; do_lcp_loop(signal, true); } bool Pgman::process_lcp(Signal* signal) { Page_hashlist& pl_hash = m_page_hashlist; int max_count = 0; if (m_param.m_max_io_waits > m_stats.m_current_io_waits) { max_count = m_param.m_max_io_waits - m_stats.m_current_io_waits; max_count = max_count / 2 + 1; } #ifdef VM_TRACE debugOut << "PGMAN: process_lcp" << " this=" << m_last_lcp << " last_complete=" << m_last_lcp_complete << " bucket=" << m_lcp_curr_bucket << " outstanding=" << m_lcp_outstanding << endl; #endif // start or re-start from beginning of current hash bucket if (m_lcp_curr_bucket != ~(Uint32)0) { Page_hashlist::Iterator iter; pl_hash.next(m_lcp_curr_bucket, iter); Uint32 loop = 0; while (iter.curr.i != RNIL && m_lcp_outstanding < (Uint32) max_count && (loop ++ < 32 || iter.bucket == m_lcp_curr_bucket)) { Ptr<Page_entry>& ptr = iter.curr; Page_state state = ptr.p->m_state; DBG_LCP("LCP " << ptr << " - "); if (ptr.p->m_last_lcp < m_last_lcp && (state & Page_entry::DIRTY) && (! (state & Page_entry::LOCKED))) { if(! (state & Page_entry::BOUND)) { ndbout << ptr << endl; ndbrequire(false); } if (state & Page_entry::BUSY) { DBG_LCP(" BUSY" << endl); break; // wait for it } else if (state & Page_entry::PAGEOUT) { DBG_LCP(" PAGEOUT -> state |= LCP" << endl); set_page_state(ptr, state | Page_entry::LCP); } else { DBG_LCP(" pageout()" << endl); ptr.p->m_state |= Page_entry::LCP; c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i, ptr.p->m_dirty_count); pageout(signal, ptr); } ptr.p->m_last_lcp = m_last_lcp; m_lcp_outstanding++; } else { DBG_LCP(" NOT DIRTY" << endl); } pl_hash.next(iter); } m_lcp_curr_bucket = (iter.curr.i != RNIL ? iter.bucket : ~(Uint32)0); } if (m_lcp_curr_bucket == ~(Uint32)0 && !m_lcp_outstanding) { Ptr<Page_entry> ptr; Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED]; if (pl.first(ptr)) { process_lcp_locked(signal, ptr); } else { signal->theData[0] = m_end_lcp_req.senderData; sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB); } return false; } return true; } void Pgman::process_lcp_locked(Signal* signal, Ptr<Page_entry> ptr) { CRASH_INSERTION(11006); ptr.p->m_last_lcp = m_last_lcp; if (ptr.p->m_state & Page_entry::DIRTY) { Ptr<GlobalPage> org, copy; ndbrequire(m_global_page_pool.seize(copy)); m_global_page_pool.getPtr(org, ptr.p->m_real_page_i); memcpy(copy.p, org.p, sizeof(GlobalPage)); ptr.p->m_copy_page_i = copy.i; m_lcp_outstanding++; ptr.p->m_state |= Page_entry::LCP; pageout(signal, ptr); return; } Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED]; pl.next(ptr); signal->theData[0] = PgmanContinueB::LCP_LOCKED; signal->theData[1] = ptr.i; sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); } void Pgman::process_lcp_locked_fswriteconf(Signal* signal, Ptr<Page_entry> ptr) { Ptr<GlobalPage> org, copy; m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i); m_global_page_pool.getPtr(org, ptr.p->m_real_page_i); memcpy(org.p, copy.p, sizeof(GlobalPage)); m_global_page_pool.release(copy); ptr.p->m_copy_page_i = RNIL; Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED]; pl.next(ptr); signal->theData[0] = PgmanContinueB::LCP_LOCKED; signal->theData[1] = ptr.i; sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); } // page read and write void Pgman::pagein(Signal* signal, Ptr<Page_entry> ptr) { #ifdef VM_TRACE debugOut << "PGMAN: pagein" << endl; debugOut << "PGMAN: " << ptr << endl; #endif ndbrequire(! (ptr.p->m_state & Page_entry::PAGEIN)); set_page_state(ptr, ptr.p->m_state | Page_entry::PAGEIN); fsreadreq(signal, ptr); m_stats.m_current_io_waits++; } void Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr) { #ifdef VM_TRACE debugOut << "PGMAN: fsreadconf" << endl; debugOut << "PGMAN: " << ptr << endl; #endif ndbrequire(ptr.p->m_state & Page_entry::PAGEIN); Page_state state = ptr.p->m_state; state &= ~ Page_entry::PAGEIN; state &= ~ Page_entry::EMPTY; state |= Page_entry::MAPPED; set_page_state(ptr, state); { /** * Update lsn record on page * as it can be modified/flushed wo/ update_lsn has been called * (e.g. prealloc) and it then would get lsn 0, which is bad * when running undo and following SR */ Ptr<GlobalPage> pagePtr; m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i); File_formats::Datafile::Data_page* page = (File_formats::Datafile::Data_page*)pagePtr.p; Uint64 lsn = 0; lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32; lsn += page->m_page_header.m_page_lsn_lo; ptr.p->m_lsn = lsn; } ndbrequire(m_stats.m_current_io_waits > 0); m_stats.m_current_io_waits--; ptr.p->m_last_lcp = m_last_lcp_complete; do_busy_loop(signal, true); } void Pgman::pageout(Signal* signal, Ptr<Page_entry> ptr) { #ifdef VM_TRACE debugOut << "PGMAN: pageout" << endl; debugOut << "PGMAN: " << ptr << endl; #endif Page_state state = ptr.p->m_state; ndbrequire(state & Page_entry::BOUND); ndbrequire(state & Page_entry::MAPPED); ndbrequire(! (state & Page_entry::BUSY)); ndbrequire(! (state & Page_entry::PAGEOUT)); state |= Page_entry::PAGEOUT; // update lsn on page prior to write Ptr<GlobalPage> pagePtr; m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i); File_formats::Datafile::Data_page* page = (File_formats::Datafile::Data_page*)pagePtr.p; page->m_page_header.m_page_lsn_hi = ptr.p->m_lsn >> 32; page->m_page_header.m_page_lsn_lo = ptr.p->m_lsn & 0xFFFFFFFF; // undo WAL Logfile_client::Request req; req.m_callback.m_callbackData = ptr.i; req.m_callback.m_callbackFunction = safe_cast(&Pgman::logsync_callback); int ret = m_lgman.sync_lsn(signal, ptr.p->m_lsn, &req, 0); if (ret > 0) { fswritereq(signal, ptr); m_stats.m_current_io_waits++; } else { ndbrequire(ret == 0); state |= Page_entry::LOGSYNC; } set_page_state(ptr, state); } void Pgman::logsync_callback(Signal* signal, Uint32 ptrI, Uint32 res) { Ptr<Page_entry> ptr; m_page_entry_pool.getPtr(ptr, ptrI); #ifdef VM_TRACE debugOut << "PGMAN: logsync_callback" << endl; debugOut << "PGMAN: " << ptr << endl; #endif // it is OK to be "busy" at this point (the commit is queued) Page_state state = ptr.p->m_state; ndbrequire(state & Page_entry::PAGEOUT); ndbrequire(state & Page_entry::LOGSYNC); state &= ~ Page_entry::LOGSYNC; set_page_state(ptr, state); fswritereq(signal, ptr); m_stats.m_current_io_waits++; } void Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr) { #ifdef VM_TRACE debugOut << "PGMAN: fswriteconf" << endl; debugOut << "PGMAN: " << ptr << endl; #endif Page_state state = ptr.p->m_state; ndbrequire(state & Page_entry::PAGEOUT); state &= ~ Page_entry::PAGEOUT; state &= ~ Page_entry::EMPTY; state &= ~ Page_entry::DIRTY; ndbrequire(m_stats.m_current_io_waits > 0); m_stats.m_current_io_waits--; if (state & Page_entry::LCP) { ndbrequire(m_lcp_outstanding); m_lcp_outstanding--; state &= ~ Page_entry::LCP; if (ptr.p->m_copy_page_i != RNIL) { process_lcp_locked_fswriteconf(signal, ptr); } } set_page_state(ptr, state); do_busy_loop(signal, true); } // file system interface void Pgman::fsreadreq(Signal* signal, Ptr<Page_entry> ptr) { File_map::ConstDataBufferIterator it; bool ret = m_file_map.first(it) && m_file_map.next(it, ptr.p->m_file_no); ndbrequire(ret); Uint32 fd = * it.data; ndbrequire(ptr.p->m_page_no > 0); FsReadWriteReq* req = (FsReadWriteReq*)signal->getDataPtrSend(); req->filePointer = fd; req->userReference = reference(); req->userPointer = ptr.i; req->varIndex = ptr.p->m_page_no; req->numberOfPages = 1; req->operationFlag = 0; FsReadWriteReq::setFormatFlag(req->operationFlag, FsReadWriteReq::fsFormatGlobalPage); req->data.pageData[0] = ptr.p->m_real_page_i; sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, FsReadWriteReq::FixedLength + 1, JBA); } void Pgman::execFSREADCONF(Signal* signal) { jamEntry(); FsConf* conf = (FsConf*)signal->getDataPtr(); Ptr<Page_entry> ptr; m_page_entry_pool.getPtr(ptr, conf->userPointer); fsreadconf(signal, ptr); } void Pgman::execFSREADREF(Signal* signal) { jamEntry(); SimulatedBlock::execFSREADREF(signal); ndbrequire(false); } void Pgman::fswritereq(Signal* signal, Ptr<Page_entry> ptr) { File_map::ConstDataBufferIterator it; m_file_map.first(it); m_file_map.next(it, ptr.p->m_file_no); Uint32 fd = * it.data; ndbrequire(ptr.p->m_page_no > 0); FsReadWriteReq* req = (FsReadWriteReq*)signal->getDataPtrSend(); req->filePointer = fd; req->userReference = reference(); req->userPointer = ptr.i; req->varIndex = ptr.p->m_page_no; req->numberOfPages = 1; req->operationFlag = 0; FsReadWriteReq::setFormatFlag(req->operationFlag, FsReadWriteReq::fsFormatGlobalPage); req->data.pageData[0] = ptr.p->m_real_page_i; #if ERROR_INSERT_CODE if (ptr.p->m_state & Page_entry::LOCKED) { sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, 3000, FsReadWriteReq::FixedLength + 1); ndbout_c("pageout locked (3s)"); return; } #endif sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, FsReadWriteReq::FixedLength + 1, JBA); } void Pgman::execFSWRITECONF(Signal* signal) { jamEntry(); FsConf* conf = (FsConf*)signal->getDataPtr(); Ptr<Page_entry> ptr; m_page_entry_pool.getPtr(ptr, conf->userPointer); fswriteconf(signal, ptr); } void Pgman::execFSWRITEREF(Signal* signal) { jamEntry(); SimulatedBlock::execFSWRITEREF(signal); ndbrequire(false); } // client methods int Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) { jamEntry(); #ifdef VM_TRACE Ptr<Page_request> tmp = { &page_req, RNIL}; debugOut << "PGMAN: >get_page" << endl; debugOut << "PGMAN: " << ptr << endl; debugOut << "PGMAN: " << tmp << endl; #endif Uint32 req_flags = page_req.m_flags; if (req_flags & Page_request::EMPTY_PAGE) { // Only one can "init" a page at a time //ndbrequire(ptr.p->m_requests.isEmpty()); } Page_state state = ptr.p->m_state; bool is_new = (state == 0); bool busy_count = false; if (req_flags & Page_request::LOCK_PAGE) { jam(); state |= Page_entry::LOCKED; } if (req_flags & Page_request::ALLOC_REQ) { jam(); } else if (req_flags & Page_request::COMMIT_REQ) { busy_count = true; state |= Page_entry::BUSY; } else if ((req_flags & Page_request::OP_MASK) != ZREAD) { jam(); } // update LIRS if (! (state & Page_entry::LOCKED) && ! (req_flags & Page_request::CORR_REQ)) { jam(); set_page_state(ptr, state); lirs_reference(ptr); state = ptr.p->m_state; } const Page_state LOCKED = Page_entry::LOCKED | Page_entry::MAPPED; if ((state & LOCKED) == LOCKED && ! (req_flags & Page_request::UNLOCK_PAGE)) { ptr.p->m_state |= (req_flags & DIRTY_FLAGS ? Page_entry::DIRTY : 0); if (ptr.p->m_copy_page_i != RNIL) { return ptr.p->m_copy_page_i; } return ptr.p->m_real_page_i; } bool only_request = ptr.p->m_requests.isEmpty(); #ifdef ERROR_INSERT if (req_flags & Page_request::DELAY_REQ) { jam(); only_request = false; } #endif if (only_request && state & Page_entry::MAPPED) { if (! (state & Page_entry::PAGEOUT)) { if (req_flags & DIRTY_FLAGS) state |= Page_entry::DIRTY; ptr.p->m_busy_count += busy_count; set_page_state(ptr, state); #ifdef VM_TRACE debugOut << "PGMAN: <get_page: immediate" << endl; #endif ndbrequire(ptr.p->m_real_page_i != RNIL); return ptr.p->m_real_page_i; } } if (! (req_flags & (Page_request::LOCK_PAGE | Page_request::UNLOCK_PAGE))) { ndbrequire(! (state & Page_entry::LOCKED)); } // queue the request Ptr<Pgman::Page_request> req_ptr; { Local_page_request_list req_list(m_page_request_pool, ptr.p->m_requests); if (! (req_flags & Page_request::ALLOC_REQ)) req_list.seizeLast(req_ptr); else req_list.seizeFirst(req_ptr); } if (req_ptr.i == RNIL) { if (is_new) { release_page_entry(ptr); } return -1; } req_ptr.p->m_block = page_req.m_block; req_ptr.p->m_flags = page_req.m_flags; req_ptr.p->m_callback = page_req.m_callback; #ifdef ERROR_INSERT req_ptr.p->m_delay_until_time = page_req.m_delay_until_time; #endif state |= Page_entry::REQUEST; if (only_request && req_flags & Page_request::EMPTY_PAGE) { state |= Page_entry::EMPTY; } if (req_flags & Page_request::UNLOCK_PAGE) { // keep it locked } ptr.p->m_busy_count += busy_count; ptr.p->m_dirty_count += !!(req_flags & DIRTY_FLAGS); set_page_state(ptr, state); do_busy_loop(signal, true); #ifdef VM_TRACE debugOut << "PGMAN: " << req_ptr << endl; debugOut << "PGMAN: <get_page: queued" << endl; #endif return 0; } void Pgman::update_lsn(Ptr<Page_entry> ptr, Uint32 block, Uint64 lsn) { jamEntry(); #ifdef VM_TRACE const char* bname = getBlockName(block, "?"); debugOut << "PGMAN: >update_lsn: block=" << bname << " lsn=" << lsn << endl; debugOut << "PGMAN: " << ptr << endl; #endif Page_state state = ptr.p->m_state; ptr.p->m_lsn = lsn; if (state & Page_entry::BUSY) { ndbrequire(ptr.p->m_busy_count != 0); if (--ptr.p->m_busy_count == 0) { state &= ~ Page_entry::BUSY; } } state |= Page_entry::DIRTY; set_page_state(ptr, state); #ifdef VM_TRACE debugOut << "PGMAN: " << ptr << endl; debugOut << "PGMAN: <update_lsn" << endl; #endif } Uint32 Pgman::create_data_file() { File_map::DataBufferIterator it; if(m_file_map.first(it)) { do { if(*it.data == RNIL) { *it.data = (1u << 31) | it.pos; return it.pos; } } while(m_file_map.next(it)); } Uint32 file_no = m_file_map.getSize(); Uint32 fd = (1u << 31) | file_no; if (m_file_map.append(&fd, 1)) { return file_no; } return RNIL; } Uint32 Pgman::alloc_data_file(Uint32 file_no) { Uint32 sz = m_file_map.getSize(); if (file_no >= sz) { Uint32 len = file_no - sz + 1; Uint32 fd = RNIL; while (len--) { if (! m_file_map.append(&fd, 1)) return RNIL; } } File_map::DataBufferIterator it; m_file_map.first(it); m_file_map.next(it, file_no); if (* it.data != RNIL) return RNIL; *it.data = (1u << 31) | file_no; return file_no; } void Pgman::map_file_no(Uint32 file_no, Uint32 fd) { File_map::DataBufferIterator it; m_file_map.first(it); m_file_map.next(it, file_no); assert(*it.data == ((1u << 31) | file_no)); *it.data = fd; } void Pgman::free_data_file(Uint32 file_no, Uint32 fd) { File_map::DataBufferIterator it; m_file_map.first(it); m_file_map.next(it, file_no); if (fd == RNIL) { ndbrequire(*it.data == ((1u << 31) | file_no)); } else { ndbrequire(*it.data == fd); } *it.data = RNIL; } int Pgman::drop_page(Ptr<Page_entry> ptr) { #ifdef VM_TRACE debugOut << "PGMAN: drop_page" << endl; debugOut << "PGMAN: " << ptr << endl; #endif Page_stack& pl_stack = m_page_stack; Page_queue& pl_queue = m_page_queue; Page_state state = ptr.p->m_state; if (! (state & (Page_entry::PAGEIN | Page_entry::PAGEOUT))) { ndbrequire(state & Page_entry::BOUND); ndbrequire(state & Page_entry::MAPPED); if (state & Page_entry::ONSTACK) { jam(); bool at_bottom = ! pl_stack.hasPrev(ptr); pl_stack.remove(ptr); state &= ~ Page_entry::ONSTACK; if (at_bottom && (state & Page_entry::HOT)) { jam(); lirs_stack_prune(); } } if (state & Page_entry::ONQUEUE) { jam(); pl_queue.remove(ptr); state &= ~ Page_entry::ONQUEUE; } if (ptr.p->m_real_page_i != RNIL) { jam(); release_cache_page(ptr.p->m_real_page_i); ptr.p->m_real_page_i = RNIL; } set_page_state(ptr, state); release_page_entry(ptr); return 1; } ndbrequire(false); return -1; } // debug #ifdef VM_TRACE void Pgman::verify_page_entry(Ptr<Page_entry> ptr) { Uint32 ptrI = ptr.i; Page_state state = ptr.p->m_state; bool has_req = state & Page_entry::REQUEST; bool has_req2 = ! ptr.p->m_requests.isEmpty(); ndbrequire(has_req == has_req2 || dump_page_lists(ptrI)); bool is_bound = state & Page_entry::BOUND; bool is_bound2 = ptr.p->m_real_page_i != RNIL; ndbrequire(is_bound == is_bound2 || dump_page_lists(ptrI)); bool is_mapped = state & Page_entry::MAPPED; // mapped implies bound ndbrequire(! is_mapped || is_bound || dump_page_lists(ptrI)); // bound is mapped or has open requests ndbrequire(! is_bound || is_mapped || has_req || dump_page_lists(ptrI)); bool on_stack = state & Page_entry::ONSTACK; bool is_hot = state & Page_entry::HOT; // hot entry must be on stack ndbrequire(! is_hot || on_stack || dump_page_lists(ptrI)); bool on_queue = state & Page_entry::ONQUEUE; // hot entry is not on queue ndbrequire(! is_hot || ! on_queue || dump_page_lists(ptrI)); bool is_locked = state & Page_entry::LOCKED; bool on_queue2 = ! is_locked && ! is_hot && is_bound; ndbrequire(on_queue == on_queue2 || dump_page_lists(ptrI)); // entries waiting to enter queue bool to_queue = ! is_locked && ! is_hot && ! is_bound && has_req; // page is either LOCKED or under LIRS bool is_lirs = on_stack || to_queue || on_queue; ndbrequire(is_locked == ! is_lirs || dump_page_lists(ptrI)); bool pagein = state & Page_entry::PAGEIN; bool pageout = state & Page_entry::PAGEOUT; // cannot read and write at same time ndbrequire(! pagein || ! pageout || dump_page_lists(ptrI)); Uint32 no = get_sublist_no(state); switch (no) { case Page_entry::SL_BIND: ndbrequire(! pagein && ! pageout || dump_page_lists(ptrI)); break; case Page_entry::SL_MAP: ndbrequire(! pagein && ! pageout || dump_page_lists(ptrI)); break; case Page_entry::SL_MAP_IO: ndbrequire(pagein && ! pageout || dump_page_lists(ptrI)); break; case Page_entry::SL_CALLBACK: ndbrequire(! pagein && ! pageout || dump_page_lists(ptrI)); break; case Page_entry::SL_CALLBACK_IO: ndbrequire(! pagein && pageout || dump_page_lists(ptrI)); break; case Page_entry::SL_BUSY: break; case Page_entry::SL_LOCKED: break; case Page_entry::SL_IDLE: break; case Page_entry::SL_OTHER: break; default: ndbrequire(false || dump_page_lists(ptrI)); break; } } void Pgman::verify_page_lists() { Page_hashlist& pl_hash = m_page_hashlist; Page_stack& pl_stack = m_page_stack; Page_queue& pl_queue = m_page_queue; Ptr<Page_entry> ptr; Uint32 stack_count = 0; Uint32 queue_count = 0; Uint32 queuewait_count = 0; Uint32 locked_bound_count = 0; Page_hashlist::Iterator iter; pl_hash.next(0, iter); while (iter.curr.i != RNIL) { verify_page_entry(iter.curr); Page_state state = iter.curr.p->m_state; if (state & Page_entry::ONSTACK) stack_count++; if (state & Page_entry::ONQUEUE) queue_count++; if (! (state & Page_entry::LOCKED) && ! (state & Page_entry::HOT) && (state & Page_entry::REQUEST) && ! (state & Page_entry::BOUND)) queuewait_count++; if (state & Page_entry::LOCKED && state & Page_entry::BOUND) locked_bound_count++; pl_hash.next(iter); } ndbrequire(stack_count == pl_stack.count() || dump_page_lists()); ndbrequire(queue_count == pl_queue.count() || dump_page_lists()); Uint32 hot_count = 0; Uint32 hot_bound_count = 0; Uint32 cold_bound_count = 0; Uint32 stack_request_count = 0; Uint32 queue_request_count = 0; Uint32 i1 = RNIL; for (pl_stack.first(ptr); ptr.i != RNIL; pl_stack.next(ptr)) { ndbrequire(i1 != ptr.i); i1 = ptr.i; Page_state state = ptr.p->m_state; ndbrequire(state & Page_entry::ONSTACK || dump_page_lists()); if (! pl_stack.hasPrev(ptr)) ndbrequire(state & Page_entry::HOT || dump_page_lists()); if (state & Page_entry::HOT) { hot_count++; if (state & Page_entry::BOUND) hot_bound_count++; } if (state & Page_entry::REQUEST) stack_request_count++; } Uint32 i2 = RNIL; for (pl_queue.first(ptr); ptr.i != RNIL; pl_queue.next(ptr)) { ndbrequire(i2 != ptr.i); i2 = ptr.i; Page_state state = ptr.p->m_state; ndbrequire(state & Page_entry::ONQUEUE || dump_page_lists()); ndbrequire(state & Page_entry::BOUND || dump_page_lists()); cold_bound_count++; if (state & Page_entry::REQUEST) queue_request_count++; } Uint32 tot_bound_count = locked_bound_count + hot_bound_count + cold_bound_count; ndbrequire(m_stats.m_num_pages == tot_bound_count || dump_page_lists()); Uint32 k; Uint32 entry_count = 0; for (k = 0; k < Page_entry::SUBLIST_COUNT; k++) { const Page_sublist& pl = *m_page_sublist[k]; for (pl.first(ptr); ptr.i != RNIL; pl.next(ptr)) { ndbrequire(get_sublist_no(ptr.p->m_state) == k || dump_page_lists()); entry_count++; } } ndbrequire(entry_count == pl_hash.count() || dump_page_lists()); debugOut << "PGMAN: loop" << " stats=" << m_stats_loop_on << " busy=" << m_busy_loop_on << " cleanup=" << m_cleanup_loop_on << " lcp=" << m_lcp_loop_on << endl; debugOut << "PGMAN:" << " entry:" << pl_hash.count() << " cache:" << m_stats.m_num_pages << "(" << locked_bound_count << "L)" << " stack:" << pl_stack.count() << " hot:" << hot_count << " hot_bound:" << hot_bound_count << " stack_request:" << stack_request_count << " queue:" << pl_queue.count() << " queue_request:" << queue_request_count << " queuewait:" << queuewait_count << endl; debugOut << "PGMAN:"; for (k = 0; k < Page_entry::SUBLIST_COUNT; k++) { const Page_sublist& pl = *m_page_sublist[k]; debugOut << " " << get_sublist_name(k) << ":" << pl.count(); } debugOut << endl; } void Pgman::verify_all() { Page_sublist& pl_bind = *m_page_sublist[Page_entry::SL_BIND]; Page_sublist& pl_map = *m_page_sublist[Page_entry::SL_MAP]; Page_sublist& pl_callback = *m_page_sublist[Page_entry::SL_CALLBACK]; if (! pl_bind.isEmpty() || ! pl_map.isEmpty() || ! pl_callback.isEmpty()) { ndbrequire(m_busy_loop_on || dump_page_lists()); } verify_page_lists(); } bool Pgman::dump_page_lists(Uint32 ptrI) { if (! debugFlag) open_debug_file(1); debugOut << "PGMAN: page list dump" << endl; if (ptrI != RNIL) debugOut << "PGMAN: error on PE [" << ptrI << "]" << endl; Page_hashlist& pl_hash = m_page_hashlist; Page_stack& pl_stack = m_page_stack; Page_queue& pl_queue = m_page_queue; Ptr<Page_entry> ptr; Uint32 n; char buf[40]; debugOut << "hash:" << endl; Page_hashlist::Iterator iter; pl_hash.next(0, iter); n = 0; while (iter.curr.i != RNIL) { sprintf(buf, "%03d", n++); debugOut << buf << " " << iter.curr << endl; pl_hash.next(iter); } debugOut << "stack:" << endl; n = 0; for (pl_stack.first(ptr); ptr.i != RNIL; pl_stack.next(ptr)) { sprintf(buf, "%03d", n++); debugOut << buf << " " << ptr << endl; } debugOut << "queue:" << endl; n = 0; for (pl_queue.first(ptr); ptr.i != RNIL; pl_queue.next(ptr)) { sprintf(buf, "%03d", n++); debugOut << buf << " " << ptr << endl; } Uint32 k; for (k = 0; k < Page_entry::SUBLIST_COUNT; k++) { debugOut << get_sublist_name(k) << ":" << endl; const Page_sublist& pl = *m_page_sublist[k]; for (pl.first(ptr); ptr.i != RNIL; pl.next(ptr)) { sprintf(buf, "%03d", n++); debugOut << buf << " " << ptr << endl; } } if (! debugFlag) open_debug_file(0); return false; } #endif const char* Pgman::get_sublist_name(Uint32 list_no) { switch (list_no) { case Page_entry::SL_BIND: return "bind"; case Page_entry::SL_MAP: return "map"; case Page_entry::SL_MAP_IO: return "map_io"; case Page_entry::SL_CALLBACK: return "callback"; case Page_entry::SL_CALLBACK_IO: return "callback_io"; case Page_entry::SL_BUSY: return "busy"; case Page_entry::SL_LOCKED: return "locked"; case Page_entry::SL_IDLE: return "idle"; case Page_entry::SL_OTHER: return "other"; } return "?"; } NdbOut& operator<<(NdbOut& out, Ptr<Pgman::Page_request> ptr) { const Pgman::Page_request& pr = *ptr.p; const char* bname = getBlockName(pr.m_block, "?"); out << "PR"; if (ptr.i != RNIL) out << " [" << dec << ptr.i << "]"; out << " block=" << bname; out << " flags=" << hex << pr.m_flags; out << "," << dec << (pr.m_flags & Pgman::Page_request::OP_MASK); { if (pr.m_flags & Pgman::Page_request::LOCK_PAGE) out << ",lock_page"; if (pr.m_flags & Pgman::Page_request::EMPTY_PAGE) out << ",empty_page"; if (pr.m_flags & Pgman::Page_request::ALLOC_REQ) out << ",alloc_req"; if (pr.m_flags & Pgman::Page_request::COMMIT_REQ) out << ",commit_req"; if (pr.m_flags & Pgman::Page_request::DIRTY_REQ) out << ",dirty_req"; if (pr.m_flags & Pgman::Page_request::CORR_REQ) out << ",corr_req"; } return out; } NdbOut& operator<<(NdbOut& out, Ptr<Pgman::Page_entry> ptr) { const Pgman::Page_entry pe = *ptr.p; Uint32 list_no = Pgman::get_sublist_no(pe.m_state); out << "PE [" << dec << ptr.i << "]"; out << " state=" << hex << pe.m_state; { if (pe.m_state & Pgman::Page_entry::REQUEST) out << ",request"; if (pe.m_state & Pgman::Page_entry::EMPTY) out << ",empty"; if (pe.m_state & Pgman::Page_entry::BOUND) out << ",bound"; if (pe.m_state & Pgman::Page_entry::MAPPED) out << ",mapped"; if (pe.m_state & Pgman::Page_entry::DIRTY) out << ",dirty"; if (pe.m_state & Pgman::Page_entry::USED) out << ",used"; if (pe.m_state & Pgman::Page_entry::BUSY) out << ",busy"; if (pe.m_state & Pgman::Page_entry::LOCKED) out << ",locked"; if (pe.m_state & Pgman::Page_entry::PAGEIN) out << ",pagein"; if (pe.m_state & Pgman::Page_entry::PAGEOUT) out << ",pageout"; if (pe.m_state & Pgman::Page_entry::LOGSYNC) out << ",logsync"; if (pe.m_state & Pgman::Page_entry::LCP) out << ",lcp"; if (pe.m_state & Pgman::Page_entry::HOT) out << ",hot"; if (pe.m_state & Pgman::Page_entry::ONSTACK) out << ",onstack"; if (pe.m_state & Pgman::Page_entry::ONQUEUE) out << ",onqueue"; } out << " list="; if (list_no == ZNIL) out << "NONE"; else { out << dec << list_no; out << "," << Pgman::get_sublist_name(list_no); } out << " diskpage=" << dec << pe.m_file_no << "," << pe.m_page_no; if (pe.m_real_page_i == RNIL) out << " realpage=RNIL"; else out << " realpage=" << dec << pe.m_real_page_i; out << " lsn=" << dec << pe.m_lsn; out << " busy_count=" << dec << pe.m_busy_count; #ifdef VM_TRACE { Pgman::Page_stack& pl_stack = pe.m_this->m_page_stack; if (! pl_stack.hasNext(ptr)) out << " top"; if (! pl_stack.hasPrev(ptr)) out << " bottom"; } { Pgman::Local_page_request_list req_list(ptr.p->m_this->m_page_request_pool, ptr.p->m_requests); if (! req_list.isEmpty()) { Ptr<Pgman::Page_request> req_ptr; out << " req:"; for (req_list.first(req_ptr); req_ptr.i != RNIL; req_list.next(req_ptr)) { out << " " << req_ptr; } } } #endif return out; } #ifdef VM_TRACE void Pgman::open_debug_file(Uint32 flag) { if (flag) { FILE* f = globalSignalLoggers.getOutputStream(); debugOut = *new NdbOut(*new FileOutputStream(f)); } else { debugOut = *new NdbOut(*new NullOutputStream()); } } #endif void Pgman::execDUMP_STATE_ORD(Signal* signal) { jamEntry(); Page_hashlist& pl_hash = m_page_hashlist; #ifdef VM_TRACE if (signal->theData[0] == 11000 && signal->getLength() == 2) { Uint32 flag = signal->theData[1]; open_debug_file(flag); debugFlag = flag; } #endif if (signal->theData[0] == 11001) { // XXX print hash list if no sublist Uint32 list = 0; if (signal->getLength() > 1) list = signal->theData[1]; Page_sublist& pl = *m_page_sublist[list]; Ptr<Page_entry> ptr; for (pl.first(ptr); ptr.i != RNIL; pl.next(ptr)) { ndbout << ptr << endl; infoEvent(" PE [ file: %d page: %d ] state: %x lsn: %lld lcp: %d busy: %d req-list: %d", ptr.p->m_file_no, ptr.p->m_page_no, ptr.p->m_state, ptr.p->m_lsn, ptr.p->m_last_lcp, ptr.p->m_busy_count, !ptr.p->m_requests.isEmpty()); } } if (signal->theData[0] == 11002 && signal->getLength() == 3) { Page_entry key; key.m_file_no = signal->theData[1]; key.m_page_no = signal->theData[2]; Ptr<Page_entry> ptr; if (pl_hash.find(ptr, key)) { ndbout << "pageout " << ptr << endl; c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i, ptr.p->m_dirty_count); pageout(signal, ptr); } } if (signal->theData[0] == 11003) { #ifdef VM_TRACE verify_page_lists(); dump_page_lists(); #else ndbout << "Only in VM_TRACE builds" << endl; #endif } if (signal->theData[0] == 11004) { ndbout << "Dump LCP bucket m_lcp_outstanding: " << m_lcp_outstanding; if (m_lcp_curr_bucket != ~(Uint32)0) { Page_hashlist::Iterator iter; pl_hash.next(m_lcp_curr_bucket, iter); ndbout_c(" %d", m_lcp_curr_bucket); while (iter.curr.i != RNIL && iter.bucket == m_lcp_curr_bucket) { Ptr<Page_entry>& ptr = iter.curr; ndbout << ptr << endl; pl_hash.next(iter); } ndbout_c("-- done"); } else { ndbout_c(" == ~0"); } } if (signal->theData[0] == 11005) { g_dbg_lcp = ~g_dbg_lcp; } if (signal->theData[0] == 11006) { SET_ERROR_INSERT_VALUE(11006); } } // page cache client Page_cache_client::Page_cache_client(SimulatedBlock* block, Pgman* pgman) { m_block = block->number(); m_pgman = pgman; }