Commit cd976d16 authored by unknown's avatar unknown

ndb dd -

  fix undo of bitmap


storage/ndb/include/kernel/signaldata/PgmanContinueB.hpp:
  Handle locked pages last
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Remove LCP_PREPARE to pgman
storage/ndb/src/kernel/blocks/dblqh/Makefile.am:
  Fix make of redo reader
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp:
  Fix assert
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp:
  Fix bitmap undo
storage/ndb/src/kernel/blocks/lgman.cpp:
  Better lgman dump
storage/ndb/src/kernel/blocks/pgman.cpp:
  LCP locked pages last
storage/ndb/src/kernel/blocks/pgman.hpp:
  Handle locked pages last
storage/ndb/src/kernel/blocks/tsman.cpp:
  Fix bitmap undo
parent d5fad326
...@@ -30,7 +30,7 @@ private: ...@@ -30,7 +30,7 @@ private:
BUSY_LOOP = 1, BUSY_LOOP = 1,
CLEANUP_LOOP = 2, CLEANUP_LOOP = 2,
LCP_LOOP = 3, LCP_LOOP = 3,
LCP_PREPARE = 4 LCP_LOCKED = 4
}; };
}; };
......
...@@ -11203,12 +11203,6 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal) ...@@ -11203,12 +11203,6 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal, sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB); LcpPrepareReq::SignalLength, JBB);
if (lcpPtr.p->firstFragmentFlag)
{
lcpPtr.p->m_outstanding++;
sendSignal(PGMAN_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB);
}
}//Dblqh::sendLCP_FRAGIDREQ() }//Dblqh::sendLCP_FRAGIDREQ()
void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle) void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
......
...@@ -6,9 +6,11 @@ ndbd_redo_log_reader_SOURCES = redoLogReader/records.cpp \ ...@@ -6,9 +6,11 @@ ndbd_redo_log_reader_SOURCES = redoLogReader/records.cpp \
include $(top_srcdir)/storage/ndb/config/common.mk.am include $(top_srcdir)/storage/ndb/config/common.mk.am
include $(top_srcdir)/storage/ndb/config/type_kernel.mk.am include $(top_srcdir)/storage/ndb/config/type_kernel.mk.am
LDADD += \ ndbd_redo_log_reader_LDFLAGS = @ndb_bin_am_ldflags@ \
$(top_builddir)/storage/ndb/src/common/util/libgeneral.la \ $(top_builddir)/storage/ndb/src/libndbclient.la \
$(top_builddir)/storage/ndb/src/common/portlib/libportlib.la $(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
# Don't update the files from bitkeeper # Don't update the files from bitkeeper
%::SCCS/s.% %::SCCS/s.%
...@@ -56,7 +56,7 @@ void Dbtup::execTUP_DEALLOCREQ(Signal* signal) ...@@ -56,7 +56,7 @@ void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
if (ptr->m_header_bits & Tuple_header::LCP_KEEP) if (ptr->m_header_bits & Tuple_header::LCP_KEEP)
{ {
ndbassert(ptr.p->m_header_bits & Tuple_header::FREED); ndbassert(! (ptr->m_header_bits & Tuple_header::FREED));
ptr->m_header_bits |= Tuple_header::FREED; ptr->m_header_bits |= Tuple_header::FREED;
return; return;
} }
......
...@@ -287,8 +287,24 @@ Dbtup::restart_setup_page(Disk_alloc_info& alloc, Ptr<Page> pagePtr) ...@@ -287,8 +287,24 @@ Dbtup::restart_setup_page(Disk_alloc_info& alloc, Ptr<Page> pagePtr)
extentPtr.p->m_free_space += (real_free - estimated); extentPtr.p->m_free_space += (real_free - estimated);
update_extent_pos(alloc, extentPtr); update_extent_pos(alloc, extentPtr);
} }
}
#ifdef VM_TRACE
{
Local_key page;
page.m_file_no = pagePtr.p->m_file_no;
page.m_page_no = pagePtr.p->m_page_no;
Tablespace_client tsman(0, c_tsman,
0, 0, 0);
unsigned uncommitted, committed;
uncommitted = committed = ~(unsigned)0;
int ret = tsman.get_page_free_bits(&page, &uncommitted, &committed);
idx = alloc.calc_page_free_bits(real_free);
ddassert(idx == committed);
}
#endif
}
/** /**
* - Page free bits - * - Page free bits -
...@@ -743,10 +759,6 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal, ...@@ -743,10 +759,6 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
update_extent_pos(alloc, extentPtr); update_extent_pos(alloc, extentPtr);
} }
else
{
ndbout << endl;
}
{ {
Page_request_list list(c_page_request_pool, Page_request_list list(c_page_request_pool,
...@@ -771,15 +783,14 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr) ...@@ -771,15 +783,14 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
key.m_page_no = pagePtr.p->m_page_no; key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no; key.m_file_no = pagePtr.p->m_file_no;
pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
if (DBG_DISK) if (DBG_DISK)
ndbout << " disk_page_set_dirty " << key << endl; ndbout << " disk_page_set_dirty " << key << endl;
Uint32 tableId = pagePtr.p->m_table_id; Uint32 tableId = pagePtr.p->m_table_id;
Uint32 fragId = pagePtr.p->m_fragment_id; Uint32 fragId = pagePtr.p->m_fragment_id;
Uint32 free = pagePtr.p->free_space;
Uint32 used = pagePtr.p->uncommitted_used_space;
Ptr<Tablerec> tabPtr; Ptr<Tablerec> tabPtr;
tabPtr.i= pagePtr.p->m_table_id; tabPtr.i= pagePtr.p->m_table_id;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec); ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
...@@ -789,19 +800,28 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr) ...@@ -789,19 +800,28 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info; Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
Uint32 free = pagePtr.p->free_space;
Uint32 used = pagePtr.p->uncommitted_used_space;
if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq)) if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
{ {
jam();
restart_setup_page(alloc, pagePtr); restart_setup_page(alloc, pagePtr);
idx = alloc.calc_page_free_bits(free);
used = 0;
} }
else
{
idx &= ~0x8000;
ddassert(idx == alloc.calc_page_free_bits(free - used));
}
ddassert(free >= used);
Tablespace_client tsman(0, c_tsman, Tablespace_client tsman(0, c_tsman,
fragPtr.p->fragTableId, fragPtr.p->fragTableId,
fragPtr.p->fragmentId, fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id); fragPtr.p->m_tablespace_id);
ddassert(free >= used);
idx= alloc.calc_page_free_bits(free - used);
pagePtr.p->list_index = idx; pagePtr.p->list_index = idx;
ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool; ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]); LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
...@@ -1340,9 +1360,15 @@ Dbtup::disk_restart_undo_callback(Signal* signal, ...@@ -1340,9 +1360,15 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32; lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += pagePtr.p->m_page_header.m_page_lsn_lo; lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
undo->m_page_ptr = pagePtr;
if (undo->m_lsn <= lsn) if (undo->m_lsn <= lsn)
{ {
undo->m_page_ptr = pagePtr; if (DBG_UNDO)
{
ndbout << "apply: " << undo->m_lsn << "(" << lsn << " )"
<< key << " type: " << undo->m_type << endl;
}
update = true; update = true;
if (DBG_UNDO) if (DBG_UNDO)
...@@ -1368,13 +1394,17 @@ Dbtup::disk_restart_undo_callback(Signal* signal, ...@@ -1368,13 +1394,17 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
ndbout << "disk_restart_undo: " << undo->m_type << " " ndbout << "disk_restart_undo: " << undo->m_type << " "
<< undo->m_key << endl; << undo->m_key << endl;
disk_restart_undo_page_bits(signal, undo);
lsn = undo->m_lsn - 1; // make sure undo isn't run again... lsn = undo->m_lsn - 1; // make sure undo isn't run again...
m_pgman.update_lsn(undo->m_key, lsn); m_pgman.update_lsn(undo->m_key, lsn);
} }
else if (DBG_UNDO)
{
ndbout << "ignore: " << undo->m_lsn << "(" << lsn << " )"
<< key << " type: " << undo->m_type << endl;
}
disk_restart_undo_page_bits(signal, undo);
disk_restart_undo_next(signal); disk_restart_undo_next(signal);
} }
......
...@@ -280,6 +280,16 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){ ...@@ -280,6 +280,16 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){
waiter.p->m_size, waiter.p->m_size,
2*File_formats::UNDO_PAGE_WORDS); 2*File_formats::UNDO_PAGE_WORDS);
} }
if (!ptr.p->m_log_sync_waiters.isEmpty())
{
LocalDLFifoList<Log_waiter>
list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
Ptr<Log_waiter> waiter;
list.first(waiter);
infoEvent(" m_last_synced_lsn: %lld: %d head(waiters).m_sync_lsn: %lld",
ptr.p->m_last_synced_lsn,
waiter.p->m_sync_lsn);
}
m_logfile_group_list.next(ptr); m_logfile_group_list.next(ptr);
} }
} }
......
...@@ -70,7 +70,6 @@ Pgman::Pgman(const Configuration & conf) : ...@@ -70,7 +70,6 @@ Pgman::Pgman(const Configuration & conf) :
addRecSignal(GSN_FSWRITEREF, &Pgman::execFSWRITEREF, true); addRecSignal(GSN_FSWRITEREF, &Pgman::execFSWRITEREF, true);
addRecSignal(GSN_FSWRITECONF, &Pgman::execFSWRITECONF); addRecSignal(GSN_FSWRITECONF, &Pgman::execFSWRITECONF);
addRecSignal(GSN_LCP_PREPARE_REQ, &Pgman::execLCP_PREPARE_REQ);
addRecSignal(GSN_LCP_FRAG_ORD, &Pgman::execLCP_FRAG_ORD); addRecSignal(GSN_LCP_FRAG_ORD, &Pgman::execLCP_FRAG_ORD);
addRecSignal(GSN_END_LCP_REQ, &Pgman::execEND_LCP_REQ); addRecSignal(GSN_END_LCP_REQ, &Pgman::execEND_LCP_REQ);
...@@ -224,20 +223,20 @@ Pgman::execCONTINUEB(Signal* signal) ...@@ -224,20 +223,20 @@ Pgman::execCONTINUEB(Signal* signal)
jam(); jam();
do_lcp_loop(signal); do_lcp_loop(signal);
break; break;
case PgmanContinueB::LCP_PREPARE: case PgmanContinueB::LCP_LOCKED:
{ {
jam(); jam();
Ptr<Page_entry> ptr; Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED]; Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.getPtr(ptr, data1); if (data1 != RNIL)
if (pl.next(ptr))
{ {
process_lcp_prepare(signal, ptr); pl.getPtr(ptr, data1);
process_lcp_locked(signal, ptr);
} }
else else
{ {
signal->theData[0] = 0; signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB); sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
} }
return; return;
} }
...@@ -1105,79 +1104,6 @@ Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr) ...@@ -1105,79 +1104,6 @@ Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr)
// LCP // LCP
void
Pgman::execLCP_PREPARE_REQ(Signal* signal)
{
jamEntry();
/**
* Reserve pages for all LOCKED pages...
*/
Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
if (pl.first(ptr))
{
process_lcp_prepare(signal, ptr);
}
else
{
signal->theData[0] = 0;
sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB);
}
}
void
Pgman::process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr)
{
ndbrequire(ptr.p->m_copy_page_i == RNIL);
Ptr<GlobalPage> copy;
ndbrequire(m_global_page_pool.seize(copy));
ptr.p->m_copy_page_i = copy.i;
signal->theData[0] = PgmanContinueB::LCP_PREPARE;
signal->theData[1] = ptr.i;
sendSignal(PGMAN_REF, GSN_CONTINUEB, signal, 2, JBB);
}
int
Pgman::create_copy_page(Ptr<Page_entry> ptr, Uint32 req_flags)
{
if (! (req_flags & DIRTY_FLAGS) && ! (ptr.p->m_state & Page_entry::COPY))
{
return ptr.p->m_real_page_i;
}
if (! (ptr.p->m_state & Page_entry::COPY))
{
ptr.p->m_state |= Page_entry::COPY;
Ptr<GlobalPage> src;
Ptr<GlobalPage> copy;
m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
memcpy(copy.p, src.p, sizeof(GlobalPage));
}
return ptr.p->m_copy_page_i;
}
void
Pgman::restore_copy_page(Ptr<Page_entry> ptr)
{
Uint32 copyPtrI = ptr.p->m_copy_page_i;
if (ptr.p->m_state & Page_entry::COPY)
{
Ptr<GlobalPage> src;
Ptr<GlobalPage> copy;
m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
m_global_page_pool.getPtr(copy, copyPtrI);
memcpy(src.p, copy.p, sizeof(GlobalPage));
}
m_global_page_pool.release(copyPtrI);
ptr.p->m_state &= ~Page_entry::COPY;
ptr.p->m_copy_page_i = RNIL;
}
void void
Pgman::execLCP_FRAG_ORD(Signal* signal) Pgman::execLCP_FRAG_ORD(Signal* signal)
...@@ -1251,7 +1177,8 @@ Pgman::process_lcp(Signal* signal) ...@@ -1251,7 +1177,8 @@ Pgman::process_lcp(Signal* signal)
DBG_LCP("LCP " << ptr << " - "); DBG_LCP("LCP " << ptr << " - ");
if (ptr.p->m_last_lcp < m_last_lcp && if (ptr.p->m_last_lcp < m_last_lcp &&
(state & Page_entry::DIRTY)) (state & Page_entry::DIRTY) &&
(! (state & Page_entry::LOCKED)))
{ {
if(! (state & Page_entry::BOUND)) if(! (state & Page_entry::BOUND))
{ {
...@@ -1263,7 +1190,7 @@ Pgman::process_lcp(Signal* signal) ...@@ -1263,7 +1190,7 @@ Pgman::process_lcp(Signal* signal)
DBG_LCP(" BUSY" << endl); DBG_LCP(" BUSY" << endl);
break; // wait for it break; // wait for it
} }
if (state & Page_entry::PAGEOUT) else if (state & Page_entry::PAGEOUT)
{ {
DBG_LCP(" PAGEOUT -> state |= LCP" << endl); DBG_LCP(" PAGEOUT -> state |= LCP" << endl);
set_page_state(ptr, state | Page_entry::LCP); set_page_state(ptr, state | Page_entry::LCP);
...@@ -1279,11 +1206,6 @@ Pgman::process_lcp(Signal* signal) ...@@ -1279,11 +1206,6 @@ Pgman::process_lcp(Signal* signal)
ptr.p->m_last_lcp = m_last_lcp; ptr.p->m_last_lcp = m_last_lcp;
m_lcp_outstanding++; m_lcp_outstanding++;
} }
else if (ptr.p->m_copy_page_i != RNIL)
{
DBG_LCP(" NOT DIRTY" << endl);
restore_copy_page(ptr);
}
else else
{ {
DBG_LCP(" NOT DIRTY" << endl); DBG_LCP(" NOT DIRTY" << endl);
...@@ -1296,16 +1218,68 @@ Pgman::process_lcp(Signal* signal) ...@@ -1296,16 +1218,68 @@ Pgman::process_lcp(Signal* signal)
if (m_lcp_curr_bucket == ~(Uint32)0 && !m_lcp_outstanding) if (m_lcp_curr_bucket == ~(Uint32)0 && !m_lcp_outstanding)
{ {
DBG_LCP("GSN_END_LCP_CONF" << endl); Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
if (pl.first(ptr))
{
process_lcp_locked(signal, ptr);
}
else
{
signal->theData[0] = m_end_lcp_req.senderData; signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB); sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
m_lcp_curr_bucket = ~(Uint32)0; }
return false; return false;
} }
return true; return true;
} }
void
Pgman::process_lcp_locked(Signal* signal, Ptr<Page_entry> ptr)
{
ptr.p->m_last_lcp = m_last_lcp;
if (ptr.p->m_state & Page_entry::DIRTY)
{
Ptr<GlobalPage> org, copy;
ndbrequire(m_global_page_pool.seize(copy));
m_global_page_pool.getPtr(org, ptr.p->m_real_page_i);
memcpy(copy.p, org.p, sizeof(GlobalPage));
ptr.p->m_copy_page_i = copy.i;
m_lcp_outstanding++;
ptr.p->m_state |= Page_entry::LCP;
pageout(signal, ptr);
return;
}
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.next(ptr);
signal->theData[0] = PgmanContinueB::LCP_LOCKED;
signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}
void
Pgman::process_lcp_locked_fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
{
Ptr<GlobalPage> org, copy;
m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
m_global_page_pool.getPtr(org, ptr.p->m_real_page_i);
memcpy(org.p, copy.p, sizeof(GlobalPage));
m_global_page_pool.release(copy);
ptr.p->m_copy_page_i = RNIL;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.next(ptr);
signal->theData[0] = PgmanContinueB::LCP_LOCKED;
signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}
// page read and write // page read and write
void void
...@@ -1427,18 +1401,16 @@ Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr) ...@@ -1427,18 +1401,16 @@ Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
ndbrequire(m_stats.m_current_io_waits > 0); ndbrequire(m_stats.m_current_io_waits > 0);
m_stats.m_current_io_waits--; m_stats.m_current_io_waits--;
if (ptr.p->m_copy_page_i != RNIL)
{
jam();
restore_copy_page(ptr);
state &= ~ Page_entry::COPY;
}
if (state & Page_entry::LCP) if (state & Page_entry::LCP)
{ {
ndbrequire(m_lcp_outstanding); ndbrequire(m_lcp_outstanding);
m_lcp_outstanding--; m_lcp_outstanding--;
state &= ~ Page_entry::LCP; state &= ~ Page_entry::LCP;
if (ptr.p->m_copy_page_i != RNIL)
{
process_lcp_locked_fswriteconf(signal, ptr);
}
} }
set_page_state(ptr, state); set_page_state(ptr, state);
...@@ -1592,9 +1564,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) ...@@ -1592,9 +1564,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
! (req_flags & Page_request::UNLOCK_PAGE)) ! (req_flags & Page_request::UNLOCK_PAGE))
{ {
ptr.p->m_state |= (req_flags & DIRTY_FLAGS ? Page_entry::DIRTY : 0); ptr.p->m_state |= (req_flags & DIRTY_FLAGS ? Page_entry::DIRTY : 0);
if (m_lcp_loop_on && ptr.p->m_copy_page_i != RNIL) if (ptr.p->m_copy_page_i != RNIL)
{ {
return create_copy_page(ptr, req_flags); return ptr.p->m_copy_page_i;
} }
return ptr.p->m_real_page_i; return ptr.p->m_real_page_i;
......
...@@ -305,7 +305,6 @@ private: ...@@ -305,7 +305,6 @@ private:
,PAGEIN = 0x0100 // paging in ,PAGEIN = 0x0100 // paging in
,PAGEOUT = 0x0200 // paging out ,PAGEOUT = 0x0200 // paging out
,LOGSYNC = 0x0400 // undo WAL as part of pageout ,LOGSYNC = 0x0400 // undo WAL as part of pageout
,COPY = 0x0800 // Copy page for LCP
,LCP = 0x1000 // page is LCP flushed ,LCP = 0x1000 // page is LCP flushed
,HOT = 0x2000 // page is hot ,HOT = 0x2000 // page is hot
,ONSTACK = 0x4000 // page is on LIRS stack ,ONSTACK = 0x4000 // page is on LIRS stack
...@@ -419,7 +418,6 @@ protected: ...@@ -419,7 +418,6 @@ protected:
void execREAD_CONFIG_REQ(Signal* signal); void execREAD_CONFIG_REQ(Signal* signal);
void execCONTINUEB(Signal* signal); void execCONTINUEB(Signal* signal);
void execLCP_PREPARE_REQ(Signal* signal);
void execLCP_FRAG_ORD(Signal*); void execLCP_FRAG_ORD(Signal*);
void execEND_LCP_REQ(Signal*); void execEND_LCP_REQ(Signal*);
...@@ -462,9 +460,8 @@ private: ...@@ -462,9 +460,8 @@ private:
void move_cleanup_ptr(Ptr<Page_entry> ptr); void move_cleanup_ptr(Ptr<Page_entry> ptr);
bool process_lcp(Signal*); bool process_lcp(Signal*);
void process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr); void process_lcp_locked(Signal* signal, Ptr<Page_entry> ptr);
int create_copy_page(Ptr<Page_entry>, Uint32 req_flags); void process_lcp_locked_fswriteconf(Signal* signal, Ptr<Page_entry> ptr);
void restore_copy_page(Ptr<Page_entry>);
void pagein(Signal*, Ptr<Page_entry>); void pagein(Signal*, Ptr<Page_entry>);
void fsreadreq(Signal*, Ptr<Page_entry>); void fsreadreq(Signal*, Ptr<Page_entry>);
......
...@@ -1777,7 +1777,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal, ...@@ -1777,7 +1777,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
/** /**
* Handling of unmapped extent header pages is not implemented * Handling of unmapped extent header pages is not implemented
*/ */
int flags = 0; int flags = Page_cache_client::DIRTY_REQ;
int real_page_id; int real_page_id;
if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0) if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
{ {
...@@ -1805,31 +1805,20 @@ Tsman::restart_undo_page_free_bits(Signal* signal, ...@@ -1805,31 +1805,20 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32; lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += page->m_page_header.m_page_lsn_lo; lsn += page->m_page_header.m_page_lsn_lo;
if (undo_lsn <= lsn)
{
/** /**
* Toggle word * Toggle word
*/ */
if (DBG_UNDO) if (DBG_UNDO)
ndbout_c("tsman: apply %lld(%lld) %x -> %x", {
undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT))); ndbout << "tsman: apply " << undo_lsn << "(" << lsn << ") "
<< *key << " " << (src & COMMITTED_MASK)
<< " -> " << bits << endl;
}
lsn = undo_lsn;
page->m_page_header.m_page_lsn_hi = lsn >> 32;
page->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
ndbassert((bits & ~(COMMITTED_MASK)) == 0); ndbassert((bits & ~(COMMITTED_MASK)) == 0);
header->update_free_bits(page_no_in_extent, header->update_free_bits(page_no_in_extent,
bits | (bits << UNCOMMITTED_SHIFT)); bits | (bits << UNCOMMITTED_SHIFT));
m_page_cache_client.update_lsn(preq.m_page, lsn);
}
else
{
if (DBG_UNDO)
ndbout_c("tsman: apply %lld(%lld) %x -> %x",
undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT)));
}
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment