Commit 635ce25b authored by jonas@eel.(none)'s avatar jonas@eel.(none)

ndb dd -

  fix undo of bitmap
parent 044c51ab
......@@ -30,7 +30,7 @@ private:
BUSY_LOOP = 1,
CLEANUP_LOOP = 2,
LCP_LOOP = 3,
LCP_PREPARE = 4
LCP_LOCKED = 4
};
};
......
......@@ -11203,12 +11203,6 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB);
if (lcpPtr.p->firstFragmentFlag)
{
lcpPtr.p->m_outstanding++;
sendSignal(PGMAN_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB);
}
}//Dblqh::sendLCP_FRAGIDREQ()
void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
......
......@@ -6,9 +6,11 @@ ndbd_redo_log_reader_SOURCES = redoLogReader/records.cpp \
include $(top_srcdir)/storage/ndb/config/common.mk.am
include $(top_srcdir)/storage/ndb/config/type_kernel.mk.am
LDADD += \
$(top_builddir)/storage/ndb/src/common/util/libgeneral.la \
$(top_builddir)/storage/ndb/src/common/portlib/libportlib.la
ndbd_redo_log_reader_LDFLAGS = @ndb_bin_am_ldflags@ \
$(top_builddir)/storage/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
# Don't update the files from bitkeeper
%::SCCS/s.%
......@@ -56,7 +56,7 @@ void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
if (ptr->m_header_bits & Tuple_header::LCP_KEEP)
{
ndbassert(ptr.p->m_header_bits & Tuple_header::FREED);
ndbassert(! (ptr->m_header_bits & Tuple_header::FREED));
ptr->m_header_bits |= Tuple_header::FREED;
return;
}
......
......@@ -287,8 +287,24 @@ Dbtup::restart_setup_page(Disk_alloc_info& alloc, Ptr<Page> pagePtr)
extentPtr.p->m_free_space += (real_free - estimated);
update_extent_pos(alloc, extentPtr);
}
}
#ifdef VM_TRACE
{
Local_key page;
page.m_file_no = pagePtr.p->m_file_no;
page.m_page_no = pagePtr.p->m_page_no;
Tablespace_client tsman(0, c_tsman,
0, 0, 0);
unsigned uncommitted, committed;
uncommitted = committed = ~(unsigned)0;
int ret = tsman.get_page_free_bits(&page, &uncommitted, &committed);
idx = alloc.calc_page_free_bits(real_free);
ddassert(idx == committed);
}
#endif
}
/**
* - Page free bits -
......@@ -743,10 +759,6 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
update_extent_pos(alloc, extentPtr);
}
else
{
ndbout << endl;
}
{
Page_request_list list(c_page_request_pool,
......@@ -771,15 +783,14 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
if (DBG_DISK)
ndbout << " disk_page_set_dirty " << key << endl;
Uint32 tableId = pagePtr.p->m_table_id;
Uint32 fragId = pagePtr.p->m_fragment_id;
Uint32 free = pagePtr.p->free_space;
Uint32 used = pagePtr.p->uncommitted_used_space;
Ptr<Tablerec> tabPtr;
tabPtr.i= pagePtr.p->m_table_id;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
......@@ -789,19 +800,28 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
Uint32 free = pagePtr.p->free_space;
Uint32 used = pagePtr.p->uncommitted_used_space;
if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
{
jam();
restart_setup_page(alloc, pagePtr);
idx = alloc.calc_page_free_bits(free);
used = 0;
}
else
{
idx &= ~0x8000;
ddassert(idx == alloc.calc_page_free_bits(free - used));
}
ddassert(free >= used);
Tablespace_client tsman(0, c_tsman,
fragPtr.p->fragTableId,
fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id);
ddassert(free >= used);
idx= alloc.calc_page_free_bits(free - used);
pagePtr.p->list_index = idx;
ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
......@@ -1339,10 +1359,16 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
Uint64 lsn = 0;
lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
undo->m_page_ptr = pagePtr;
if (undo->m_lsn <= lsn)
{
undo->m_page_ptr = pagePtr;
if (DBG_UNDO)
{
ndbout << "apply: " << undo->m_lsn << "(" << lsn << " )"
<< key << " type: " << undo->m_type << endl;
}
update = true;
if (DBG_UNDO)
......@@ -1368,13 +1394,17 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
ndbout << "disk_restart_undo: " << undo->m_type << " "
<< undo->m_key << endl;
disk_restart_undo_page_bits(signal, undo);
lsn = undo->m_lsn - 1; // make sure undo isn't run again...
m_pgman.update_lsn(undo->m_key, lsn);
}
else if (DBG_UNDO)
{
ndbout << "ignore: " << undo->m_lsn << "(" << lsn << " )"
<< key << " type: " << undo->m_type << endl;
}
disk_restart_undo_page_bits(signal, undo);
disk_restart_undo_next(signal);
}
......
......@@ -280,6 +280,16 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){
waiter.p->m_size,
2*File_formats::UNDO_PAGE_WORDS);
}
if (!ptr.p->m_log_sync_waiters.isEmpty())
{
LocalDLFifoList<Log_waiter>
list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
Ptr<Log_waiter> waiter;
list.first(waiter);
infoEvent(" m_last_synced_lsn: %lld: %d head(waiters).m_sync_lsn: %lld",
ptr.p->m_last_synced_lsn,
waiter.p->m_sync_lsn);
}
m_logfile_group_list.next(ptr);
}
}
......
......@@ -70,7 +70,6 @@ Pgman::Pgman(const Configuration & conf) :
addRecSignal(GSN_FSWRITEREF, &Pgman::execFSWRITEREF, true);
addRecSignal(GSN_FSWRITECONF, &Pgman::execFSWRITECONF);
addRecSignal(GSN_LCP_PREPARE_REQ, &Pgman::execLCP_PREPARE_REQ);
addRecSignal(GSN_LCP_FRAG_ORD, &Pgman::execLCP_FRAG_ORD);
addRecSignal(GSN_END_LCP_REQ, &Pgman::execEND_LCP_REQ);
......@@ -224,20 +223,20 @@ Pgman::execCONTINUEB(Signal* signal)
jam();
do_lcp_loop(signal);
break;
case PgmanContinueB::LCP_PREPARE:
case PgmanContinueB::LCP_LOCKED:
{
jam();
Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.getPtr(ptr, data1);
if (pl.next(ptr))
if (data1 != RNIL)
{
process_lcp_prepare(signal, ptr);
pl.getPtr(ptr, data1);
process_lcp_locked(signal, ptr);
}
else
{
signal->theData[0] = 0;
sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB);
signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
}
return;
}
......@@ -1105,79 +1104,6 @@ Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr)
// LCP
void
Pgman::execLCP_PREPARE_REQ(Signal* signal)
{
jamEntry();
/**
* Reserve pages for all LOCKED pages...
*/
Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
if (pl.first(ptr))
{
process_lcp_prepare(signal, ptr);
}
else
{
signal->theData[0] = 0;
sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB);
}
}
void
Pgman::process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr)
{
ndbrequire(ptr.p->m_copy_page_i == RNIL);
Ptr<GlobalPage> copy;
ndbrequire(m_global_page_pool.seize(copy));
ptr.p->m_copy_page_i = copy.i;
signal->theData[0] = PgmanContinueB::LCP_PREPARE;
signal->theData[1] = ptr.i;
sendSignal(PGMAN_REF, GSN_CONTINUEB, signal, 2, JBB);
}
int
Pgman::create_copy_page(Ptr<Page_entry> ptr, Uint32 req_flags)
{
if (! (req_flags & DIRTY_FLAGS) && ! (ptr.p->m_state & Page_entry::COPY))
{
return ptr.p->m_real_page_i;
}
if (! (ptr.p->m_state & Page_entry::COPY))
{
ptr.p->m_state |= Page_entry::COPY;
Ptr<GlobalPage> src;
Ptr<GlobalPage> copy;
m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
memcpy(copy.p, src.p, sizeof(GlobalPage));
}
return ptr.p->m_copy_page_i;
}
void
Pgman::restore_copy_page(Ptr<Page_entry> ptr)
{
Uint32 copyPtrI = ptr.p->m_copy_page_i;
if (ptr.p->m_state & Page_entry::COPY)
{
Ptr<GlobalPage> src;
Ptr<GlobalPage> copy;
m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
m_global_page_pool.getPtr(copy, copyPtrI);
memcpy(src.p, copy.p, sizeof(GlobalPage));
}
m_global_page_pool.release(copyPtrI);
ptr.p->m_state &= ~Page_entry::COPY;
ptr.p->m_copy_page_i = RNIL;
}
void
Pgman::execLCP_FRAG_ORD(Signal* signal)
......@@ -1251,7 +1177,8 @@ Pgman::process_lcp(Signal* signal)
DBG_LCP("LCP " << ptr << " - ");
if (ptr.p->m_last_lcp < m_last_lcp &&
(state & Page_entry::DIRTY))
(state & Page_entry::DIRTY) &&
(! (state & Page_entry::LOCKED)))
{
if(! (state & Page_entry::BOUND))
{
......@@ -1262,8 +1189,8 @@ Pgman::process_lcp(Signal* signal)
{
DBG_LCP(" BUSY" << endl);
break; // wait for it
}
if (state & Page_entry::PAGEOUT)
}
else if (state & Page_entry::PAGEOUT)
{
DBG_LCP(" PAGEOUT -> state |= LCP" << endl);
set_page_state(ptr, state | Page_entry::LCP);
......@@ -1279,11 +1206,6 @@ Pgman::process_lcp(Signal* signal)
ptr.p->m_last_lcp = m_last_lcp;
m_lcp_outstanding++;
}
else if (ptr.p->m_copy_page_i != RNIL)
{
DBG_LCP(" NOT DIRTY" << endl);
restore_copy_page(ptr);
}
else
{
DBG_LCP(" NOT DIRTY" << endl);
......@@ -1296,16 +1218,68 @@ Pgman::process_lcp(Signal* signal)
if (m_lcp_curr_bucket == ~(Uint32)0 && !m_lcp_outstanding)
{
DBG_LCP("GSN_END_LCP_CONF" << endl);
signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
m_lcp_curr_bucket = ~(Uint32)0;
Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
if (pl.first(ptr))
{
process_lcp_locked(signal, ptr);
}
else
{
signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
}
return false;
}
return true;
}
void
Pgman::process_lcp_locked(Signal* signal, Ptr<Page_entry> ptr)
{
ptr.p->m_last_lcp = m_last_lcp;
if (ptr.p->m_state & Page_entry::DIRTY)
{
Ptr<GlobalPage> org, copy;
ndbrequire(m_global_page_pool.seize(copy));
m_global_page_pool.getPtr(org, ptr.p->m_real_page_i);
memcpy(copy.p, org.p, sizeof(GlobalPage));
ptr.p->m_copy_page_i = copy.i;
m_lcp_outstanding++;
ptr.p->m_state |= Page_entry::LCP;
pageout(signal, ptr);
return;
}
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.next(ptr);
signal->theData[0] = PgmanContinueB::LCP_LOCKED;
signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}
void
Pgman::process_lcp_locked_fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
{
Ptr<GlobalPage> org, copy;
m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
m_global_page_pool.getPtr(org, ptr.p->m_real_page_i);
memcpy(org.p, copy.p, sizeof(GlobalPage));
m_global_page_pool.release(copy);
ptr.p->m_copy_page_i = RNIL;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.next(ptr);
signal->theData[0] = PgmanContinueB::LCP_LOCKED;
signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}
// page read and write
void
......@@ -1427,18 +1401,16 @@ Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
ndbrequire(m_stats.m_current_io_waits > 0);
m_stats.m_current_io_waits--;
if (ptr.p->m_copy_page_i != RNIL)
{
jam();
restore_copy_page(ptr);
state &= ~ Page_entry::COPY;
}
if (state & Page_entry::LCP)
{
ndbrequire(m_lcp_outstanding);
m_lcp_outstanding--;
state &= ~ Page_entry::LCP;
if (ptr.p->m_copy_page_i != RNIL)
{
process_lcp_locked_fswriteconf(signal, ptr);
}
}
set_page_state(ptr, state);
......@@ -1592,9 +1564,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
! (req_flags & Page_request::UNLOCK_PAGE))
{
ptr.p->m_state |= (req_flags & DIRTY_FLAGS ? Page_entry::DIRTY : 0);
if (m_lcp_loop_on && ptr.p->m_copy_page_i != RNIL)
if (ptr.p->m_copy_page_i != RNIL)
{
return create_copy_page(ptr, req_flags);
return ptr.p->m_copy_page_i;
}
return ptr.p->m_real_page_i;
......
......@@ -305,7 +305,6 @@ private:
,PAGEIN = 0x0100 // paging in
,PAGEOUT = 0x0200 // paging out
,LOGSYNC = 0x0400 // undo WAL as part of pageout
,COPY = 0x0800 // Copy page for LCP
,LCP = 0x1000 // page is LCP flushed
,HOT = 0x2000 // page is hot
,ONSTACK = 0x4000 // page is on LIRS stack
......@@ -419,7 +418,6 @@ protected:
void execREAD_CONFIG_REQ(Signal* signal);
void execCONTINUEB(Signal* signal);
void execLCP_PREPARE_REQ(Signal* signal);
void execLCP_FRAG_ORD(Signal*);
void execEND_LCP_REQ(Signal*);
......@@ -462,9 +460,8 @@ private:
void move_cleanup_ptr(Ptr<Page_entry> ptr);
bool process_lcp(Signal*);
void process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr);
int create_copy_page(Ptr<Page_entry>, Uint32 req_flags);
void restore_copy_page(Ptr<Page_entry>);
void process_lcp_locked(Signal* signal, Ptr<Page_entry> ptr);
void process_lcp_locked_fswriteconf(Signal* signal, Ptr<Page_entry> ptr);
void pagein(Signal*, Ptr<Page_entry>);
void fsreadreq(Signal*, Ptr<Page_entry>);
......
......@@ -1777,7 +1777,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
/**
* Handling of unmapped extent header pages is not implemented
*/
int flags = 0;
int flags = Page_cache_client::DIRTY_REQ;
int real_page_id;
if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
{
......@@ -1805,31 +1805,20 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += page->m_page_header.m_page_lsn_lo;
if (undo_lsn <= lsn)
{
/**
* Toggle word
*/
if (DBG_UNDO)
ndbout_c("tsman: apply %lld(%lld) %x -> %x",
undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT)));
lsn = undo_lsn;
page->m_page_header.m_page_lsn_hi = lsn >> 32;
page->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
ndbassert((bits & ~(COMMITTED_MASK)) == 0);
header->update_free_bits(page_no_in_extent,
bits | (bits << UNCOMMITTED_SHIFT));
m_page_cache_client.update_lsn(preq.m_page, lsn);
}
else
/**
* Toggle word
*/
if (DBG_UNDO)
{
if (DBG_UNDO)
ndbout_c("tsman: apply %lld(%lld) %x -> %x",
undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT)));
ndbout << "tsman: apply " << undo_lsn << "(" << lsn << ") "
<< *key << " " << (src & COMMITTED_MASK)
<< " -> " << bits << endl;
}
ndbassert((bits & ~(COMMITTED_MASK)) == 0);
header->update_free_bits(page_no_in_extent,
bits | (bits << UNCOMMITTED_SHIFT));
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment